В прошлом уроке мы оставили один вопрос открытым: на шаге 5 shuffle path NettyServerHandler «ждёт credits от downstream». Что это значит? Кто кому что выдаёт? Откуда credits берутся? Этот урок — про то, как именно Flink контролирует поток данных между producer и consumer.
Pipes в Linux: anonymous pipes и backpressure через bufferКак было: TCP-only backpressure (до 1.5)
До версии 1.5 (2018, FLINK-7282) Flink полагался на встроенный TCP flow control. Идея простая: если consumer не успевает читать из сокета, TCP window закрывается, и producer block’ится на write(). Это работает, но даёт три серьёзные проблемы:
Проблема 1: один TCP-канал — один backpressure-домен. Между двумя TM Flink держит один TCP-соединение, через которое мультиплексируются все subpartition-пары. Если один медленный consumer на одной из subpartition заблокировал канал — все остальные subpartition тоже замирают. Это «head-of-line blocking».
Проблема 2: гранулярность. TCP видит байты, а не логические записи. Backpressure включается на уровне сокета, и producer узнаёт о нём только когда socket.write() вернёт WouldBlock. К этому моменту в socket buffer уже накопилось десятки KiB.
Проблема 3 — критическая для Flink: checkpoint barriers застревают. Когда producer хочет вставить checkpoint barrier в поток, он шлёт его как обычную запись. Если канал заблокирован TCP backpressure’ом — barrier не идёт через сокет, и downstream не начинает свой checkpoint. Это раздувает время checkpoint’а и в худшем случае приводит к таймаутам.
Команда Flink перепроектировала весь network stack, и в 1.5 появился credit-based flow control. С тех пор это дефолт, и legacy режим даже выкинули в 1.16.
Идея credit-based: бухгалтерия на уровне subpartition
Credit — это разрешение отправить один буфер. Consumer выдаёт producer’у конкретное число credits (например, «я готов принять 3 буфера от subpartition X»), и producer не может отправить больше. Когда consumer освобождает буфер (десериализовал, отдал в оператор) — он шлёт CreditNotification обратно: «+1 credit на subpartition X».
Главное: бухгалтерия ведётся per subpartition, а не на TCP-канал. Если один downstream забит, его credits = 0, и producer перестаёт ему слать. Но соседняя subpartition в том же TCP-канале спокойно продолжает работать. Никакого head-of-line blocking.
Consumer хранит счётчик credits на каждый RemoteInputChannel. При каждом освобождении буфера — +1 credit. Producer хранит зеркало этого счётчика и шлёт буферы, только если есть credits.
Производитель не блокируется на write() — он просто не зовёт write(), если credits нет. Никакого socket buffer переполнения, никакого замораживания TCP, никакого head-of-line blocking. И главное — в обход bottleneck’а можно беспрепятственно прокидывать управляющие сообщения, в первую очередь checkpoint barriers.
Из чего складываются credits: exclusive + floating
В Flink каждый RemoteInputChannel владеет своим набором credits, разбитым на две группы:
- Exclusive buffers — буферы, гарантированно зарезервированные за этим конкретным каналом. Задаются
taskmanager.network.memory.buffers-per-channel, по умолчанию 2. Эти буферы всегда доступны для credits — даже когда поступает нагрузка с других каналов. - Floating buffers — общий пул, который канал может позаимствовать при наличии нагрузки. Задаются
taskmanager.network.memory.floating-buffers-per-gate, по умолчанию 8 на весь InputGate. Если канал A слышит «у меня очередь, нужно ещё буферов» — он просит у пула.
Сумма exclusive + floating определяет, сколько credits консьюмер может раздать. Каждый раз когда консьюмер освобождает буфер (то есть оператор считал данные и буфер вернулся в пул) — credit возвращается каналу, и канал шлёт AddCreditMessage производителю.
В уроке 3 этого модуля мы разберём пулы подробно. Пока запомни: число credits, которое consumer может выдать на subpartition — это сумма доступных ему exclusive + floating буферов.
Протокол: какие фреймы летают между TM
Чтобы увидеть, как credit-based работает на проводе, разберём упрощённый sequence одного запроса partition’а от инициализации до steady state. Это происходит при старте каждого RemoteInputChannel и при каждом изменении credits.
Из диаграммы видна основная динамика: producer всегда знает, можно ли слать. Если credits = 0, он просто не пишет в Netty. Никакого блокирования на сокете, никаких таймаутов, никакого случайного backpressure.
Преимущества для checkpoint barriers
Самое важное практическое следствие credit-based: barriers идут вне ограничения credits. В коде это BarrierAlignmentMessage и EventAnnouncement — у них отдельный handler в Netty pipeline.
Когда checkpoint coordinator отправляет barrier sources, source шлёт его в shuffle path. В legacy TCP-режиме barrier застревал в socket buffer вместе с обычными записями. В credit-based barrier едет отдельным фреймом, который не съедает credit и доходит downstream немедленно. Это в разы ускоряет alignment time (см. модуль 06 про checkpoints) и почти убирает таймауты на нагруженных пайплайнах.
Если ты видишь в Web UI огромный checkpoint alignment duration на каком-то operator’е — credit-based уже даёт всё, что может. Дальше нужно смотреть в сторону unaligned checkpoints (FLINK-14551, в 1.11+) или buffer debloating (FLINK-23451, в 1.14+, см. урок 4 этого модуля).
Конфигурация и наблюдение
Главные параметры credit-based в flink-conf.yaml:
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8
taskmanager.network.memory.max-buffers-per-channel: 10
taskmanager.network.batch-shuffle.compression.enabled: false
max-buffers-per-channel — ограничение на то, сколько floating buffers один канал может позаимствовать. Защита от ситуации, когда один жадный канал съел весь пул.
Метрики, которые стоит мониторить (доступны в Prometheus exporter):
flink_taskmanager_job_task_buffers_inputQueueLength— сколько буферов сидит в очереди на consumer-side.flink_taskmanager_job_task_buffers_outputQueueLength— то же на producer-side.flink_taskmanager_job_task_isBackPressured— boolean: задача сейчас в backpressure.flink_taskmanager_job_task_busyTimeMsPerSecond/idleTimeMsPerSecond— соотношение даёт busy ratio (см. урок 4 про backpressure detection).
В Web UI: вкладка Job -> BackPressure показывает агрегированный статус (OK, LOW, HIGH). За кулисами это статистическая выборка thread stack traces (старый sampling-based подход), но в современных версиях основной критерий — isBackPressured метрика, считающаяся прямо в RecordWriter’е.
Mini-deep: где это в коде
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java— клиентская сторона: получает BufferResponse, шлёт AddCredit.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java— серверная сторона: следит за availableCredit на каждом view, выбирает следующий буфер для отправки.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkSequenceViewReader.java— обёртка надPipelinedSubpartitionView, которая хранит credit accounting.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java— методыnotifyCreditAvailable(),onBuffer(),getNextBuffer().
Чтение source
Если хочешь увидеть весь жизненный цикл credits — пройди отладчиком тесты CreditBasedPartitionRequestClientHandlerTest и PartitionRequestQueueTest в flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/. Они поднимают локальный Netty pipeline в JVM и эмулируют consumer/producer диалог — самый быстрый способ увидеть протокол на живом коде.
Чек-лист
- До 1.5 backpressure был TCP-only: head-of-line blocking, барьеры застревали. FLINK-7282 это исправил.
- Credit = разрешение отправить один буфер. Бухгалтерия ведётся per subpartition.
- Credits = exclusive (per-channel, 2 по умолчанию) + floating (per-gate, 8 по умолчанию). Сумма ограничена
max-buffers-per-channel. - Producer хранит зеркало credit counter; шлёт буферы, только если credits > 0. Иначе ждёт
AddCreditMessage. - Checkpoint barriers едут отдельным каналом, не съедая credits — это главный мотив миграции.
- В коде:
CreditBasedPartitionRequestClientHandler(client),PartitionRequestQueue(server),RemoteInputChannel(consumer).