Когда оператор keyBy(...) или rebalance() пересылает запись на другой parallel instance — это не Java-вызов и не разделяемая структура данных. Это сериализация в буфер, передача через Netty по TCP, чтение из буфера на другой стороне, десериализация. Этот путь называется shuffle, и он отвечает за половину тюнинга любого реального Flink-кластера.
В этом уроке мы пройдём по shuffle path в обе стороны: от вызова collector.collect(record) внутри оператора до того момента, когда этот record вылезает из processElement(...) на другом TaskManager. Если ты понимаешь все классы, через которые он проходит, ты понимаешь почему backpressure возникает именно там, а не «где-то в Flink».
Кто за что отвечает: NettyShuffleEnvironment
В каждом TaskManager при старте создаётся один NettyShuffleEnvironment (модуль flink-runtime, пакет org.apache.flink.runtime.io.network). Это центральная точка сетевого I/O для всего TM. Внутри живут:
NetworkBufferPool— пулMemorySegmentдля исходящих и входящих буферов. Размер пула задаётсяtaskmanager.memory.network.min/max(см. урок про memory model).ConnectionManager— реализация Netty (NettyConnectionManager). Один Netty server слушает наtaskmanager.data.port(по умолчанию0, то есть рандомный порт), и один client со своим event loop для исходящих соединений.ResultPartitionManager— реестр всехResultPartition(исходящих) на этом TM. Когда удалённый TM просит «дайте мне данные partition X subpartition Y», именно он находит нужную партицию.KvStateService— отдельный сервис для queryable state, нас сейчас не интересует.
Каждый Task при старте регистрируется в этом окружении: получает свои ResultPartition (по числу исходящих edges с этим оператором), создаёт InputGate для каждой входящей edge, и привязывает ввод-вывод к Netty.
В коде Flink shuffle разделён абстракцией ShuffleService (FLIP-31). Netty — это дефолтная реализация (NettyShuffleServiceFactory). Есть альтернативы для batch: Remote Shuffle Service (Celeborn-подобный), который выгружает данные на отдельный кластер. Для streaming практически всегда используется Netty.
ResultPartition: producer side
Когда задача (например, map-оператор) генерирует запись, она вызывает RecordWriter.emit(record). Этот writer:
- Сериализует запись через
TypeSerializer. Если это POJO — черезPojoSerializer, если Avro —AvroSerializer, если Kryo (fallback) — соответствующий. - Определяет target subpartition. Для
keyBy— черезKeyGroupStreamPartitioner, дляrebalance()— round-robin, дляforward()— всегда 0. - Пишет сериализованные байты в текущий
BufferBuilderсоответствующего subpartition.
ResultPartition (PipelinedResultPartition для streaming) — это контейнер из нескольких ResultSubpartition, по одной на каждый downstream consumer. Если у тебя map с parallelism 4, идёт в keyBy -> sink с parallelism 4 — то у каждого map’а будет 1 ResultPartition с 4 subpartitions.
Один map-таск (producer) пишет в один ResultPartition. Внутри — по одной ResultSubpartition на каждый downstream consumer. Каждый subpartition имеет свой буфер.
Когда буфер заполнен (или вышел output flush timeout — задаётся execution.buffer-timeout, по умолчанию 100ms), он становится «доступен» — попадает в очередь готовых буферов subpartition’а. Дальше его подхватит либо локальный consumer, либо Netty server, чтобы отправить на удалённый TM.
InputGate: consumer side
На consumer стороне каждая входящая edge — это один InputGate. У оператора с N входами (например, connect(...)) — N inputGates. Внутри inputGate’а — массив InputChannel’ов, по одному на каждый upstream producer.
Producer parallelism = 4, Consumer parallelism = 3:
- consumer index 0: InputGate с 4 InputChannel (по одному на каждый из 4 producers)
- consumer index 1: то же самое
- consumer index 2: то же самое
InputGate.pollNext() возвращает следующий буфер с данными (или событие — barrier, end-of-partition). Внутри он умеет:
- Опрашивать все каналы, отдавать первый, у кого есть данные.
- Обрабатывать checkpoint barriers (см. модуль 06).
- Возвращать
EndOfPartitionEvent, когда producer закончил работу.
Сериализованные байты, пришедшие в буфер, оператор десериализует обратно в record через RecordDeserializer. Это симметрично producer-side.
LocalInputChannel vs RemoteInputChannel
Главный split на consumer-side — какого типа InputChannel создаётся. Это решается на этапе deploy task’а (SingleInputGateFactory.create(...)):
- Если upstream producer запущен в том же TaskManager (другой slot, но та же JVM) — создаётся
LocalInputChannel. Он напрямую читаетResultSubpartitionчерезResultPartitionManager.createSubpartitionView(...). Никакого Netty, никакого TCP, никакой сериализации между процессами — буфер просто переходит из subpartition’а в inputChannel по in-memory reference. - Если upstream — на другом TM — создаётся
RemoteInputChannel. Тогда при инициализации канала Flink черезConnectionManager.createPartitionRequestClient(...)открывает TCP-соединение к удалённому TM (если ещё не открыто — соединения мультиплексируются) и шлётPartitionRequest(partitionId, subpartitionIndex, initialCredit). Producer-side Netty server подхватывает request, и начинается streaming буферов.
Если producer и consumer в одной JVM — LocalInputChannel без Netty. Если в разных JVM — RemoteInputChannel через Netty/TCP. InputGate скрывает разницу.
Это знание сразу даёт практический критерий: co-location. Если ты сделал slotSharingGroup("group-a") и обе стороны попали в один slot — между ними LocalInputChannel, в десятки раз быстрее, чем RemoteInputChannel. Если ты сильно загружен сериализацией — это первое, на что смотреть.
Полный shuffle path: end-to-end
Сложим всё вместе. Производитель шлёт запись, как она доходит до удалённого consumer.
9 шагов от collect() на producer-side до processElement() на consumer-side. Каждая стрелка — реальный метод в коде.
Заметь: между шагами 5 и 6 — настоящий TCP. Между 4 и 5 на consumer стороне разница между local и remote — local выкидывает шаги 5-7 целиком.
Mini-deep dive: BufferBuilder и BufferConsumer
Один концептуальный момент, который чаще всего путает: BufferBuilder и BufferConsumer — это разные взгляды на один и тот же MemorySegment.
BufferBuilder— producer-side handle. Через негоRecordWriterпишет байты, увеличиваяwriterIndex.BufferConsumer— consumer-side handle на тот же сегмент. Через него Netty (или LocalInputChannel) читает уже записанные байты, поreaderIndexкоторый догоняетwriterIndex.
То есть один и тот же буфер читается, пока ещё пишется. Это позволяет начать передавать данные downstream до того, как буфер заполнится — снижает latency. Когда producer вызовет finish() — buffer считается закрытым.
Размер буфера — taskmanager.memory.segment-size, по умолчанию 32 KiB. Это тот самый размер, который потом тюнится buffer debloating’ом (см. урок 4 этого модуля), и который ограничивает worst-case latency между записью и её доставкой.
Если в job graph у тебя operator chain (forward strategy между двумя операторами и они в одном slot) — Flink убирает shuffle вообще. Запись из chained-предка попадает прямо в processElement() следующего оператора через обычный Java-вызов. Это самый быстрый «shuffle» — его просто нет. См. JobGraphGenerator и StreamingJobGraphGenerator.chainOperatorsAndAddToChain(...).
Чтение source
Если хочешь увидеть код своими глазами:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironment.java— фабрика и lifecycle.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.javaиPipelinedResultPartition.java— producer side.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java,LocalInputChannel.java,RemoteInputChannel.java— consumer side.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java— точка входа на producer side.flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java— Netty handler, который мы будем подробно разбирать в уроке 2.
В flink-runtime/src/test/java/org/apache/flink/runtime/io/network/ лежат integration-тесты, которые поднимают локальный shuffle между двумя TM в JVM-mode — хорошая отправная точка для дебага.
Почему это важно
Каждый раз, когда ты видишь в Web UI красную полосу backpressure — это значит, что какой-то из элементов цепочки ResultPartition -> Netty -> RemoteInputChannel не успевает. Без понимания, что внутри shuffle path именно эти классы, ты будешь крутить параметры наугад. Со знанием — будешь смотреть на метрики каждого слоя и понимать, где boilneck.
Следующий урок — как именно producer узнаёт, можно ли ему отправлять данные. Спойлер: через credit-based flow control, который заменил голый TCP backpressure в Flink 1.5.
Чек-лист
NettyShuffleEnvironment— центральная сущность shuffle на TM. СодержитNetworkBufferPool,ConnectionManager,ResultPartitionManager.- На producer side:
ResultPartition= NResultSubpartition, по одной на каждый downstream consumer. Внутри — очередьBufferConsumer. - На consumer side:
InputGate= NInputChannel, по одному на каждый upstream producer. Тип канала —LocalInputChannel(та же JVM) илиRemoteInputChannel(другая JVM). - Сериализация через
TypeSerializer, выбор target — черезChannelSelector. Размер буфера —taskmanager.memory.segment-size(по умолчанию 32 KiB). LocalInputChannelпропускает Netty/TCP/сериализацию между процессами — критично для co-located операторов.- Operator chain (forward strategy в одном slot) убирает shuffle вообще.