Learning Platform
Глоссарий Troubleshooting
Урок 04.01 · 28 мин
Продвинутый
Network stackShuffleNettyInputGateResultPartitionTaskManager

Когда оператор keyBy(...) или rebalance() пересылает запись на другой parallel instance — это не Java-вызов и не разделяемая структура данных. Это сериализация в буфер, передача через Netty по TCP, чтение из буфера на другой стороне, десериализация. Этот путь называется shuffle, и он отвечает за половину тюнинга любого реального Flink-кластера.

Shuffle в Spark: дорогая операция распределённых вычислений

В этом уроке мы пройдём по shuffle path в обе стороны: от вызова collector.collect(record) внутри оператора до того момента, когда этот record вылезает из processElement(...) на другом TaskManager. Если ты понимаешь все классы, через которые он проходит, ты понимаешь почему backpressure возникает именно там, а не «где-то в Flink».

Кто за что отвечает: NettyShuffleEnvironment

В каждом TaskManager при старте создаётся один NettyShuffleEnvironment (модуль flink-runtime, пакет org.apache.flink.runtime.io.network). Это центральная точка сетевого I/O для всего TM. Внутри живут:

  • NetworkBufferPool — пул MemorySegment для исходящих и входящих буферов. Размер пула задаётся taskmanager.memory.network.min/max (см. урок про memory model).
  • ConnectionManager — реализация Netty (NettyConnectionManager). Один Netty server слушает на taskmanager.data.port (по умолчанию 0, то есть рандомный порт), и один client со своим event loop для исходящих соединений.
  • ResultPartitionManager — реестр всех ResultPartition (исходящих) на этом TM. Когда удалённый TM просит «дайте мне данные partition X subpartition Y», именно он находит нужную партицию.
  • KvStateService — отдельный сервис для queryable state, нас сейчас не интересует.

Каждый Task при старте регистрируется в этом окружении: получает свои ResultPartition (по числу исходящих edges с этим оператором), создаёт InputGate для каждой входящей edge, и привязывает ввод-вывод к Netty.

NOTE

В коде Flink shuffle разделён абстракцией ShuffleService (FLIP-31). Netty — это дефолтная реализация (NettyShuffleServiceFactory). Есть альтернативы для batch: Remote Shuffle Service (Celeborn-подобный), который выгружает данные на отдельный кластер. Для streaming практически всегда используется Netty.

ResultPartition: producer side

Когда задача (например, map-оператор) генерирует запись, она вызывает RecordWriter.emit(record). Этот writer:

  1. Сериализует запись через TypeSerializer. Если это POJO — через PojoSerializer, если Avro — AvroSerializer, если Kryo (fallback) — соответствующий.
  2. Определяет target subpartition. Для keyBy — через KeyGroupStreamPartitioner, для rebalance() — round-robin, для forward() — всегда 0.
  3. Пишет сериализованные байты в текущий BufferBuilder соответствующего subpartition.

ResultPartition (PipelinedResultPartition для streaming) — это контейнер из нескольких ResultSubpartition, по одной на каждый downstream consumer. Если у тебя map с parallelism 4, идёт в keyBy -> sink с parallelism 4 — то у каждого map’а будет 1 ResultPartition с 4 subpartitions.

ResultPartition: 1 producer, N subpartitions

Один map-таск (producer) пишет в один ResultPartition. Внутри — по одной ResultSubpartition на каждый downstream consumer. Каждый subpartition имеет свой буфер.

Producer Task (map, parallelism=4, index=0)RecordWriter -> ChannelSelector -> SubpartitionBufferBuilder
ResultSubpartition 0-> consumer 0Получит данные ключей, попавших в KeyGroup, который маппится на consumer 0
ResultSubpartition 1-> consumer 1Получит данные ключей для consumer 1
ResultSubpartition 2-> consumer 2
ResultSubpartition 3-> consumer 3
каждая subpartitionхранит очередь BufferConsumer'овBufferConsumer оборачивает MemorySegment. Producer заполняет, Netty server вытаскивает и отсылает downstream.

Когда буфер заполнен (или вышел output flush timeout — задаётся execution.buffer-timeout, по умолчанию 100ms), он становится «доступен» — попадает в очередь готовых буферов subpartition’а. Дальше его подхватит либо локальный consumer, либо Netty server, чтобы отправить на удалённый TM.

InputGate: consumer side

На consumer стороне каждая входящая edge — это один InputGate. У оператора с N входами (например, connect(...)) — N inputGates. Внутри inputGate’а — массив InputChannel’ов, по одному на каждый upstream producer.

Producer parallelism = 4, Consumer parallelism = 3:
- consumer index 0:  InputGate с 4 InputChannel (по одному на каждый из 4 producers)
- consumer index 1:  то же самое
- consumer index 2:  то же самое

InputGate.pollNext() возвращает следующий буфер с данными (или событие — barrier, end-of-partition). Внутри он умеет:

  • Опрашивать все каналы, отдавать первый, у кого есть данные.
  • Обрабатывать checkpoint barriers (см. модуль 06).
  • Возвращать EndOfPartitionEvent, когда producer закончил работу.

Сериализованные байты, пришедшие в буфер, оператор десериализует обратно в record через RecordDeserializer. Это симметрично producer-side.

LocalInputChannel vs RemoteInputChannel

Главный split на consumer-side — какого типа InputChannel создаётся. Это решается на этапе deploy task’а (SingleInputGateFactory.create(...)):

  • Если upstream producer запущен в том же TaskManager (другой slot, но та же JVM) — создаётся LocalInputChannel. Он напрямую читает ResultSubpartition через ResultPartitionManager.createSubpartitionView(...). Никакого Netty, никакого TCP, никакой сериализации между процессами — буфер просто переходит из subpartition’а в inputChannel по in-memory reference.
  • Если upstream — на другом TM — создаётся RemoteInputChannel. Тогда при инициализации канала Flink через ConnectionManager.createPartitionRequestClient(...) открывает TCP-соединение к удалённому TM (если ещё не открыто — соединения мультиплексируются) и шлёт PartitionRequest(partitionId, subpartitionIndex, initialCredit). Producer-side Netty server подхватывает request, и начинается streaming буферов.
Два типа InputChannel в одном InputGate

Если producer и consumer в одной JVM — LocalInputChannel без Netty. Если в разных JVM — RemoteInputChannel через Netty/TCP. InputGate скрывает разницу.

TaskManager B :: Consumer Task (sink, index=0)InputGate
LocalInputChannelResultPartitionManager.createSubpartitionView()Прямая ссылка на ResultSubpartition в той же JVM. Буфер передаётся по reference, нулевое копирование.
upstream: TM Bmap index=2 (та же JVM, другой slot)
RemoteInputChannelNetty client -> TCP -> Netty serverСоединение мультиплексируется: один TCP-канал на пару TM, внутри много логических PartitionRequest
upstream: TM Amap index=0, 1, 3 (другая JVM)

Это знание сразу даёт практический критерий: co-location. Если ты сделал slotSharingGroup("group-a") и обе стороны попали в один slot — между ними LocalInputChannel, в десятки раз быстрее, чем RemoteInputChannel. Если ты сильно загружен сериализацией — это первое, на что смотреть.

Полный shuffle path: end-to-end

Сложим всё вместе. Производитель шлёт запись, как она доходит до удалённого consumer.

Shuffle path: producer TM -> consumer TM

9 шагов от collect() на producer-side до processElement() на consumer-side. Каждая стрелка — реальный метод в коде.

1. UserCode -> collector.collect(record)StreamOperator вызывает RecordWriter.emit(record)
2. RecordWriter -> ChannelSelectorвыбирает target subpartition (KeyGroupStreamPartitioner для keyBy)
3. RecordWriter -> TypeSerializerPOJO / Avro / Kryo serialize в MemorySegment (BufferBuilder)
4. ResultSubpartition.add(BufferConsumer)буфер заполнен, добавлен в очередь готовых
5. NettyServerHandler выбирает буфер из очередиждёт credits от downstream (см. урок 02), потом write() в channel
6. TCP <-> Netty client на consumer TMByteBuf приходит в EventLoop client'а
7. CreditBasedPartitionRequestClientHandlerдекодирует frame, кладёт буфер в RemoteInputChannel
8. InputGate.pollNext() -> RemoteInputChannel.getNextBuffer()оператор тянет буфер, десериализует
9. RecordDeserializer -> StreamOperator.processElement(record)record снова в user code, теперь на consumer TM

Заметь: между шагами 5 и 6 — настоящий TCP. Между 4 и 5 на consumer стороне разница между local и remote — local выкидывает шаги 5-7 целиком.

Mini-deep dive: BufferBuilder и BufferConsumer

Один концептуальный момент, который чаще всего путает: BufferBuilder и BufferConsumer — это разные взгляды на один и тот же MemorySegment.

  • BufferBuilder — producer-side handle. Через него RecordWriter пишет байты, увеличивая writerIndex.
  • BufferConsumer — consumer-side handle на тот же сегмент. Через него Netty (или LocalInputChannel) читает уже записанные байты, по readerIndex который догоняет writerIndex.

То есть один и тот же буфер читается, пока ещё пишется. Это позволяет начать передавать данные downstream до того, как буфер заполнится — снижает latency. Когда producer вызовет finish() — buffer считается закрытым.

Размер буфера — taskmanager.memory.segment-size, по умолчанию 32 KiB. Это тот самый размер, который потом тюнится buffer debloating’ом (см. урок 4 этого модуля), и который ограничивает worst-case latency между записью и её доставкой.

TIP

Если в job graph у тебя operator chain (forward strategy между двумя операторами и они в одном slot) — Flink убирает shuffle вообще. Запись из chained-предка попадает прямо в processElement() следующего оператора через обычный Java-вызов. Это самый быстрый «shuffle» — его просто нет. См. JobGraphGenerator и StreamingJobGraphGenerator.chainOperatorsAndAddToChain(...).

Чтение source

Если хочешь увидеть код своими глазами:

  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java — фабрика и lifecycle.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java и PipelinedResultPartition.java — producer side.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java, LocalInputChannel.java, RemoteInputChannel.java — consumer side.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java — точка входа на producer side.
  • flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java — Netty handler, который мы будем подробно разбирать в уроке 2.

В flink-runtime/src/test/java/org/apache/flink/runtime/io/network/ лежат integration-тесты, которые поднимают локальный shuffle между двумя TM в JVM-mode — хорошая отправная точка для дебага.

Почему это важно

Каждый раз, когда ты видишь в Web UI красную полосу backpressure — это значит, что какой-то из элементов цепочки ResultPartition -> Netty -> RemoteInputChannel не успевает. Без понимания, что внутри shuffle path именно эти классы, ты будешь крутить параметры наугад. Со знанием — будешь смотреть на метрики каждого слоя и понимать, где boilneck.

Следующий урок — как именно producer узнаёт, можно ли ему отправлять данные. Спойлер: через credit-based flow control, который заменил голый TCP backpressure в Flink 1.5.

Проверка знанийKnowledge check
У тебя map с parallelism=8 шлёт данные в keyBy -> sink с parallelism=8. Все 16 тасков размещены в 2 TaskManagers (по 8 slot'ов). По 4 map и по 4 sink на каждом TM. Сколько LocalInputChannel и RemoteInputChannel будет суммарно на одном sink-операторе?
ОтветAnswer
Для каждого sink-таска создаётся InputGate, в нём по одному InputChannel на каждый upstream map-task. Map'ов 8 штук, значит 8 каналов в каждом sink. Из них 4 — на map'ы того же TM (LocalInputChannel) и 4 — на map'ы другого TM (RemoteInputChannel). Итого: 4 Local + 4 Remote = 8 каналов на каждый sink-таск. Через эти 4 Remote каналы Flink мультиплексирует трафик в одно TCP-соединение между двумя TM (по умолчанию один TCP на пару TM, см. taskmanager.network.netty.num-arenas).

Чек-лист

  • NettyShuffleEnvironment — центральная сущность shuffle на TM. Содержит NetworkBufferPool, ConnectionManager, ResultPartitionManager.
  • На producer side: ResultPartition = N ResultSubpartition, по одной на каждый downstream consumer. Внутри — очередь BufferConsumer.
  • На consumer side: InputGate = N InputChannel, по одному на каждый upstream producer. Тип канала — LocalInputChannel (та же JVM) или RemoteInputChannel (другая JVM).
  • Сериализация через TypeSerializer, выбор target — через ChannelSelector. Размер буфера — taskmanager.memory.segment-size (по умолчанию 32 KiB).
  • LocalInputChannel пропускает Netty/TCP/сериализацию между процессами — критично для co-located операторов.
  • Operator chain (forward strategy в одном slot) убирает shuffle вообще.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Producer-таск map с parallelism=4 шлёт данные в keyBy -> sink с parallelism=8. Сколько ResultSubpartition будет внутри ResultPartition одного producer-таска?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4