FLIP-27 Source V2: SplitEnumerator плюс SourceReader
Старый SourceFunction (теперь deprecated) был одним из самых проблемных API в Flink. Это просто run(SourceContext) метод, который ты должен был как-то распараллелить, синхронизировать с checkpoint barriers, обработать watermarks, разделить нагрузку между subtasks. Каждый коннектор изобретал свой подход. Kafka, Kinesis, File source — все по-разному. FLIP-27 (введён в Flink 1.11, основной API с Flink 1.14, в 2.2 — единственный поддерживаемый) разделил source на две роли с чёткой ответственностью.
Этот урок — про то, как устроен Source V2 на уровне процессов: что физически живёт в JobManager, что в TaskManager, как они общаются, как один и тот же API даёт и bounded (batch), и unbounded (streaming) семантику.
KafkaSource и watermarks через Source V2 API Consumer Groups: параллельное чтение из KafkaДве роли: discovery и consumption
Главная идея FLIP-27 — разделить открытие источников данных (discovery) и чтение данных (consumption).
В старом API эти две задачи смешивались внутри одного SourceFunction. Например, FlinkKafkaConsumer сам открывал topic, обнаруживал партиции, назначал их subtasks через сложную логику в open(), и сам же читал данные в run(). В результате — если ты хотел сделать свой коннектор, тебе нужно было повторить всю эту машинерию.
FLIP-27 разделил это:
SplitEnumerator — централизованная компонента, живёт в одной инстанции на JobManager (внутри JobMaster). Её задача — обнаружить splits (логические единицы работы: партиция Kafka, файл, range в БД), и назначать их readers. Она знает обо всём source-е целиком.
SourceReader — параллельные инстанции, по одной на subtask. Живут на TaskManagers. Получают splits, читают данные, эмитят records в pipeline. Они не знают друг о друге — каждый видит только свои splits.
Каждый из этих компонентов — пользовательский Java-класс. SplitEnumerator реализует SplitEnumerator<SplitT, CheckpointT>, SourceReader реализует SourceReader<T, SplitT>. Source — фабрика, которая их создаёт: Source.createEnumerator() и Source.createReader().
Что такое Split
Split — это сериализуемое описание единицы работы. Не сами данные, а указатель на них.
Для каждого коннектора split-ы свои:
- KafkaSourceSplit: TopicPartition + startingOffset + stoppingOffset (или -1 для unbounded). Сериализованный — буквально несколько строк.
- FileSourceSplit: путь к файлу + start offset + length. Один файл может быть один split (если не разбиваем) или несколько (для splittable форматов типа Parquet).
- JdbcSourceSplit: SQL-запрос + range (
WHERE id BETWEEN 1000 AND 2000). - HybridSourceSplit: композитный — wrap другого split-а с metadata.
Сплиты должны быть сериализуемы (SimpleVersionedSerializer<SplitT>), потому что они отправляются по сети от enumerator-а к reader-ам, и сохраняются в checkpoint-ах. В checkpoint попадают активные splits каждого reader-а (которые он сейчас читает) + pending splits в enumerator-е (которые он ещё не назначил).
// Пример: KafkaSourceSplit
public class KafkaSourceSplit implements SourceSplit {
private final TopicPartition partition;
private final long startingOffset;
private final long stoppingOffset; // -1 для unbounded
@Override
public String splitId() {
return partition.topic() + "-" + partition.partition();
}
}
// Сериализатор — обычно простая бинарная форма
public class KafkaSplitSerializer
implements SimpleVersionedSerializer<KafkaSourceSplit> {
@Override
public byte[] serialize(KafkaSourceSplit split) {
// topic | partition | startOffset | stopOffset
DataOutputSerializer out = new DataOutputSerializer(64);
out.writeUTF(split.getTopicPartition().topic());
out.writeInt(split.getTopicPartition().partition());
out.writeLong(split.getStartingOffset());
out.writeLong(split.getStoppingOffset());
return out.getCopyOfBuffer();
}
}
splitId уникален в пределах source-а — он используется для tracking-а в state-ах enumerator-а и reader-а.
Жизненный цикл SplitEnumerator
SplitEnumerator — это долгоживущий объект на JobManager, у которого есть состояние и периодические задачи.
Главные методы:
public class KafkaSourceEnumerator
implements SplitEnumerator<KafkaSourceSplit, KafkaSourceEnumState> {
// Вызывается при старте source-а на JM
@Override
public void start() {
// Запускает периодический discovery новых партиций
context.callAsync(
this::discoverNewPartitions,
this::processDiscoveredPartitions,
0L, // initial delay
partitionDiscoveryIntervalMs); // periodic
}
// Reader просит split — обычно когда у него нет работы
@Override
public void handleSplitRequest(int subtaskId, String hostname) {
if (pendingSplits.isEmpty()) {
// Сейчас работы нет, пометим reader как ожидающий
readersAwaitingSplit.put(subtaskId, hostname);
return;
}
KafkaSourceSplit split = pendingSplits.poll();
context.assignSplit(split, subtaskId);
}
// Subtask добавлен (например, после rescale)
@Override
public void addReader(int subtaskId) {
// Можно сразу назначить какие-то splits, или ждать handleSplitRequest
}
// Subtask вернул splits (например, при failover)
@Override
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
pendingSplits.addAll(splits);
// Перераспределим оставшимся readers
assignPendingSplits();
}
// Snapshot для checkpoint
@Override
public KafkaSourceEnumState snapshotState(long checkpointId) {
return new KafkaSourceEnumState(
assignedPartitions, pendingSplits, currentOffsets);
}
}
Discovery работает асинхронно. context.callAsync(supplier, callback) — это паттерн, в котором тяжёлая работа (например, REST-вызов к Kafka admin client для list-а партиций) выполняется в отдельном thread pool-е, а callback вызывается на main mailbox thread-е enumerator-а. Это критично для thread-safety: всё состояние enumerator-а touch-ается только из одного thread-а.
processDiscoveredPartitions — это callback, который обнаружил delta (новые партиции появились / старые удалились), создаёт новые splits и raspределяет их. Здесь же разруливается dynamic partition discovery в Kafka — когда оператор добавляет партиции в топик в runtime, Flink их подхватит без рестарта job.
Жизненный цикл SourceReader
SourceReader — это mailbox-based компонент. Он не запускает свой thread loop; вместо этого pollNext(ReaderOutput) вызывается task-thread-ом TaskManager-а через mailbox executor.
public class KafkaSourceReader implements SourceReader<RowData, KafkaSourceSplit> {
private final Map<TopicPartition, KafkaConsumer<?, ?>> consumers; // per split
private final Queue<KafkaSourceSplit> assignedSplits;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds> elementsQueue;
private final SplitFetcherManager fetcherManager; // background fetcher threads
@Override
public void start() {
// Запрашиваем splits у enumerator-а
context.sendSplitRequest();
}
@Override
public InputStatus pollNext(ReaderOutput<RowData> output) throws Exception {
RecordsWithSplitIds records = elementsQueue.poll();
if (records == null) {
return InputStatus.NOTHING_AVAILABLE;
}
// Эмитим records в pipeline
SourceOutput<RowData> splitOutput = output.createOutputForSplit(records.splitId());
while (records.hasNext()) {
splitOutput.collect(records.next(), records.timestamp());
}
return InputStatus.MORE_AVAILABLE;
}
@Override
public void addSplits(List<KafkaSourceSplit> splits) {
// Передаём в fetcher manager, он откроет consumer-ы
fetcherManager.addSplits(splits);
}
@Override
public List<KafkaSourceSplit> snapshotState(long checkpointId) {
// Сериализуем текущие позиции для всех splits
return fetcherManager.snapshotSplits();
}
}
Ключевая вещь — pollNext() не блокируется. Если нет данных, она возвращает NOTHING_AVAILABLE, и task-thread может заняться другими делами (mailbox messages, checkpoint barriers, timers). Чтение из Kafka происходит в fetcher threads — отдельных background threads, которые делают poll() к Kafka consumer, складывают RecordsWithSplitIds в FutureCompletingBlockingQueue. pollNext-метод просто забирает оттуда.
Это очень важная архитектурная решение: task thread никогда не блокируется на IO. Это позволяет checkpoint barriers и watermarks обрабатываться вовремя даже когда source медленный.
Unified bounded и unbounded
Старый API имел отдельные SourceFunction (для streaming) и InputFormat (для batch). FLIP-27 объединил это через Boundedness.
public class KafkaSource implements Source<RowData, KafkaSourceSplit, KafkaSourceEnumState> {
private final Boundedness boundedness;
@Override
public Boundedness getBoundedness() {
return boundedness; // BOUNDED или CONTINUOUS_UNBOUNDED
}
}
В bounded mode:
- SplitEnumerator знает финальный набор splits заранее (например, list файлов в директории, или partitions с stopping offset).
- Когда enumerator раздал все splits и все readers закончили — source emit-ит
END_OF_INPUT. - Pipeline переключается в batch-режим: операторы могут использовать batch-оптимизации (sort-merge join, no incremental processing).
В unbounded mode:
- SplitEnumerator периодически discovery-ит новые splits (новые партиции, новые файлы).
- Readers никогда не получают
END_OF_INPUT. - Pipeline в streaming-режиме: continuous, с checkpoint-ами, с watermarks.
Один и тот же source может работать в обоих режимах — пользователь выбирает при создании:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("orders")
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest()) // делает source bounded
// .setUnbounded(OffsetsInitializer.committedOffsets()) // делает unbounded
.build();
Это разблокирует один из больших wins Flink 2.x — unified batch and streaming pipelines. Один и тот же job можно запустить и в batch (replay исторических данных), и в streaming-режиме без переписывания source-а.
Hybrid source (FLIP-150) — это композиция двух или более sources, где они подхватывают друг друга. Например, сначала читаем 30 дней истории из S3 (bounded), потом без gap-а переключаемся на Kafka (unbounded). HybridSource оборачивает их и передаёт offset-handshake — где остановился первый, оттуда начинает второй. Это работает только в FLIP-27 API.
Координация enumerator плюс reader через OperatorCoordinator
Под капотом коммуникация между SplitEnumerator (JM) и SourceReader (TM) проходит через OperatorCoordinator — generic-механизм Flink для JM-side компоненты любого оператора.
SourceCoordinator (наследник OperatorCoordinator) hosts enumerator. Он принимает события от reader-ов и шлёт им команды:
| Событие | Направление | Описание |
|---|---|---|
RequestSplitEvent | Reader -> Coordinator | Reader просит split (когда нет работы) |
SourceEventWrapper | Reader bidirectional Coordinator | Произвольное custom событие (для коннекторов) |
ReaderRegistrationEvent | Reader -> Coordinator | Reader зарегистрировался (start или после failover) |
AddSplitEvent | Coordinator -> Reader | Назначить splits |
NoMoreSplitsEvent | Coordinator -> Reader | Больше splits не будет (bounded mode конец) |
Все эти события сериализуются и передаются через OperatorEvent-механизм, который flow-controlled и ack-ается. JM знает, какой reader получил какие splits — и это знание используется при checkpoint-е и failover-е.
При failover какого-то reader-а (например, его TM упал), его splits возвращаются в enumerator через addSplitsBack, и enumerator перераспределяет их. Это даёт at-least-once seek-back semantics — splits, которые были partially-read, начинаются с last-checkpointed offset.
Если ты пишешь custom source и хочешь сделать его performant, имей в виду: handleSplitRequest вызывается каждый раз когда reader idle. Если у тебя 256 readers, каждый дёргает enumerator при пустой очереди — это spam. Enumerator должен быстро обрабатывать requests или batch-ить распределение. Лучше: enumerator сам proактивно push-ит splits через assignSplit, не дожидаясь requests.
Checkpoint coordination
Когда SplitEnumerator делает snapshot, он сохраняет:
- Pending splits — splits, которые ещё не назначены readers.
- Assigned splits per subtask — для tracking-а, чтобы знать, у кого что.
- Discovery state — например, в Kafka это набор уже-обнаруженных партиций (чтобы не назначить второй раз).
SourceReader сохраняет:
- In-progress splits — splits, которые он сейчас читает, с current offset / position в каждом.
- Finished splits — для bounded mode, чтобы знать что не запрашивать снова.
При restore:
- Enumerator восстанавливает свое состояние из checkpoint-а.
- Readers получают свои in-progress splits через
addSplits. - Discovery продолжается с того места, где остановилось.
Эта симметрия — основа exactly-once в source layer. Combined с two-phase commit на sink-стороне, ты получаешь end-to-end exactly-once семантику. Подробнее про sink — следующий урок про FLIP-143.
Production-перспектива: разработка нового коннектора
Если ты пишешь коннектор к новому хранилищу (например, к ClickHouse, к Cassandra read, к Redis Streams), Source V2 даёт тебе чёткий контракт:
- Реши, что у тебя split — единица работы.
- Напиши enumerator: discovery + assignment.
- Напиши reader: для каждого split — как открыть, как читать, как закрыть.
- Напиши сериализаторы для splits и enum-state.
- Тестируй с MiniCluster — Flink-абстракция запуска mini-кластера в-process.
Для большинства коннекторов есть SourceReaderBase — абстрактный базовый класс, который делает за тебя всю mailbox-работу. Ты пишешь только SplitReader (per-split fetcher) и RecordEmitter (как convert raw record в pipeline record). Это reduces boilerplate на 60-70%.
Production-checklist:
- Idle markers: если reader не имеет работы (нет splits) — он должен пометить себя idle через
output.markIdle(), иначе watermark stuck (см. урок watermark-coordination-source). - Backpressure: если pipeline медленнее source-а, в
FutureCompletingBlockingQueueнакапливаются records. Reader должен ограничивать fetcher черезpause()/resume(). - Metrics: registry
numRecordsIn,currentFetchEventTimeLag,currentEmitEventTimeLagчерезSourceReaderMetricGroup. - Configuration: parallelism, partition.discovery.interval, fetch.batch.size — стандартизированные ключи.
Дальше
Этот урок дал верхнеуровневый обзор архитектуры. В следующих уроках модуля 13:
- 02-source-reader-internals: deep-dive в mailbox model, как
pollNextinterleaves с checkpoint barriers, fetcher threads. - 03-watermark-coordination-source: как watermarks генерируются per-split, как enumerator handles idle splits.
- 04-sink-v2-and-custom: симметричная картина для sinks — Writer, Committer, GlobalCommitter.