| — основа UNIX-композиции
Pipe | соединяет stdout первой команды со stdin второй. Это самый мощный механизм shell — он позволяет строить сложные pipelines из простых команд.
$ cmd1 | cmd2 | cmd3
# stdin -> cmd1 -> cmd2 -> cmd3 -> stdout
Каждая команда — отдельный процесс, читает stdin, пишет в stdout. Kernel между ними создаёт pipe (pipe() syscall) — кольцевой буфер в памяти (~64 KB на Linux). Процессы работают параллельно: пока cmd1 пишет, cmd2 уже читает и обрабатывает.
# Простой пример:
$ cat /etc/passwd | grep bash | wc -l
3
# cat -> grep -> wc, параллельно
UNIX-философия в действии
Ken Thompson, Dennis Ritchie и Brian Kernighan сформулировали принципы UNIX:
- Каждая программа делает одно — но хорошо
- Программы соединяются через текстовые потоки
- Текстовые потоки — универсальный интерфейс
Pipe — это реализация этой философии. Не нужна одна «mega-tool» для всего — нужны маленькие, специализированные инструменты, которые комбинируются.
# Топ-10 ошибочных DAG за день — без mega-tool, но из 6 простых:
$ cat /var/log/airflow/scheduler.log \
| grep ERROR \
| awk '{print $7}' \
| sort \
| uniq -c \
| sort -rn \
| head -10
Каждый шаг — отдельная программа:
cat— потокgrep— фильтр по patternawk— extract колонкиsort— упорядочитьuniq -c— group + countsort -rn— отсортировать по counthead— top N
Любой шаг можно заменить или вставить новый. Это modular design на уровне OS.
Под капотом: что делает kernel
$ cmd1 | cmd2
Шаги:
- Shell делает
pipe(fds)— kernel создаёт kernel-buffer (~64 KB), возвращает два FD:fds[0](read end),fds[1](write end). fork()для cmd1. В child:dup2(fds[1], 1)— stdout cmd1 -> pipe-writeclose(fds[0]); close(fds[1])— закрыть оригинальные FDexecve("cmd1")
fork()для cmd2. В child:dup2(fds[0], 0)— stdin cmd2 ← pipe-readclose(fds[0]); close(fds[1])execve("cmd2")
- Shell в parent закрывает оба
fds[0],fds[1]и ждёт.
Cmd1 пишет в pipe, kernel буферизирует, cmd2 читает. Когда cmd1 завершится — pipe-write end закрывается, cmd2 получает EOF на следующем read.
Параллельность: cmd1 и cmd2 — независимые процессы. Они выполняются одновременно. Если cmd2 медленнее, kernel buffer заполнится и cmd1 блокируется на write() пока cmd2 не прочитает. Это back-pressure механизм — не нужно ничего настраивать.
Реальные DE-pipeline’ы
1. Полный pipeline: log -> metrics
$ tail -F /var/log/nginx/access.log \
| awk '{print $9, $7}' \
| grep -v ^200 \
| awk '{counts[$2]++} END {for (u in counts) print counts[u], u}' \
| sort -rn | head -20
Realtime-monitoring: топ-20 URL с не-200 status кодами.
2. Multi-source aggregation
$ zcat /var/log/*.log.*.gz \
| grep ERROR \
| awk -F'service=' '{print $2}' \
| awk '{print $1}' \
| sort | uniq -c | sort -rn
Разархивирует все .gz логи, ищет ERROR, group by service. Один pipeline вместо bash-loop с zgrep.
3. Conditional execution
$ grep -q ERROR /var/log/app.log && echo "Errors found!" | mail -s "Alert" [email protected]
# Если grep нашёл ERROR (exit 0) — отправить email
&& — exec следующую команду только если предыдущая успешна (exit 0).
4. Streaming данных через несколько transformations
$ ./extract.py | python3 transform.py | psql -c "COPY orders FROM stdin CSV"
# Python extract -> Python transform -> COPY в Postgres напрямую через stdin
# Без temp-файлов, всё streamable
Это classic ETL без intermediate storage — данные просто текут через программы.
Pipes изнутри — kernel-буфер и IPCpipefail: правильные exit codes
По default, exit code pipeline = exit code последней команды:
$ false | true
$ echo $?
0 # Хотя false вернул 1!
Это часто скрывает ошибки. В bash есть set -o pipefail:
$ set -o pipefail
$ false | true
$ echo $?
1 # Теперь exit ненулевой если ЛЮБАЯ команда pipeline упала
Полный exit code = первый ненулевой из всех команд (или 0 если все 0).
В production-скриптах:
#!/bin/bash
set -euo pipefail
# -e: exit on error
# -u: error on unset variable
# -o pipefail: errors in pipeline propagate
curl -fsSL "$URL" | jq '.users[]' | psql -c "INSERT ..."
# Если curl упадёт (например, 500 error) — скрипт остановится
# Без pipefail: jq получит пустой input, выйдет 0, psql вставит ноль строк, скрипт «успешен»
set -euo pipefail — must-have в start каждого production-bash-скрипта.
Большие данные через pipe
Pipe можно использовать для terabyte-scale data, потому что данные streamable:
# Скачать gzip-CSV, декодировать, фильтровать, агрегировать —
# всё без интермедиэйт файлов, без полной загрузки в память:
$ curl -sL https://example.com/huge.csv.gz \
| gunzip \
| awk -F',' 'NR > 1 && $5 == "USA" {sum += $3} END {print sum}'
Это buffer-size memory — десятки KB, не размер файла. Linux pipes — это рабочая лошадка ETL даже на TB-данных.
Pipe vs xargs
Pipe передаёт stdin одной команды на stdin следующей. Если нужно использовать вывод как аргументы, не stdin — нужен xargs:
# rm каждый файл из списка:
$ find . -name '*.tmp' | xargs rm
# find выводит имена; xargs строит rm file1 file2 ... и запускает
# Сравни с pipe:
$ find . -name '*.tmp' | rm
# Не работает! rm не читает имена со stdin, ждёт аргументы.
xargs — про это в уроке 05.
SIGPIPE и broken pipe
$ yes | head -3
y
y
y
yes бесконечно пишет ‘y\n’. head -3 берёт 3 строки и закрывается. Что происходит с yes?
Когда head закрывается, kernel закрывает read end pipe. Следующая попытка yes написать в pipe вызывает SIGPIPE — сигнал, который по default kill’ит процесс. yes завершается.
Это elegant механизм: верхние команды pipeline останавливаются, когда нижние больше не читают. Не нужно отдельно их kill’ить.
Программы могут перехватить SIGPIPE и обработать gracefully (Python через signal.signal(signal.SIGPIPE, ...)). Многие cli-инструменты делают это.
В bash в скриптах bekanntes сообщение «broken pipe» иногда выскакивает — это и есть SIGPIPE. Обычно безвредно если упало то, что выше head.
Команды, которые ломают pipeline
Некоторые команды не работают с pipe by default:
$ echo "hello" | grep o # [x] работает
$ echo "hello" | vim # [X] vim не интерактивен
$ echo "hello" | rm # [X] rm не читает stdin
$ echo "hello" | cp dest # [X] cp не читает stdin
Многие GUI/interactive tools не работают. И многие команды, которые ждут аргументы файлов (rm, cp, mv), нужно использовать с xargs:
$ echo "/tmp/file" | xargs rm
$ find . -name '*.tmp' | xargs rm
Попробуй сам
- Базовый pipe:
ls /etc | wc -l - Длинный pipeline:
cat /etc/passwd | awk -F':' '{print $7}' | sort | uniq -c | sort -rn # Распределение shells - pipefail demo:
bash -c 'false | true; echo $?' bash -c 'set -o pipefail; false | true; echo $?' # Первая выведет 0, вторая — 1 - SIGPIPE:
yes | head -3 # head закрывается, yes получает SIGPIPE - Кросс-platform ETL:
echo "alice,30,USA" | awk -F',' '{print $1, $3}' | tr ' ' '|' # alice|USA
macOS-различия
- Pipes — POSIX, работают идентично на всех Unix-системах.
- Pipe buffer size на macOS — 64 KB (был 16KB на старых macOS, но
pipe(2)says 16KB; в реальности 64KB после Sierra). pipefail— POSIX опция, есть на bash и zsh.- SIGPIPE — POSIX, работает идентично.
Главное
|соединяет stdout первой команды со stdin второй через kernel buffer (~64 KB).- Команды pipeline работают параллельно, kernel buffer обеспечивает back-pressure.
- UNIX-философия: каждая программа делает одно; программы соединяются через text streams.
set -o pipefail— must-have для production. Без него exit code = последней команды, ошибки скрываются.- Streaming через pipe — обрабатывает данные любого размера с фиксированной memory.
- SIGPIPE автоматически завершает upstream-команды, когда downstream закрывается.
- Для использования вывода как аргументов (не stdin) —
xargs. - Не все команды работают с pipe (rm, cp, vim, mv по аргументам) — нужны
xargs.