Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 28 мин
Продвинутый
Credit-basedFlow controlBackpressureFLINK-7282NettyCheckpoint barriers

В прошлом уроке мы оставили один вопрос открытым: на шаге 5 shuffle path NettyServerHandler «ждёт credits от downstream». Что это значит? Кто кому что выдаёт? Откуда credits берутся? Этот урок — про то, как именно Flink контролирует поток данных между producer и consumer.

Pipes в Linux: anonymous pipes и backpressure через buffer

Как было: TCP-only backpressure (до 1.5)

До версии 1.5 (2018, FLINK-7282) Flink полагался на встроенный TCP flow control. Идея простая: если consumer не успевает читать из сокета, TCP window закрывается, и producer block’ится на write(). Это работает, но даёт три серьёзные проблемы:

Проблема 1: один TCP-канал — один backpressure-домен. Между двумя TM Flink держит один TCP-соединение, через которое мультиплексируются все subpartition-пары. Если один медленный consumer на одной из subpartition заблокировал канал — все остальные subpartition тоже замирают. Это «head-of-line blocking».

Проблема 2: гранулярность. TCP видит байты, а не логические записи. Backpressure включается на уровне сокета, и producer узнаёт о нём только когда socket.write() вернёт WouldBlock. К этому моменту в socket buffer уже накопилось десятки KiB.

Проблема 3 — критическая для Flink: checkpoint barriers застревают. Когда producer хочет вставить checkpoint barrier в поток, он шлёт его как обычную запись. Если канал заблокирован TCP backpressure’ом — barrier не идёт через сокет, и downstream не начинает свой checkpoint. Это раздувает время checkpoint’а и в худшем случае приводит к таймаутам.

Команда Flink перепроектировала весь network stack, и в 1.5 появился credit-based flow control. С тех пор это дефолт, и legacy режим даже выкинули в 1.16.

Идея credit-based: бухгалтерия на уровне subpartition

Credit — это разрешение отправить один буфер. Consumer выдаёт producer’у конкретное число credits (например, «я готов принять 3 буфера от subpartition X»), и producer не может отправить больше. Когда consumer освобождает буфер (десериализовал, отдал в оператор) — он шлёт CreditNotification обратно: «+1 credit на subpartition X».

Главное: бухгалтерия ведётся per subpartition, а не на TCP-канал. Если один downstream забит, его credits = 0, и producer перестаёт ему слать. Но соседняя subpartition в том же TCP-канале спокойно продолжает работать. Никакого head-of-line blocking.

Credit-based: бухгалтерия по subpartition

Consumer хранит счётчик credits на каждый RemoteInputChannel. При каждом освобождении буфера — +1 credit. Producer хранит зеркало этого счётчика и шлёт буферы, только если есть credits.

Producer TM AResultPartition view per consumer: { creditAvailable: int }
sub 0 -> cons 0credits=4 (можно слать 4 buf)
sub 1 -> cons 1credits=0 (нельзя!)cons 1 перегружен. Producer ждёт CreditNotification и при этом продолжает свободно слать в sub 0.
sub 2 -> cons 2credits=2
один TCP-канал между TM A и TM Bмультиплексирует все 3 subpartition pair, бэкпрешер per-channel — независимый
Consumer TM BRemoteInputChannel per producer: { creditsAvailable, exclusiveBuffers, floatingBuffers }

Производитель не блокируется на write() — он просто не зовёт write(), если credits нет. Никакого socket buffer переполнения, никакого замораживания TCP, никакого head-of-line blocking. И главное — в обход bottleneck’а можно беспрепятственно прокидывать управляющие сообщения, в первую очередь checkpoint barriers.

Из чего складываются credits: exclusive + floating

В Flink каждый RemoteInputChannel владеет своим набором credits, разбитым на две группы:

  • Exclusive buffers — буферы, гарантированно зарезервированные за этим конкретным каналом. Задаются taskmanager.network.memory.buffers-per-channel, по умолчанию 2. Эти буферы всегда доступны для credits — даже когда поступает нагрузка с других каналов.
  • Floating buffers — общий пул, который канал может позаимствовать при наличии нагрузки. Задаются taskmanager.network.memory.floating-buffers-per-gate, по умолчанию 8 на весь InputGate. Если канал A слышит «у меня очередь, нужно ещё буферов» — он просит у пула.

Сумма exclusive + floating определяет, сколько credits консьюмер может раздать. Каждый раз когда консьюмер освобождает буфер (то есть оператор считал данные и буфер вернулся в пул) — credit возвращается каналу, и канал шлёт AddCreditMessage производителю.

В уроке 3 этого модуля мы разберём пулы подробно. Пока запомни: число credits, которое consumer может выдать на subpartition — это сумма доступных ему exclusive + floating буферов.

Протокол: какие фреймы летают между TM

Чтобы увидеть, как credit-based работает на проводе, разберём упрощённый sequence одного запроса partition’а от инициализации до steady state. Это происходит при старте каждого RemoteInputChannel и при каждом изменении credits.

Consumer (RemoteInputChannel)
Netty client
Netty server
Producer (PipelinedSubpartitionView)
requestSubpartition(partitionId, subIdx, initialCredit=2)PartitionRequest framecreateSubpartitionView()notifyDataAvailable() + credit=2BufferResponse frame x2onBuffer() x2оператор processElement(), буфер освобождёнnotifyCreditAvailable(+1)AddCredit frameaddCredit(+1), availableCredit=1BufferResponse (если данные есть)

Из диаграммы видна основная динамика: producer всегда знает, можно ли слать. Если credits = 0, он просто не пишет в Netty. Никакого блокирования на сокете, никаких таймаутов, никакого случайного backpressure.

Преимущества для checkpoint barriers

Самое важное практическое следствие credit-based: barriers идут вне ограничения credits. В коде это BarrierAlignmentMessage и EventAnnouncement — у них отдельный handler в Netty pipeline.

Когда checkpoint coordinator отправляет barrier sources, source шлёт его в shuffle path. В legacy TCP-режиме barrier застревал в socket buffer вместе с обычными записями. В credit-based barrier едет отдельным фреймом, который не съедает credit и доходит downstream немедленно. Это в разы ускоряет alignment time (см. модуль 06 про checkpoints) и почти убирает таймауты на нагруженных пайплайнах.

TIP

Если ты видишь в Web UI огромный checkpoint alignment duration на каком-то operator’е — credit-based уже даёт всё, что может. Дальше нужно смотреть в сторону unaligned checkpoints (FLINK-14551, в 1.11+) или buffer debloating (FLINK-23451, в 1.14+, см. урок 4 этого модуля).

Конфигурация и наблюдение

Главные параметры credit-based в flink-conf.yaml:

taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.floating-buffers-per-gate: 8
taskmanager.network.memory.max-buffers-per-channel: 10
taskmanager.network.batch-shuffle.compression.enabled: false

max-buffers-per-channel — ограничение на то, сколько floating buffers один канал может позаимствовать. Защита от ситуации, когда один жадный канал съел весь пул.

Метрики, которые стоит мониторить (доступны в Prometheus exporter):

  • flink_taskmanager_job_task_buffers_inputQueueLength — сколько буферов сидит в очереди на consumer-side.
  • flink_taskmanager_job_task_buffers_outputQueueLength — то же на producer-side.
  • flink_taskmanager_job_task_isBackPressured — boolean: задача сейчас в backpressure.
  • flink_taskmanager_job_task_busyTimeMsPerSecond / idleTimeMsPerSecond — соотношение даёт busy ratio (см. урок 4 про backpressure detection).

В Web UI: вкладка Job -> BackPressure показывает агрегированный статус (OK, LOW, HIGH). За кулисами это статистическая выборка thread stack traces (старый sampling-based подход), но в современных версиях основной критерий — isBackPressured метрика, считающаяся прямо в RecordWriter’е.

Mini-deep: где это в коде

  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java — клиентская сторона: получает BufferResponse, шлёт AddCredit.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java — серверная сторона: следит за availableCredit на каждом view, выбирает следующий буфер для отправки.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkSequenceViewReader.java — обёртка над PipelinedSubpartitionView, которая хранит credit accounting.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java — методы notifyCreditAvailable(), onBuffer(), getNextBuffer().

Чтение source

Если хочешь увидеть весь жизненный цикл credits — пройди отладчиком тесты CreditBasedPartitionRequestClientHandlerTest и PartitionRequestQueueTest в flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/. Они поднимают локальный Netty pipeline в JVM и эмулируют consumer/producer диалог — самый быстрый способ увидеть протокол на живом коде.

Чек-лист

  • До 1.5 backpressure был TCP-only: head-of-line blocking, барьеры застревали. FLINK-7282 это исправил.
  • Credit = разрешение отправить один буфер. Бухгалтерия ведётся per subpartition.
  • Credits = exclusive (per-channel, 2 по умолчанию) + floating (per-gate, 8 по умолчанию). Сумма ограничена max-buffers-per-channel.
  • Producer хранит зеркало credit counter; шлёт буферы, только если credits > 0. Иначе ждёт AddCreditMessage.
  • Checkpoint barriers едут отдельным каналом, не съедая credits — это главный мотив миграции.
  • В коде: CreditBasedPartitionRequestClientHandler (client), PartitionRequestQueue (server), RemoteInputChannel (consumer).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. Какая главная проблема TCP-only backpressure (до Flink 1.5), которую решил credit-based flow control?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4