SourceReader internals: mailbox, pollNext, fetcher threads
В предыдущем уроке мы посмотрели на FLIP-27 на уровне ролей: enumerator на JM, reader на TM. Теперь спустимся в reader. У него очень специфичная threading-модель, понимание которой объясняет, почему source-ы Flink не блокируются на медленных I/O, как ровно работают checkpoint barriers, и почему pollNext написан так странно (возвращает enum вместо данных).
Этот урок — про mailbox-модель, threading separation между task-thread и fetcher-threads, и про абстракцию SplitFetcherManager, которая делает основную работу за тебя в production-коннекторах.
Mailbox: один thread для одного оператора
В Flink каждый Task (instance оператора на subtask) выполняется на одном thread-е — task-thread. Это умышленное решение, унаследованное от Streaming Dataflow Model: операторы single-threaded, никаких internal threads, никаких synchronization concerns внутри user-кода.
Но Task должен делать много вещей:
- Читать input от network buffer-ов или source-ов.
- Обрабатывать records (вызывать user functions).
- Обрабатывать checkpoint barriers (snapshot state).
- Обрабатывать watermarks.
- Обрабатывать timers (для KeyedProcessFunction).
- Coordinator events (от OperatorCoordinator на JM).
Все эти задачи нужно делать по очереди на одном thread-е, не блокируя друг друга. Здесь и появляется mailbox.
Mailbox — это очередь “задач” (Runnable / Mail), которые task-thread выполняет последовательно. Архитектурно это очень похоже на actor model (привет, Akka).
Mailbox executor — это MailboxExecutor, доступен через getRuntimeContext(). Любой код может post-нуть task в mailbox: mailboxExecutor.execute(() -> doWork(), "description").
Для SourceReader это критично:
public class MySourceReader implements SourceReader<MyRecord, MySplit> {
private final MailboxExecutor mailboxExecutor;
public MySourceReader(SourceReaderContext context) {
this.mailboxExecutor = context.getMailboxExecutor();
}
@Override
public CompletableFuture<Void> isAvailable() {
// Reader сообщает task-thread-у:
// "вызывай pollNext снова когда этот future complete"
return elementsQueue.getAvailabilityFuture();
}
}
isAvailable() — это ключевой API. Task-thread вызывает pollNext(), если возвращается NOTHING_AVAILABLE, он смотрит на isAvailable() future. Если он incomplete, task-thread “паркуется” — переходит к другим mailbox-задачам. Когда future complete (fetcher закинул данные), task-thread знает: можно опять вызывать pollNext.
Эта non-blocking-модель — основа всего перформанса Flink. Task-thread никогда не спит на IO.
Fetcher threads: где реально происходит I/O
SourceReader не делает blocking I/O на task-thread-е. Если ты сам напишешь kafkaConsumer.poll(Duration.ofSeconds(1)) в pollNext — task-thread заблокируется на секунду, checkpoint barriers не обработаются, watermarks застрянут.
Решение — fetcher threads. Это отдельные background threads, которые делают I/O (poll Kafka, read file blocks, query DB), складывают результаты в очередь, и notify task-thread-а через mailbox.
SplitFetcherManager — стандартная абстракция, которая управляет fetcher-threads:
FutureCompletingBlockingQueue — clever структура. Это BlockingQueue<RecordsWithSplitIds> + CompletableFuture<Void> (“future of availability”). Producer (fetcher) пушит запись и complete-ит future. Consumer (task-thread) забирает запись и ressets future. Если очередь пустая — future incomplete — task-thread знает: ждать.
Размер queue — конфигурируемый. Слишком маленький — fetcher часто блокируется (records.size > queue.capacity). Слишком большой — память тратится на буферизацию. Дефолт — 2 элемента в очереди (один в обработке, один pre-fetched).
SourceReaderBase: standard implementation
В Flink есть SourceReaderBase<E, T, SplitT, SplitStateT> — абстрактный класс, который реализует mailbox/fetcher boilerplate. Большинство коннекторов наследуются от него.
Тебе нужно реализовать:
SplitReader<E, SplitT>— per-split fetcher. Методfetch()блокирующий, делает I/O, возвращаетRecordsWithSplitIds<E>. Это вызывается из fetcher-thread.RecordEmitter<E, T, SplitStateT>— converter. Из raw record E (от SplitReader) делает T (тип pipeline). Обновляет state split-а (offset, position). Это вызывается из task-thread в pollNext.- Configuration: какой
SplitFetcherManagerиспользовать (single-threaded, multi-threaded), какой queue size.
public class KafkaSplitReader implements SplitReader<ConsumerRecord<?, ?>, KafkaSourceSplit> {
private final KafkaConsumer<byte[], byte[]> consumer;
private final Map<TopicPartition, KafkaSourceSplit> currentSplits = new HashMap<>();
@Override
public RecordsWithSplitIds<ConsumerRecord<?, ?>> fetch() throws IOException {
ConsumerRecords<byte[], byte[]> records;
try {
records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
} catch (WakeupException ex) {
// Wakeup используется для прерывания poll-а при shutdown
return new EmptyRecordsWithSplitIds();
}
return new KafkaPartitionSplitRecords(records, currentSplits);
}
@Override
public void handleSplitsChanges(SplitsChange<KafkaSourceSplit> splitsChanges) {
if (splitsChanges instanceof SplitsAddition) {
// Subscribe consumer к новым partitions
List<TopicPartition> partitions = splitsChanges.splits().stream()
.map(KafkaSourceSplit::getTopicPartition)
.collect(Collectors.toList());
consumer.assign(partitions);
// Seek to startingOffset for each new split
for (KafkaSourceSplit split : splitsChanges.splits()) {
consumer.seek(split.getTopicPartition(), split.getStartingOffset());
currentSplits.put(split.getTopicPartition(), split);
}
}
}
@Override
public void wakeUp() {
// Прерывает blocked poll() - вызывается task-thread-ом для unblock fetcher
consumer.wakeup();
}
@Override
public void close() {
consumer.close();
}
}
public class KafkaRecordEmitter implements RecordEmitter<
ConsumerRecord<?, ?>, RowData, KafkaSourceSplitState> {
@Override
public void emitRecord(
ConsumerRecord<?, ?> record,
SourceOutput<RowData> output,
KafkaSourceSplitState splitState) {
// Десериализация
RowData row = deserializer.deserialize(record);
// Эмит в pipeline с timestamp
output.collect(row, record.timestamp());
// Обновляем state split-а (для checkpoint)
splitState.setCurrentOffset(record.offset() + 1);
}
}
Заметь разделение:
SplitReader.fetch()— блокирующий I/O, на fetcher-thread.RecordEmitter.emitRecord()— non-blocking convert + emit, на task-thread.
SourceReaderBase склеивает их через очередь. Ты не пишешь mailbox-код, threading-код, queue management.
InputStatus и backpressure
pollNext возвращает InputStatus:
| Status | Семантика | Поведение task-loop |
|---|---|---|
MORE_AVAILABLE | Есть ещё records прямо сейчас | Снова вызывает pollNext |
NOTHING_AVAILABLE | Сейчас нет данных | Ждёт isAvailable() future |
END_OF_INPUT | Source закончился (bounded mode) | Закрывает operator chain |
MORE_AVAILABLE — это hint, что pipeline может продолжать без yield-а. NOTHING_AVAILABLE — yield, дай другим mailbox-задачам отработать.
Backpressure (когда downstream медленнее source-а) обрабатывается естественно: downstream operator не может принять records, output network buffer заполнен, операторская chain блокируется. Это передаётся обратно в source через output.collect() — collect просто блокируется (поскольку всё на одном thread-е).
В fetcher-side: если elementsQueue заполнена (потому что task-thread не успевает discharge-ить), fetcher блокируется на put. Это backpressure до уровня external system (Kafka consumer перестанет poll-ить, Kafka broker увидит, что consumer не fetch-ит — но это нормально, consumer lag будет расти).
В Flink 1.18+ появилась явная pause()/resume() API для backpressure-аware source-ов. Это позволяет downstream operator-у signal-ить source-у: “приостанови fetcher-ы”. Используется в Kafka source через pauseOrResumeSplits() — он на consumer.pause() для конкретных partitions. Это даёт более fine-grained control чем естественный backpressure.
Checkpoint в source: snapshotState
Когда CheckpointBarrier приходит на input source-а, Flink:
- Делает
CheckpointBarrier->Mailв mailbox source-task. - Mail обрабатывается: вызывает
source.snapshotState(checkpointId). - SourceReader возвращает
List<MySplit>— текущее состояние всех его splits (offsets, positions). - Splits сериализуются и пишутся в checkpoint storage (S3, HDFS, etc.).
- Барьер передаётся downstream.
Важно: snapshotState вызывается на task-thread, в один момент с другой работой задачи. Это значит, что SplitFetcherManager должен быть thread-safe в части snapshot-а — task-thread читает текущие offsets fetcher-а, fetcher может одновременно их update-ить. Решение — fetcher не апдейтит state напрямую, он только пушит в elementsQueue. State обновляется только в RecordEmitter.emitRecord — а это на task-thread. То есть snapshot-time state — это всегда offsets уже emit-нутых records.
Records, которые в elementsQueue но ещё не emit-нуты — после restart re-fetch-нутся (мы знаем offset последнего emitted record, fetcher seek-нется туда). Это нормально, и это даёт нам exactly-once-once-completed semantics.
Производительность: что реально измеряется
Метрики SourceReader (SourceReaderMetricGroup):
- numRecordsIn / numRecordsInPerSecond — throughput.
- currentFetchEventTimeLag — между event-time данных и system-time fetch-а (включая Kafka lag).
- currentEmitEventTimeLag — между event-time данных и system-time emit (после backpressure).
- sourceIdleTime — сколько времени reader idle (нет splits / нет данных).
Анализ:
numRecordsInPerSecondнизкий, ноsourceIdleTimeвысокий -> нет данных в source, всё нормально, просто потока мало.numRecordsInPerSecondнизкий,sourceIdleTimeнизкий -> reader не idle, но не emit-ит -> backpressure от downstream.currentFetchEventTimeLagвысокий -> Kafka consumer lag, не успевает fetch-ить.currentEmitEventTimeLag>>currentFetchEventTimeLag-> fetch ok, но emit медленный -> backpressure.
Эти метрики разделяют source-side проблемы от pipeline-side. Если высокий fetch lag — копай Kafka (broker, network, consumer settings). Если высокий emit lag при низком fetch lag — копай downstream operators.
“sourceIdleTime” в Flink-метриках это время суммарно (counter), не percentage. Чтобы получить idle ratio, нужно делить на elapsed time. Используй PromQL: rate(flink_taskmanager_job_task_operator_sourceIdleTime[1m]) / 1000 — это idle seconds per second, эквивалент idle ratio (0.0 = always busy, 1.0 = always idle).
Custom source: cheatsheet
Если ты пишешь source с нуля:
-
Реши: использовать SourceReaderBase или нет. Если internal система имеет blocking API — почти всегда use base. Если у тебя async client (Netty, gRPC, async DB driver) — можешь свой reader без fetcher-threads (просто заверни callbacks в mailbox-tasks).
-
Реализуй Split: minimal serializable description. Включи всё, что нужно для restart-а (offset, position).
-
Реализуй SplitReader: open in
handleSplitsChanges, read infetch, close inclose. fetch-метод должен respond-ить наwakeUp()для interrupt-а. -
Реализуй RecordEmitter: deserialize + emit + обновить state. Не делай тяжёлой работы тут — emitter должен быть быстрый.
-
Тестируй с MiniCluster: запусти reader в
StreamExecutionEnvironment.getExecutionEnvironment()локально, послушай метрики, посмотри как ведёт себя с искусственной backpressure-нагрузкой. -
Тестируй failover: задампи job в savepoint, рестартни на больший parallelism, убедись что splits правильно redistribute-ются.