Source V2 API: KafkaSource и watermarks
Источник — точка входа данных в Flink. В Flink 1.x был SourceFunction; в 2.x он удалён, и единственный путь — Source V2 API. Этот урок — про практическое использование Source V2 на примере KafkaSource (самый распространённый source в production), с правильной настройкой watermark strategy.
К концу урока вы будете уметь подключать KafkaSource с правильной конфигурацией, понимать разницу между bounded и unbounded режимами, и настраивать watermarks для event time.
Что такое Source V2
Source V2 — это API, появившийся в Flink 1.11 как experimental и стабилизированный в 1.15. С 2.0 — единственный официальный путь. Он заменил три legacy API сразу: SourceFunction, RichSourceFunction, и InputFormat.
Архитектура Source V2 разделена на три компонента:
SplitEnumerator (на JobManager)
SplitEnumerator: работает в JobManager (single instance). Решает, какие 'splits' (порции работы) есть в source. Для Kafka split = partition. Для FileSource split = файл или часть файла.SourceReader 1
SourceReader 1: работает в TaskManager subtask. Получает splits от enumerator, читает данные из них, emit'ит в downstream.SourceReader 2
SourceReader 2: параллельный экземпляр SourceReader. Каждый subtask source имеет свой Reader. Получает свои splits.SourceReader N
SourceReader N: для parallelism N. Enumerator распределяет splits между readers (для Kafka — round-robin или sticky assignment).Зачем это сделано так:
- Splits — first-class concept. Enumerator знает все splits, может перераспределять при rescaling.
- Bounded и unbounded в одном API. Source V2 поддерживает оба режима — для bounded enumerator перестаёт выдавать splits, для unbounded — продолжает.
- Watermarks правильно. Источник эмитит watermarks per-split (per-partition в Kafka), что важно для корректной обработки.
- Поддержка checkpointing встроена. Enumerator и Reader сохраняют свой state — splits assigned, read position.
Для пользователя API чище: вы пишете env.fromSource(source, watermarkStrategy, name), и всё.
KafkaSource: самый частый source
Kafka Consumer API: под капотом KafkaSourceKafkaSource — наиболее популярный source в production. Он покрывает все типичные паттерны: чтение топика, специфические offsets, watermarks из event time.
Минимальный пример
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input-events")
.setGroupId("my-job")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"Kafka Source"
);
Это читает топик input-events с начала, без watermarks. Каждая запись — String value (без key).
Все важные параметры
KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
// обязательные
.setBootstrapServers("kafka1:9092,kafka2:9092,kafka3:9092") // список брокеров
.setTopics("topic-1", "topic-2") // один или несколько топиков
// ИЛИ можно по паттерну
// .setTopicPattern("user-events-.*")
.setGroupId("my-flink-job") // consumer group ID
// starting position
.setStartingOffsets(OffsetsInitializer.earliest())
// варианты:
// OffsetsInitializer.latest()
// OffsetsInitializer.timestamp(System.currentTimeMillis() - 3600_000L)
// OffsetsInitializer.committedOffsets()
// OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
// deserializer
.setValueOnlyDeserializer(new MyEventDeserializer())
// ИЛИ с keys:
// .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializer()))
// bounded mode (для batch-like processing с Kafka)
// .setBounded(OffsetsInitializer.latest()) // прочитать до латест и остановиться
// properties для Kafka consumer (passed напрямую)
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperty("partition.discovery.interval.ms", "30000")
.build();
Important параметры:
- groupId: важно ставить осмысленный ID. Если несколько Flink-jobs читают тот же топик — каждому свой groupId.
- startingOffsets: куда начать читать при первом запуске (без savepoint). После savepoint этот параметр игнорируется — позиция восстанавливается из state.
- partition.discovery.interval.ms: KafkaSource периодически проверяет новые партиции в топике. Если в production добавили партиции — Source автоматически их подхватит.
KafkaSource НЕ использует Kafka consumer group для координации. Группа hartbeats с Kafka — disabled. Координация — через Flink JobManager. Это значит: вы НЕ увидите Flink consumer в kafka-consumer-groups.sh --list. Offsets хранятся в Flink checkpoints, не в Kafka __consumer_offsets (хотя можно опционально коммитить в Kafka для observability).
OffsetsInitializer варианты
| Initializer | Когда использовать |
|---|---|
earliest() | Прочитать всю историю топика с offset 0 |
latest() | Только новые события с момента старта job |
timestamp(ts) | С определённого момента времени (Kafka >= 0.10) |
committedOffsets() | Использовать last committed offset; если нет — ERROR |
committedOffsets(EARLIEST) | Использовать committed; если нет — start with earliest |
committedOffsets(LATEST) | Использовать committed; если нет — start with latest |
offsets(Map<TopicPartition, Long>) | Конкретные offsets per partition (custom) |
committedOffsets(EARLIEST) — самый безопасный default, особенно для миграции job с одного Flink-кластера на другой.
Bounded vs Unbounded
KafkaSource по умолчанию unbounded — читает forever. Это natural для streaming jobs.
Но можно сделать bounded — прочитать до определённой позиции и остановиться. Это batch-mode для Kafka:
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka:9092")
.setTopics("input-events")
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest()) // read until latest, then stop
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
Когда полезно:
- Re-process all data с начала топика для recompute (Kappa Architecture).
- Batch ETL из Kafka в lakehouse.
- Тестирование на финитном куске данных.
Поведение: source читает до bounded offset на каждой партиции. Когда все партиции достигли своих bounded offsets — source закрывается, job переходит в FINISHED state.
Watermarks: event time strategy
Если ваш job работает в event time (что для большинства production случаев так), вы должны настроить watermark strategy на source.
Что такое watermark
Watermark — это monotonic timestamp, который Flink интерпретирует как “я уже видел все события с timestamp <= W”. Watermark распространяется через pipeline и триггерит windowing.
Без watermarks event time не работает — Flink не знает, когда window “закрыть”.
Базовые стратегии
No watermarks (только для processing time или watermark-agnostic):
WatermarkStrategy<MyEvent> ws = WatermarkStrategy.noWatermarks();
Monotonic timestamps (когда события приходят строго в order):
WatermarkStrategy<MyEvent> ws = WatermarkStrategy
.<MyEvent>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.getEventTime());
Bounded out-of-orderness (типичный production случай):
WatermarkStrategy<MyEvent> ws = WatermarkStrategy
.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getEventTime());
Это значит: “Я допускаю out-of-orderness до 5 секунд”. Watermark = max_seen_timestamp - 5s. События позже watermark считаются late.
Полный пример с KafkaSource
KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("user-events")
.setGroupId("event-aggregator")
.setStartingOffsets(OffsetsInitializer.committedOffsets(
OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new UserEventAvroDeserializer())
.build();
WatermarkStrategy<UserEvent> ws = WatermarkStrategy
.<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getTimestampMillis())
.withIdleness(Duration.ofMinutes(1)); // если партиция не получает события 1 минуту,
// не блокируем watermark из-за неё
DataStream<UserEvent> stream = env.fromSource(
source,
ws,
"User Events from Kafka"
);
withIdleness — критичная фича. Если одна Kafka-партиция не получает события (например, малопопулярные user IDs), watermark “застрянет” — Flink будет ждать события из этой партиции. withIdleness(Duration.ofMinutes(1)) говорит: “Если партиция молчит 1 минуту, считай её idle, не учитывай при расчёте watermark.”
Без withIdleness в production вы регулярно столкнётесь с проблемой “windows не закрываются”. Симптом: watermark на одной партиции стоит на месте, общий watermark = min(per-partition) = тоже стоит, окна не emit’ятся. withIdleness — must-have в KafkaSource для большинства cases.
Альтернативные sources
Помимо KafkaSource, Flink имеет другие официальные sources:
FileSource — для чтения файлов (один раз или continuous):
FileSource<String> source = FileSource
.forRecordStreamFormat(new TextLineInputFormat(),
new Path("s3://my-bucket/events/"))
.monitorContinuously(Duration.ofSeconds(10)) // continuous mode
.build();
JdbcSource — для чтения из RDBMS (bounded, для batch):
JdbcSource<Row> source = JdbcSource.<Row>builder()
.setDBUrl("jdbc:postgresql://localhost:5432/mydb")
.setQuery("SELECT id, name FROM users")
.setRowConverter(...)
.build();
PulsarSource — для Apache Pulsar.
KinesisSource — для AWS Kinesis.
Custom source — если ничего из готового не подходит, можно написать свой через Source V2 interface. Но обычно это редко нужно — community/vendor connectors покрывают все основные сценарии.
Production checklist для KafkaSource
Перед деплоем в production проверьте:
- Bootstrap servers — production cluster (не dev). Несколько брокеров для failover.
- groupId уникальный для этого job.
- startingOffsets =
committedOffsets(EARLIEST)— для безопасного start. - commit.offsets.on.checkpoint = true — для observability в Kafka (увидите Flink consumer в
kafka-consumer-groups). - withIdleness настроен (1-5 минут типично) — иначе watermarks “застревают”.
- forBoundedOutOfOrderness с разумным интервалом (5-30 секунд для большинства cases).
- Deserializer — без NPE на malformed messages. Лучше — schema-aware (Avro, Protobuf).
- partition.discovery.interval.ms — 30-60 секунд, если ожидаете добавление партиций.
- SSL/SASL конфиги — если Kafka secured (модуль 16).
Попробуй сам
Расширьте WordCount из урока 00.3:
- Замените SQL-based WordCount на Java DataStream API с KafkaSource. Используйте код из примера выше как стартовую точку.
- Добавьте watermark strategy с forBoundedOutOfOrderness(5s) и withIdleness(1m).
- Запустите. Проверьте Web UI — Source оператор должен показывать watermark в Watermarks tab. Какое значение?
- Отправьте 100 событий с фиксированным timestamp. Что показывает watermark? Что если отправите событие с timestamp на 10 секунд раньше предыдущего?
- Bonus: создайте топик с 6 партициями, но отправляйте события только в 2 из них. Без
withIdleness— что произойдёт с watermark? СwithIdleness— что?