Learning Platform
Глоссарий Troubleshooting
Урок 10.03 · 25 мин
Начальный
PipesCompositionUNIX philosophypipefailPipeline

| — основа UNIX-композиции

Pipe | соединяет stdout первой команды со stdin второй. Это самый мощный механизм shell — он позволяет строить сложные pipelines из простых команд.

$ cmd1 | cmd2 | cmd3
# stdin -> cmd1 -> cmd2 -> cmd3 -> stdout

Каждая команда — отдельный процесс, читает stdin, пишет в stdout. Kernel между ними создаёт pipe (pipe() syscall) — кольцевой буфер в памяти (~64 KB на Linux). Процессы работают параллельно: пока cmd1 пишет, cmd2 уже читает и обрабатывает.

# Простой пример:
$ cat /etc/passwd | grep bash | wc -l
3
# cat -> grep -> wc, параллельно

UNIX-философия в действии

Ken Thompson, Dennis Ritchie и Brian Kernighan сформулировали принципы UNIX:

  1. Каждая программа делает одно — но хорошо
  2. Программы соединяются через текстовые потоки
  3. Текстовые потоки — универсальный интерфейс

Pipe — это реализация этой философии. Не нужна одна «mega-tool» для всего — нужны маленькие, специализированные инструменты, которые комбинируются.

# Топ-10 ошибочных DAG за день — без mega-tool, но из 6 простых:
$ cat /var/log/airflow/scheduler.log \
    | grep ERROR \
    | awk '{print $7}' \
    | sort \
    | uniq -c \
    | sort -rn \
    | head -10

Каждый шаг — отдельная программа:

  1. cat — поток
  2. grep — фильтр по pattern
  3. awk — extract колонки
  4. sort — упорядочить
  5. uniq -c — group + count
  6. sort -rn — отсортировать по count
  7. head — top N

Любой шаг можно заменить или вставить новый. Это modular design на уровне OS.

Под капотом: что делает kernel

$ cmd1 | cmd2

Шаги:

  1. Shell делает pipe(fds) — kernel создаёт kernel-buffer (~64 KB), возвращает два FD: fds[0] (read end), fds[1] (write end).
  2. fork() для cmd1. В child:
    • dup2(fds[1], 1) — stdout cmd1 -> pipe-write
    • close(fds[0]); close(fds[1]) — закрыть оригинальные FD
    • execve("cmd1")
  3. fork() для cmd2. В child:
    • dup2(fds[0], 0) — stdin cmd2 ← pipe-read
    • close(fds[0]); close(fds[1])
    • execve("cmd2")
  4. Shell в parent закрывает оба fds[0], fds[1] и ждёт.

Cmd1 пишет в pipe, kernel буферизирует, cmd2 читает. Когда cmd1 завершится — pipe-write end закрывается, cmd2 получает EOF на следующем read.

Параллельность: cmd1 и cmd2 — независимые процессы. Они выполняются одновременно. Если cmd2 медленнее, kernel buffer заполнится и cmd1 блокируется на write() пока cmd2 не прочитает. Это back-pressure механизм — не нужно ничего настраивать.

Реальные DE-pipeline’ы

1. Полный pipeline: log -> metrics

$ tail -F /var/log/nginx/access.log \
    | awk '{print $9, $7}' \
    | grep -v ^200 \
    | awk '{counts[$2]++} END {for (u in counts) print counts[u], u}' \
    | sort -rn | head -20

Realtime-monitoring: топ-20 URL с не-200 status кодами.

2. Multi-source aggregation

$ zcat /var/log/*.log.*.gz \
    | grep ERROR \
    | awk -F'service=' '{print $2}' \
    | awk '{print $1}' \
    | sort | uniq -c | sort -rn

Разархивирует все .gz логи, ищет ERROR, group by service. Один pipeline вместо bash-loop с zgrep.

3. Conditional execution

$ grep -q ERROR /var/log/app.log && echo "Errors found!" | mail -s "Alert" [email protected]
# Если grep нашёл ERROR (exit 0) — отправить email

&& — exec следующую команду только если предыдущая успешна (exit 0).

4. Streaming данных через несколько transformations

$ ./extract.py | python3 transform.py | psql -c "COPY orders FROM stdin CSV"
# Python extract -> Python transform -> COPY в Postgres напрямую через stdin
# Без temp-файлов, всё streamable

Это classic ETL без intermediate storage — данные просто текут через программы.

Pipes изнутри — kernel-буфер и IPC

pipefail: правильные exit codes

По default, exit code pipeline = exit code последней команды:

$ false | true
$ echo $?
0    # Хотя false вернул 1!

Это часто скрывает ошибки. В bash есть set -o pipefail:

$ set -o pipefail
$ false | true
$ echo $?
1    # Теперь exit ненулевой если ЛЮБАЯ команда pipeline упала

Полный exit code = первый ненулевой из всех команд (или 0 если все 0).

В production-скриптах:

#!/bin/bash
set -euo pipefail
# -e: exit on error
# -u: error on unset variable
# -o pipefail: errors in pipeline propagate

curl -fsSL "$URL" | jq '.users[]' | psql -c "INSERT ..."
# Если curl упадёт (например, 500 error) — скрипт остановится
# Без pipefail: jq получит пустой input, выйдет 0, psql вставит ноль строк, скрипт «успешен» 

set -euo pipefail — must-have в start каждого production-bash-скрипта.

Большие данные через pipe

Pipe можно использовать для terabyte-scale data, потому что данные streamable:

# Скачать gzip-CSV, декодировать, фильтровать, агрегировать —
# всё без интермедиэйт файлов, без полной загрузки в память:
$ curl -sL https://example.com/huge.csv.gz \
    | gunzip \
    | awk -F',' 'NR > 1 && $5 == "USA" {sum += $3} END {print sum}'

Это buffer-size memory — десятки KB, не размер файла. Linux pipes — это рабочая лошадка ETL даже на TB-данных.

Pipe vs xargs

Pipe передаёт stdin одной команды на stdin следующей. Если нужно использовать вывод как аргументы, не stdin — нужен xargs:

# rm каждый файл из списка:
$ find . -name '*.tmp' | xargs rm
# find выводит имена; xargs строит rm file1 file2 ... и запускает

# Сравни с pipe:
$ find . -name '*.tmp' | rm
# Не работает! rm не читает имена со stdin, ждёт аргументы.

xargs — про это в уроке 05.

SIGPIPE и broken pipe

$ yes | head -3
y
y
y

yes бесконечно пишет ‘y\n’. head -3 берёт 3 строки и закрывается. Что происходит с yes?

Когда head закрывается, kernel закрывает read end pipe. Следующая попытка yes написать в pipe вызывает SIGPIPE — сигнал, который по default kill’ит процесс. yes завершается.

Это elegant механизм: верхние команды pipeline останавливаются, когда нижние больше не читают. Не нужно отдельно их kill’ить.

Программы могут перехватить SIGPIPE и обработать gracefully (Python через signal.signal(signal.SIGPIPE, ...)). Многие cli-инструменты делают это.

В bash в скриптах bekanntes сообщение «broken pipe» иногда выскакивает — это и есть SIGPIPE. Обычно безвредно если упало то, что выше head.

Команды, которые ломают pipeline

Некоторые команды не работают с pipe by default:

$ echo "hello" | grep o     # [x] работает
$ echo "hello" | vim         # [X] vim не интерактивен
$ echo "hello" | rm          # [X] rm не читает stdin
$ echo "hello" | cp dest     # [X] cp не читает stdin

Многие GUI/interactive tools не работают. И многие команды, которые ждут аргументы файлов (rm, cp, mv), нужно использовать с xargs:

$ echo "/tmp/file" | xargs rm
$ find . -name '*.tmp' | xargs rm

Попробуй сам

  1. Базовый pipe:
    ls /etc | wc -l
  2. Длинный pipeline:
    cat /etc/passwd | awk -F':' '{print $7}' | sort | uniq -c | sort -rn
    # Распределение shells
  3. pipefail demo:
    bash -c 'false | true; echo $?'
    bash -c 'set -o pipefail; false | true; echo $?'
    # Первая выведет 0, вторая — 1
  4. SIGPIPE:
    yes | head -3
    # head закрывается, yes получает SIGPIPE
  5. Кросс-platform ETL:
    echo "alice,30,USA" | awk -F',' '{print $1, $3}' | tr ' ' '|'
    # alice|USA

macOS-различия

  • Pipes — POSIX, работают идентично на всех Unix-системах.
  • Pipe buffer size на macOS — 64 KB (был 16KB на старых macOS, но pipe(2) says 16KB; в реальности 64KB после Sierra).
  • pipefail — POSIX опция, есть на bash и zsh.
  • SIGPIPE — POSIX, работает идентично.
Проверка знанийKnowledge check
Объясни: почему 'cat huge.log | grep ERROR | head' не загружает huge.log целиком в память, даже если файл 100 GB?
ОтветAnswer
Pipe — это streaming, не batch. Kernel создаёт kernel buffer ~64 KB между процессами. Cat читает huge.log chunk-by-chunk (обычно 4-64 KB blocks) и пишет в pipe. Grep читает из pipe, обрабатывает построчно, выводит совпадения в свой pipe-к-head. Head читает первые 10 строк и закрывается. Когда head закрывается, kernel закрывает read-end второго pipe. Следующая попытка grep написать в этот pipe вызывает SIGPIPE — grep умирает. Когда grep умирает, kernel закрывает read-end первого pipe. Cat при следующем write в первый pipe тоже получает SIGPIPE и умирает. Весь pipeline завершается за миллисекунды, прочитав от huge.log только tot объём, который понадобился grep чтобы найти 10 ERROR-строк. Память: каждый процесс использует свои несколько MB (cat — буферы чтения, grep — regex DFA, head — счётчик). Пропускная способность ограничена pipe buffer (~64KB) — kernel блокирует upstream когда downstream медленнее. Это в bash называется back-pressure, в облачных платформах — async streaming. Это та же модель что Spark Streaming или Kafka Streams — но 'голым' kernel-mechanism, без фреймворков. Поэтому Linux pipes пригодны для TB-data ETL.

Главное

  • | соединяет stdout первой команды со stdin второй через kernel buffer (~64 KB).
  • Команды pipeline работают параллельно, kernel buffer обеспечивает back-pressure.
  • UNIX-философия: каждая программа делает одно; программы соединяются через text streams.
  • set -o pipefail — must-have для production. Без него exit code = последней команды, ошибки скрываются.
  • Streaming через pipe — обрабатывает данные любого размера с фиксированной memory.
  • SIGPIPE автоматически завершает upstream-команды, когда downstream закрывается.
  • Для использования вывода как аргументов (не stdin) — xargs.
  • Не все команды работают с pipe (rm, cp, vim, mv по аргументам) — нужны xargs.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что физически происходит, когда ты запускаешь 'cmd1 | cmd2'?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5