Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 24 мин
Продвинутый
SourceReaderMailboxFetcher threadspollNextSplitFetcherManagerSourceReaderBase

SourceReader internals: mailbox, pollNext, fetcher threads

В предыдущем уроке мы посмотрели на FLIP-27 на уровне ролей: enumerator на JM, reader на TM. Теперь спустимся в reader. У него очень специфичная threading-модель, понимание которой объясняет, почему source-ы Flink не блокируются на медленных I/O, как ровно работают checkpoint barriers, и почему pollNext написан так странно (возвращает enum вместо данных).

Этот урок — про mailbox-модель, threading separation между task-thread и fetcher-threads, и про абстракцию SplitFetcherManager, которая делает основную работу за тебя в production-коннекторах.

Синхронизация потоков: mutex, monitor, event

Mailbox: один thread для одного оператора

В Flink каждый Task (instance оператора на subtask) выполняется на одном thread-е — task-thread. Это умышленное решение, унаследованное от Streaming Dataflow Model: операторы single-threaded, никаких internal threads, никаких synchronization concerns внутри user-кода.

Но Task должен делать много вещей:

  • Читать input от network buffer-ов или source-ов.
  • Обрабатывать records (вызывать user functions).
  • Обрабатывать checkpoint barriers (snapshot state).
  • Обрабатывать watermarks.
  • Обрабатывать timers (для KeyedProcessFunction).
  • Coordinator events (от OperatorCoordinator на JM).

Все эти задачи нужно делать по очереди на одном thread-е, не блокируя друг друга. Здесь и появляется mailbox.

Mailbox — это очередь “задач” (Runnable / Mail), которые task-thread выполняет последовательно. Архитектурно это очень похоже на actor model (привет, Akka).

Mailbox loop в task-thread
SourceprocessInputГлавный default-action — обработка records. Вызывает pollNext() у SourceReader-а, эмитит records в operator chain
MailboxMail QueueConcurrent очередь Mail-объектов. Каждый Mail — Runnable + metadata. Mailbox executor добавляет от других threads (network IO, coordinator events, fetcher threads)
Task ThreadMailboxProcessorОдин thread на Task. Loop: пока есть Mail — выполняет их. Когда mailbox пустой — запускает default action (processInput). Поток управления полностью single-threaded внутри Task
Mail 1checkpoint barrierКогда network channel получает CheckpointBarrier, он постит Mail в mailbox: 'do snapshot для checkpoint N'
Mail 2coordinator eventSourceCoordinator на JM прислал AddSplitEvent — он попадает в mailbox как Mail. Reader обработает на task-thread, без race conditions
Mail 3timer fireProcessingTimeService срабатывает по wall-clock — постит Mail с user-timer callback. Так timers не race-ят с processElement
Mail 4fetcher wakeupFetcher-thread имеет данные — постит Mail 'есть данные, попробуй pollNext снова'. Это hint для loop-а — не блокироваться в idle-wait

Mailbox executor — это MailboxExecutor, доступен через getRuntimeContext(). Любой код может post-нуть task в mailbox: mailboxExecutor.execute(() -> doWork(), "description").

Для SourceReader это критично:

public class MySourceReader implements SourceReader<MyRecord, MySplit> {

    private final MailboxExecutor mailboxExecutor;

    public MySourceReader(SourceReaderContext context) {
        this.mailboxExecutor = context.getMailboxExecutor();
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        // Reader сообщает task-thread-у:
        // "вызывай pollNext снова когда этот future complete"
        return elementsQueue.getAvailabilityFuture();
    }
}

isAvailable() — это ключевой API. Task-thread вызывает pollNext(), если возвращается NOTHING_AVAILABLE, он смотрит на isAvailable() future. Если он incomplete, task-thread “паркуется” — переходит к другим mailbox-задачам. Когда future complete (fetcher закинул данные), task-thread знает: можно опять вызывать pollNext.

Эта non-blocking-модель — основа всего перформанса Flink. Task-thread никогда не спит на IO.


Fetcher threads: где реально происходит I/O

SourceReader не делает blocking I/O на task-thread-е. Если ты сам напишешь kafkaConsumer.poll(Duration.ofSeconds(1)) в pollNext — task-thread заблокируется на секунду, checkpoint barriers не обработаются, watermarks застрянут.

Решение — fetcher threads. Это отдельные background threads, которые делают I/O (poll Kafka, read file blocks, query DB), складывают результаты в очередь, и notify task-thread-а через mailbox.

SplitFetcherManager — стандартная абстракция, которая управляет fetcher-threads:

Fetcher threads и handoff в task-thread
Assigned SplitsSplits, назначенные этому reader-у через addSplits. Reader разделяет их по fetcher-threads (один или несколько на reader, в зависимости от стратегии)
SplitFetcherManagerSplitFetcherManager — координатор fetcher-threads. Решает: один fetcher на reader (simple) или один fetcher на split (parallel). По умолчанию — один fetcher на reader для Kafka, один на split для files
Fetcher Thread 1Fetcher-thread — отдельный thread в pool-е SplitFetcherManager-а. Делает blocking I/O: consumer.poll(), inputStream.read(). Когда есть данные — push в elementsQueue
Fetcher Thread 2
Fetcher Thread N
elementsQueueFutureCompletingBlockingQueue — специальная очередь Flink. Combines blocking queue с CompletableFuture для signal-а: 'есть данные'. Task-thread получает Mail когда future complete
Task ThreadTask-thread на TaskManager. Вызывает pollNext() — поллит elementsQueue (non-blocking). Если пусто — isAvailable() returns future, task-thread занимается другим
Emit to pipeline

FutureCompletingBlockingQueue — clever структура. Это BlockingQueue<RecordsWithSplitIds> + CompletableFuture<Void> (“future of availability”). Producer (fetcher) пушит запись и complete-ит future. Consumer (task-thread) забирает запись и ressets future. Если очередь пустая — future incomplete — task-thread знает: ждать.

Размер queue — конфигурируемый. Слишком маленький — fetcher часто блокируется (records.size > queue.capacity). Слишком большой — память тратится на буферизацию. Дефолт — 2 элемента в очереди (один в обработке, один pre-fetched).


SourceReaderBase: standard implementation

В Flink есть SourceReaderBase<E, T, SplitT, SplitStateT> — абстрактный класс, который реализует mailbox/fetcher boilerplate. Большинство коннекторов наследуются от него.

Тебе нужно реализовать:

  • SplitReader<E, SplitT> — per-split fetcher. Метод fetch() блокирующий, делает I/O, возвращает RecordsWithSplitIds<E>. Это вызывается из fetcher-thread.
  • RecordEmitter<E, T, SplitStateT> — converter. Из raw record E (от SplitReader) делает T (тип pipeline). Обновляет state split-а (offset, position). Это вызывается из task-thread в pollNext.
  • Configuration: какой SplitFetcherManager использовать (single-threaded, multi-threaded), какой queue size.
public class KafkaSplitReader implements SplitReader<ConsumerRecord<?, ?>, KafkaSourceSplit> {

    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Map<TopicPartition, KafkaSourceSplit> currentSplits = new HashMap<>();

    @Override
    public RecordsWithSplitIds<ConsumerRecord<?, ?>> fetch() throws IOException {
        ConsumerRecords<byte[], byte[]> records;
        try {
            records = consumer.poll(Duration.ofMillis(pollTimeoutMs));
        } catch (WakeupException ex) {
            // Wakeup используется для прерывания poll-а при shutdown
            return new EmptyRecordsWithSplitIds();
        }

        return new KafkaPartitionSplitRecords(records, currentSplits);
    }

    @Override
    public void handleSplitsChanges(SplitsChange<KafkaSourceSplit> splitsChanges) {
        if (splitsChanges instanceof SplitsAddition) {
            // Subscribe consumer к новым partitions
            List<TopicPartition> partitions = splitsChanges.splits().stream()
                .map(KafkaSourceSplit::getTopicPartition)
                .collect(Collectors.toList());
            consumer.assign(partitions);

            // Seek to startingOffset for each new split
            for (KafkaSourceSplit split : splitsChanges.splits()) {
                consumer.seek(split.getTopicPartition(), split.getStartingOffset());
                currentSplits.put(split.getTopicPartition(), split);
            }
        }
    }

    @Override
    public void wakeUp() {
        // Прерывает blocked poll() - вызывается task-thread-ом для unblock fetcher
        consumer.wakeup();
    }

    @Override
    public void close() {
        consumer.close();
    }
}
public class KafkaRecordEmitter implements RecordEmitter<
    ConsumerRecord<?, ?>, RowData, KafkaSourceSplitState> {

    @Override
    public void emitRecord(
        ConsumerRecord<?, ?> record,
        SourceOutput<RowData> output,
        KafkaSourceSplitState splitState) {

        // Десериализация
        RowData row = deserializer.deserialize(record);

        // Эмит в pipeline с timestamp
        output.collect(row, record.timestamp());

        // Обновляем state split-а (для checkpoint)
        splitState.setCurrentOffset(record.offset() + 1);
    }
}

Заметь разделение:

  • SplitReader.fetch() — блокирующий I/O, на fetcher-thread.
  • RecordEmitter.emitRecord() — non-blocking convert + emit, на task-thread.

SourceReaderBase склеивает их через очередь. Ты не пишешь mailbox-код, threading-код, queue management.


InputStatus и backpressure

pollNext возвращает InputStatus:

StatusСемантикаПоведение task-loop
MORE_AVAILABLEЕсть ещё records прямо сейчасСнова вызывает pollNext
NOTHING_AVAILABLEСейчас нет данныхЖдёт isAvailable() future
END_OF_INPUTSource закончился (bounded mode)Закрывает operator chain

MORE_AVAILABLE — это hint, что pipeline может продолжать без yield-а. NOTHING_AVAILABLE — yield, дай другим mailbox-задачам отработать.

Backpressure (когда downstream медленнее source-а) обрабатывается естественно: downstream operator не может принять records, output network buffer заполнен, операторская chain блокируется. Это передаётся обратно в source через output.collect() — collect просто блокируется (поскольку всё на одном thread-е).

В fetcher-side: если elementsQueue заполнена (потому что task-thread не успевает discharge-ить), fetcher блокируется на put. Это backpressure до уровня external system (Kafka consumer перестанет poll-ить, Kafka broker увидит, что consumer не fetch-ит — но это нормально, consumer lag будет расти).

TIP

В Flink 1.18+ появилась явная pause()/resume() API для backpressure-аware source-ов. Это позволяет downstream operator-у signal-ить source-у: “приостанови fetcher-ы”. Используется в Kafka source через pauseOrResumeSplits() — он на consumer.pause() для конкретных partitions. Это даёт более fine-grained control чем естественный backpressure.


Checkpoint в source: snapshotState

Когда CheckpointBarrier приходит на input source-а, Flink:

  1. Делает CheckpointBarrier -> Mail в mailbox source-task.
  2. Mail обрабатывается: вызывает source.snapshotState(checkpointId).
  3. SourceReader возвращает List<MySplit> — текущее состояние всех его splits (offsets, positions).
  4. Splits сериализуются и пишутся в checkpoint storage (S3, HDFS, etc.).
  5. Барьер передаётся downstream.

Важно: snapshotState вызывается на task-thread, в один момент с другой работой задачи. Это значит, что SplitFetcherManager должен быть thread-safe в части snapshot-а — task-thread читает текущие offsets fetcher-а, fetcher может одновременно их update-ить. Решение — fetcher не апдейтит state напрямую, он только пушит в elementsQueue. State обновляется только в RecordEmitter.emitRecord — а это на task-thread. То есть snapshot-time state — это всегда offsets уже emit-нутых records.

Records, которые в elementsQueue но ещё не emit-нуты — после restart re-fetch-нутся (мы знаем offset последнего emitted record, fetcher seek-нется туда). Это нормально, и это даёт нам exactly-once-once-completed semantics.


Производительность: что реально измеряется

Метрики SourceReader (SourceReaderMetricGroup):

  • numRecordsIn / numRecordsInPerSecond — throughput.
  • currentFetchEventTimeLag — между event-time данных и system-time fetch-а (включая Kafka lag).
  • currentEmitEventTimeLag — между event-time данных и system-time emit (после backpressure).
  • sourceIdleTime — сколько времени reader idle (нет splits / нет данных).

Анализ:

  • numRecordsInPerSecond низкий, но sourceIdleTime высокий -> нет данных в source, всё нормально, просто потока мало.
  • numRecordsInPerSecond низкий, sourceIdleTime низкий -> reader не idle, но не emit-ит -> backpressure от downstream.
  • currentFetchEventTimeLag высокий -> Kafka consumer lag, не успевает fetch-ить.
  • currentEmitEventTimeLag >> currentFetchEventTimeLag -> fetch ok, но emit медленный -> backpressure.

Эти метрики разделяют source-side проблемы от pipeline-side. Если высокий fetch lag — копай Kafka (broker, network, consumer settings). Если высокий emit lag при низком fetch lag — копай downstream operators.

WARNING

“sourceIdleTime” в Flink-метриках это время суммарно (counter), не percentage. Чтобы получить idle ratio, нужно делить на elapsed time. Используй PromQL: rate(flink_taskmanager_job_task_operator_sourceIdleTime[1m]) / 1000 — это idle seconds per second, эквивалент idle ratio (0.0 = always busy, 1.0 = always idle).


Custom source: cheatsheet

Если ты пишешь source с нуля:

  1. Реши: использовать SourceReaderBase или нет. Если internal система имеет blocking API — почти всегда use base. Если у тебя async client (Netty, gRPC, async DB driver) — можешь свой reader без fetcher-threads (просто заверни callbacks в mailbox-tasks).

  2. Реализуй Split: minimal serializable description. Включи всё, что нужно для restart-а (offset, position).

  3. Реализуй SplitReader: open in handleSplitsChanges, read in fetch, close in close. fetch-метод должен respond-ить на wakeUp() для interrupt-а.

  4. Реализуй RecordEmitter: deserialize + emit + обновить state. Не делай тяжёлой работы тут — emitter должен быть быстрый.

  5. Тестируй с MiniCluster: запусти reader в StreamExecutionEnvironment.getExecutionEnvironment() локально, послушай метрики, посмотри как ведёт себя с искусственной backpressure-нагрузкой.

  6. Тестируй failover: задампи job в savepoint, рестартни на больший parallelism, убедись что splits правильно redistribute-ются.


Проверка знанийKnowledge check
В custom source reader-е, в pollNext() ты делаешь dbClient.query(splitId, lastOffset) (блокирующий call, может занять 100-300мс). Job работает, throughput низкий, checkpoint duration p99 = 12 секунд при interval 30 секунд, alignment time = 11 секунд. Почему checkpoint такой долгий, и как починить?
ОтветAnswer
Проблема: pollNext делает blocking I/O на task-thread. Это нарушает mailbox model. Когда CheckpointBarrier приходит в input источника, она ставится Mail в mailbox. Но task-thread сейчас залип в pollNext (300мс на dbClient.query). Mail ждёт. Когда query вернулась, task-thread обработал record (вызвал downstream), снова вызвал pollNext, снова query 300мс. Mailbox с barrier-ом обрабатывается только когда task-thread между poll-ами успевает выскочить. Если ты делаешь много records подряд — mailbox-задачи starve. Alignment time высокий не из-за барьеров от других inputs, а потому что source-task сам не успевает propagate барьер дальше (он ждёт окна между query). Решение: вынеси blocking query в fetcher-thread. Используй SourceReaderBase + SplitReader, где fetch() вызывается из background thread. dbClient.query идёт в fetch(), результаты в elementsQueue. pollNext только poll-ит queue (микросекунды), отдаёт control обратно task-loop-у. Mailbox processes barriers сразу после каждого records emit, alignment time падает до ~миллисекунд. Bonus: throughput тоже вырастет — потому что pipeline downstream может работать пока fetcher fetch-ит следующий batch.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В Flink mailbox-модели почему SourceReader.pollNext() возвращает InputStatus вместо собственно records?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4