Learning Platform
Глоссарий Troubleshooting
Урок 14.01 · 25 мин
Продвинутый
FLIP-27Source V2SplitEnumeratorSourceReaderSplit assignmentBounded/Unbounded

FLIP-27 Source V2: SplitEnumerator плюс SourceReader

Старый SourceFunction (теперь deprecated) был одним из самых проблемных API в Flink. Это просто run(SourceContext) метод, который ты должен был как-то распараллелить, синхронизировать с checkpoint barriers, обработать watermarks, разделить нагрузку между subtasks. Каждый коннектор изобретал свой подход. Kafka, Kinesis, File source — все по-разному. FLIP-27 (введён в Flink 1.11, основной API с Flink 1.14, в 2.2 — единственный поддерживаемый) разделил source на две роли с чёткой ответственностью.

Этот урок — про то, как устроен Source V2 на уровне процессов: что физически живёт в JobManager, что в TaskManager, как они общаются, как один и тот же API даёт и bounded (batch), и unbounded (streaming) семантику.

KafkaSource и watermarks через Source V2 API Consumer Groups: параллельное чтение из Kafka

Две роли: discovery и consumption

Главная идея FLIP-27 — разделить открытие источников данных (discovery) и чтение данных (consumption).

В старом API эти две задачи смешивались внутри одного SourceFunction. Например, FlinkKafkaConsumer сам открывал topic, обнаруживал партиции, назначал их subtasks через сложную логику в open(), и сам же читал данные в run(). В результате — если ты хотел сделать свой коннектор, тебе нужно было повторить всю эту машинерию.

FLIP-27 разделил это:

SplitEnumerator — централизованная компонента, живёт в одной инстанции на JobManager (внутри JobMaster). Её задача — обнаружить splits (логические единицы работы: партиция Kafka, файл, range в БД), и назначать их readers. Она знает обо всём source-е целиком.

SourceReader — параллельные инстанции, по одной на subtask. Живут на TaskManagers. Получают splits, читают данные, эмитят records в pipeline. Они не знают друг о друге — каждый видит только свои splits.

Source V2: компоненты и где они живут
JobManagerJobManager — процесс координации. Содержит SplitEnumerator всех source-ов job-а. В Flink 2.x SplitEnumerator живёт внутри OperatorCoordinator, который сам сидит в JobMaster
RPC
SplitEnumeratorSplitEnumerator — централизованная роль. Один на source. Обнаруживает splits (Kafka partitions, files, DB ranges), назначает их readers. Знает глобальное состояние source-а
TaskManager 1TaskManager 1 — процесс выполнения. Содержит subtask source-оператора, который запускает SourceReader. В JVM может быть несколько SourceReader-ов разных source-ов
addSplits
SourceReader[0]SourceReader subtask 0 — параллельный читатель. Получает splits через SourceCoordinatorContext, открывает их (Kafka consumer per split, file handle per split), читает в pollNext()
TaskManager 2
addSplits
SourceReader[1]SourceReader subtask 1 — независимая инстанция. Не знает о других readers. SplitEnumerator решает, кому что назначить через requestSplit/handleSplitRequest
TaskManager 3
addSplits
SourceReader[2]SourceReader subtask 2 — может получить 0, 1 или несколько splits. Если splits меньше parallelism — некоторые readers будут idle. Enumerator должен это правильно отрабатывать через isIdle markers

Каждый из этих компонентов — пользовательский Java-класс. SplitEnumerator реализует SplitEnumerator<SplitT, CheckpointT>, SourceReader реализует SourceReader<T, SplitT>. Source — фабрика, которая их создаёт: Source.createEnumerator() и Source.createReader().


Что такое Split

Split — это сериализуемое описание единицы работы. Не сами данные, а указатель на них.

Для каждого коннектора split-ы свои:

  • KafkaSourceSplit: TopicPartition + startingOffset + stoppingOffset (или -1 для unbounded). Сериализованный — буквально несколько строк.
  • FileSourceSplit: путь к файлу + start offset + length. Один файл может быть один split (если не разбиваем) или несколько (для splittable форматов типа Parquet).
  • JdbcSourceSplit: SQL-запрос + range (WHERE id BETWEEN 1000 AND 2000).
  • HybridSourceSplit: композитный — wrap другого split-а с metadata.

Сплиты должны быть сериализуемы (SimpleVersionedSerializer<SplitT>), потому что они отправляются по сети от enumerator-а к reader-ам, и сохраняются в checkpoint-ах. В checkpoint попадают активные splits каждого reader-а (которые он сейчас читает) + pending splits в enumerator-е (которые он ещё не назначил).

// Пример: KafkaSourceSplit
public class KafkaSourceSplit implements SourceSplit {
    private final TopicPartition partition;
    private final long startingOffset;
    private final long stoppingOffset; // -1 для unbounded

    @Override
    public String splitId() {
        return partition.topic() + "-" + partition.partition();
    }
}

// Сериализатор — обычно простая бинарная форма
public class KafkaSplitSerializer
    implements SimpleVersionedSerializer<KafkaSourceSplit> {

    @Override
    public byte[] serialize(KafkaSourceSplit split) {
        // topic | partition | startOffset | stopOffset
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeUTF(split.getTopicPartition().topic());
        out.writeInt(split.getTopicPartition().partition());
        out.writeLong(split.getStartingOffset());
        out.writeLong(split.getStoppingOffset());
        return out.getCopyOfBuffer();
    }
}

splitId уникален в пределах source-а — он используется для tracking-а в state-ах enumerator-а и reader-а.


Жизненный цикл SplitEnumerator

SplitEnumerator — это долгоживущий объект на JobManager, у которого есть состояние и периодические задачи.

Главные методы:

public class KafkaSourceEnumerator
    implements SplitEnumerator<KafkaSourceSplit, KafkaSourceEnumState> {

    // Вызывается при старте source-а на JM
    @Override
    public void start() {
        // Запускает периодический discovery новых партиций
        context.callAsync(
            this::discoverNewPartitions,
            this::processDiscoveredPartitions,
            0L, // initial delay
            partitionDiscoveryIntervalMs); // periodic
    }

    // Reader просит split — обычно когда у него нет работы
    @Override
    public void handleSplitRequest(int subtaskId, String hostname) {
        if (pendingSplits.isEmpty()) {
            // Сейчас работы нет, пометим reader как ожидающий
            readersAwaitingSplit.put(subtaskId, hostname);
            return;
        }
        KafkaSourceSplit split = pendingSplits.poll();
        context.assignSplit(split, subtaskId);
    }

    // Subtask добавлен (например, после rescale)
    @Override
    public void addReader(int subtaskId) {
        // Можно сразу назначить какие-то splits, или ждать handleSplitRequest
    }

    // Subtask вернул splits (например, при failover)
    @Override
    public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
        pendingSplits.addAll(splits);
        // Перераспределим оставшимся readers
        assignPendingSplits();
    }

    // Snapshot для checkpoint
    @Override
    public KafkaSourceEnumState snapshotState(long checkpointId) {
        return new KafkaSourceEnumState(
            assignedPartitions, pendingSplits, currentOffsets);
    }
}

Discovery работает асинхронно. context.callAsync(supplier, callback) — это паттерн, в котором тяжёлая работа (например, REST-вызов к Kafka admin client для list-а партиций) выполняется в отдельном thread pool-е, а callback вызывается на main mailbox thread-е enumerator-а. Это критично для thread-safety: всё состояние enumerator-а touch-ается только из одного thread-а.

processDiscoveredPartitions — это callback, который обнаружил delta (новые партиции появились / старые удалились), создаёт новые splits и raspределяет их. Здесь же разруливается dynamic partition discovery в Kafka — когда оператор добавляет партиции в топик в runtime, Flink их подхватит без рестарта job.


Жизненный цикл SourceReader

SourceReader — это mailbox-based компонент. Он не запускает свой thread loop; вместо этого pollNext(ReaderOutput) вызывается task-thread-ом TaskManager-а через mailbox executor.

public class KafkaSourceReader implements SourceReader<RowData, KafkaSourceSplit> {

    private final Map<TopicPartition, KafkaConsumer<?, ?>> consumers; // per split
    private final Queue<KafkaSourceSplit> assignedSplits;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds> elementsQueue;
    private final SplitFetcherManager fetcherManager; // background fetcher threads

    @Override
    public void start() {
        // Запрашиваем splits у enumerator-а
        context.sendSplitRequest();
    }

    @Override
    public InputStatus pollNext(ReaderOutput<RowData> output) throws Exception {
        RecordsWithSplitIds records = elementsQueue.poll();
        if (records == null) {
            return InputStatus.NOTHING_AVAILABLE;
        }

        // Эмитим records в pipeline
        SourceOutput<RowData> splitOutput = output.createOutputForSplit(records.splitId());
        while (records.hasNext()) {
            splitOutput.collect(records.next(), records.timestamp());
        }

        return InputStatus.MORE_AVAILABLE;
    }

    @Override
    public void addSplits(List<KafkaSourceSplit> splits) {
        // Передаём в fetcher manager, он откроет consumer-ы
        fetcherManager.addSplits(splits);
    }

    @Override
    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
        // Сериализуем текущие позиции для всех splits
        return fetcherManager.snapshotSplits();
    }
}

Ключевая вещь — pollNext() не блокируется. Если нет данных, она возвращает NOTHING_AVAILABLE, и task-thread может заняться другими делами (mailbox messages, checkpoint barriers, timers). Чтение из Kafka происходит в fetcher threads — отдельных background threads, которые делают poll() к Kafka consumer, складывают RecordsWithSplitIds в FutureCompletingBlockingQueue. pollNext-метод просто забирает оттуда.

Это очень важная архитектурная решение: task thread никогда не блокируется на IO. Это позволяет checkpoint barriers и watermarks обрабатываться вовремя даже когда source медленный.


Unified bounded и unbounded

Старый API имел отдельные SourceFunction (для streaming) и InputFormat (для batch). FLIP-27 объединил это через Boundedness.

public class KafkaSource implements Source<RowData, KafkaSourceSplit, KafkaSourceEnumState> {

    private final Boundedness boundedness;

    @Override
    public Boundedness getBoundedness() {
        return boundedness; // BOUNDED или CONTINUOUS_UNBOUNDED
    }
}

В bounded mode:

  • SplitEnumerator знает финальный набор splits заранее (например, list файлов в директории, или partitions с stopping offset).
  • Когда enumerator раздал все splits и все readers закончили — source emit-ит END_OF_INPUT.
  • Pipeline переключается в batch-режим: операторы могут использовать batch-оптимизации (sort-merge join, no incremental processing).

В unbounded mode:

  • SplitEnumerator периодически discovery-ит новые splits (новые партиции, новые файлы).
  • Readers никогда не получают END_OF_INPUT.
  • Pipeline в streaming-режиме: continuous, с checkpoint-ами, с watermarks.

Один и тот же source может работать в обоих режимах — пользователь выбирает при создании:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("orders")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setBounded(OffsetsInitializer.latest()) // делает source bounded
    // .setUnbounded(OffsetsInitializer.committedOffsets()) // делает unbounded
    .build();

Это разблокирует один из больших wins Flink 2.x — unified batch and streaming pipelines. Один и тот же job можно запустить и в batch (replay исторических данных), и в streaming-режиме без переписывания source-а.

NOTE

Hybrid source (FLIP-150) — это композиция двух или более sources, где они подхватывают друг друга. Например, сначала читаем 30 дней истории из S3 (bounded), потом без gap-а переключаемся на Kafka (unbounded). HybridSource оборачивает их и передаёт offset-handshake — где остановился первый, оттуда начинает второй. Это работает только в FLIP-27 API.


Координация enumerator плюс reader через OperatorCoordinator

Под капотом коммуникация между SplitEnumerator (JM) и SourceReader (TM) проходит через OperatorCoordinator — generic-механизм Flink для JM-side компоненты любого оператора.

SourceCoordinator (наследник OperatorCoordinator) hosts enumerator. Он принимает события от reader-ов и шлёт им команды:

СобытиеНаправлениеОписание
RequestSplitEventReader -> CoordinatorReader просит split (когда нет работы)
SourceEventWrapperReader bidirectional CoordinatorПроизвольное custom событие (для коннекторов)
ReaderRegistrationEventReader -> CoordinatorReader зарегистрировался (start или после failover)
AddSplitEventCoordinator -> ReaderНазначить splits
NoMoreSplitsEventCoordinator -> ReaderБольше splits не будет (bounded mode конец)

Все эти события сериализуются и передаются через OperatorEvent-механизм, который flow-controlled и ack-ается. JM знает, какой reader получил какие splits — и это знание используется при checkpoint-е и failover-е.

При failover какого-то reader-а (например, его TM упал), его splits возвращаются в enumerator через addSplitsBack, и enumerator перераспределяет их. Это даёт at-least-once seek-back semantics — splits, которые были partially-read, начинаются с last-checkpointed offset.

WARNING

Если ты пишешь custom source и хочешь сделать его performant, имей в виду: handleSplitRequest вызывается каждый раз когда reader idle. Если у тебя 256 readers, каждый дёргает enumerator при пустой очереди — это spam. Enumerator должен быстро обрабатывать requests или batch-ить распределение. Лучше: enumerator сам proактивно push-ит splits через assignSplit, не дожидаясь requests.


Checkpoint coordination

Когда SplitEnumerator делает snapshot, он сохраняет:

  • Pending splits — splits, которые ещё не назначены readers.
  • Assigned splits per subtask — для tracking-а, чтобы знать, у кого что.
  • Discovery state — например, в Kafka это набор уже-обнаруженных партиций (чтобы не назначить второй раз).

SourceReader сохраняет:

  • In-progress splits — splits, которые он сейчас читает, с current offset / position в каждом.
  • Finished splits — для bounded mode, чтобы знать что не запрашивать снова.

При restore:

  • Enumerator восстанавливает свое состояние из checkpoint-а.
  • Readers получают свои in-progress splits через addSplits.
  • Discovery продолжается с того места, где остановилось.

Эта симметрия — основа exactly-once в source layer. Combined с two-phase commit на sink-стороне, ты получаешь end-to-end exactly-once семантику. Подробнее про sink — следующий урок про FLIP-143.


Production-перспектива: разработка нового коннектора

Если ты пишешь коннектор к новому хранилищу (например, к ClickHouse, к Cassandra read, к Redis Streams), Source V2 даёт тебе чёткий контракт:

  1. Реши, что у тебя split — единица работы.
  2. Напиши enumerator: discovery + assignment.
  3. Напиши reader: для каждого split — как открыть, как читать, как закрыть.
  4. Напиши сериализаторы для splits и enum-state.
  5. Тестируй с MiniCluster — Flink-абстракция запуска mini-кластера в-process.

Для большинства коннекторов есть SourceReaderBase — абстрактный базовый класс, который делает за тебя всю mailbox-работу. Ты пишешь только SplitReader (per-split fetcher) и RecordEmitter (как convert raw record в pipeline record). Это reduces boilerplate на 60-70%.

Production-checklist:

  • Idle markers: если reader не имеет работы (нет splits) — он должен пометить себя idle через output.markIdle(), иначе watermark stuck (см. урок watermark-coordination-source).
  • Backpressure: если pipeline медленнее source-а, в FutureCompletingBlockingQueue накапливаются records. Reader должен ограничивать fetcher через pause() / resume().
  • Metrics: registry numRecordsIn, currentFetchEventTimeLag, currentEmitEventTimeLag через SourceReaderMetricGroup.
  • Configuration: parallelism, partition.discovery.interval, fetch.batch.size — стандартизированные ключи.

Дальше

Этот урок дал верхнеуровневый обзор архитектуры. В следующих уроках модуля 13:

  • 02-source-reader-internals: deep-dive в mailbox model, как pollNext interleaves с checkpoint barriers, fetcher threads.
  • 03-watermark-coordination-source: как watermarks генерируются per-split, как enumerator handles idle splits.
  • 04-sink-v2-and-custom: симметричная картина для sinks — Writer, Committer, GlobalCommitter.
Проверка знанийKnowledge check
Вы пишете custom source к внешнему system X. Pipeline parallelism = 32. X имеет 8 shards, которые ты mapping-уешь 1-к-1 в splits. SplitEnumerator после старта раздаёт 8 splits — 8 readers получают по одному, 24 reader-а остаются без splits. После какого-то времени pipeline watermark stuck. Что не так, и как починить?
ОтветAnswer
Проблема: 24 reader-а без splits не помечены как idle. Flink watermark agreement требует, чтобы каждый input channel либо advance-ил watermark, либо был помечен idle. Без markIdle() эти 24 reader-а считаются active, но их watermark вечно остаётся в Long.MIN_VALUE (initial). Watermark в downstream-операторе = min(всех input watermarks) = Long.MIN_VALUE — stuck. Решение: enumerator после распределения splits должен notify reader-ов, у которых splits = 0, что им работы не будет. Reader при receive NoMoreSplitsEvent (если bounded) или при детекции "splits=0 и не ждёт новых" должен вызвать output.markIdle(). Когда (или если) enumerator потом даст этому reader split — markActive() автоматически. Стандартный SourceReaderBase делает это автоматически если ты правильно используешь его API. В custom-реализациях это типичная бага. Кроме того: если у тебя 8 splits на 32 reader-а навсегда, лучше задать parallelism = 8, это будет проще для debugging и cleaner.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Где физически живёт SplitEnumerator в архитектуре Flink с FLIP-27 source-ом?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4