Learning Platform
Глоссарий Troubleshooting
Урок 04.01 · 20 мин
Средний
Source V2KafkaSourceWatermarksOffsetsInitializer

Source V2 API: KafkaSource и watermarks

Источник — точка входа данных в Flink. В Flink 1.x был SourceFunction; в 2.x он удалён, и единственный путь — Source V2 API. Этот урок — про практическое использование Source V2 на примере KafkaSource (самый распространённый source в production), с правильной настройкой watermark strategy.

К концу урока вы будете уметь подключать KafkaSource с правильной конфигурацией, понимать разницу между bounded и unbounded режимами, и настраивать watermarks для event time.


Что такое Source V2

Source V2 — это API, появившийся в Flink 1.11 как experimental и стабилизированный в 1.15. С 2.0 — единственный официальный путь. Он заменил три legacy API сразу: SourceFunction, RichSourceFunction, и InputFormat.

Архитектура Source V2 разделена на три компонента:

Source V2: разделение ответственности

SplitEnumerator (на JobManager)

SplitEnumerator: работает в JobManager (single instance). Решает, какие 'splits' (порции работы) есть в source. Для Kafka split = partition. Для FileSource split = файл или часть файла.
assigns splits to readers

SourceReader 1

SourceReader 1: работает в TaskManager subtask. Получает splits от enumerator, читает данные из них, emit'ит в downstream.

SourceReader 2

SourceReader 2: параллельный экземпляр SourceReader. Каждый subtask source имеет свой Reader. Получает свои splits.

SourceReader N

SourceReader N: для parallelism N. Enumerator распределяет splits между readers (для Kafka — round-robin или sticky assignment).
emit records
DataStream of recordsОбъединённый поток данных из всех readers. Дальше — downstream operators (map, filter, keyBy, и т.д.).

Зачем это сделано так:

  • Splits — first-class concept. Enumerator знает все splits, может перераспределять при rescaling.
  • Bounded и unbounded в одном API. Source V2 поддерживает оба режима — для bounded enumerator перестаёт выдавать splits, для unbounded — продолжает.
  • Watermarks правильно. Источник эмитит watermarks per-split (per-partition в Kafka), что важно для корректной обработки.
  • Поддержка checkpointing встроена. Enumerator и Reader сохраняют свой state — splits assigned, read position.

Для пользователя API чище: вы пишете env.fromSource(source, watermarkStrategy, name), и всё.


KafkaSource: самый частый source

Kafka Consumer API: под капотом KafkaSource

KafkaSource — наиболее популярный source в production. Он покрывает все типичные паттерны: чтение топика, специфические offsets, watermarks из event time.

Минимальный пример

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-events")
    .setGroupId("my-job")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "Kafka Source"
);

Это читает топик input-events с начала, без watermarks. Каждая запись — String value (без key).

Все важные параметры

KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
    // обязательные
    .setBootstrapServers("kafka1:9092,kafka2:9092,kafka3:9092")  // список брокеров
    .setTopics("topic-1", "topic-2")  // один или несколько топиков
    // ИЛИ можно по паттерну
    // .setTopicPattern("user-events-.*")
    .setGroupId("my-flink-job")  // consumer group ID
    
    // starting position
    .setStartingOffsets(OffsetsInitializer.earliest())
    // варианты:
    // OffsetsInitializer.latest()
    // OffsetsInitializer.timestamp(System.currentTimeMillis() - 3600_000L)
    // OffsetsInitializer.committedOffsets()
    // OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)
    
    // deserializer
    .setValueOnlyDeserializer(new MyEventDeserializer())
    // ИЛИ с keys:
    // .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializer()))
    
    // bounded mode (для batch-like processing с Kafka)
    // .setBounded(OffsetsInitializer.latest())  // прочитать до латест и остановиться
    
    // properties для Kafka consumer (passed напрямую)
    .setProperty("commit.offsets.on.checkpoint", "true")
    .setProperty("partition.discovery.interval.ms", "30000")
    
    .build();

Important параметры:

  • groupId: важно ставить осмысленный ID. Если несколько Flink-jobs читают тот же топик — каждому свой groupId.
  • startingOffsets: куда начать читать при первом запуске (без savepoint). После savepoint этот параметр игнорируется — позиция восстанавливается из state.
  • partition.discovery.interval.ms: KafkaSource периодически проверяет новые партиции в топике. Если в production добавили партиции — Source автоматически их подхватит.
WARNING

KafkaSource НЕ использует Kafka consumer group для координации. Группа hartbeats с Kafka — disabled. Координация — через Flink JobManager. Это значит: вы НЕ увидите Flink consumer в kafka-consumer-groups.sh --list. Offsets хранятся в Flink checkpoints, не в Kafka __consumer_offsets (хотя можно опционально коммитить в Kafka для observability).

OffsetsInitializer варианты

InitializerКогда использовать
earliest()Прочитать всю историю топика с offset 0
latest()Только новые события с момента старта job
timestamp(ts)С определённого момента времени (Kafka >= 0.10)
committedOffsets()Использовать last committed offset; если нет — ERROR
committedOffsets(EARLIEST)Использовать committed; если нет — start with earliest
committedOffsets(LATEST)Использовать committed; если нет — start with latest
offsets(Map<TopicPartition, Long>)Конкретные offsets per partition (custom)

committedOffsets(EARLIEST) — самый безопасный default, особенно для миграции job с одного Flink-кластера на другой.


Bounded vs Unbounded

KafkaSource по умолчанию unbounded — читает forever. Это natural для streaming jobs.

Но можно сделать bounded — прочитать до определённой позиции и остановиться. Это batch-mode для Kafka:

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("input-events")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setBounded(OffsetsInitializer.latest())  // read until latest, then stop
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

Когда полезно:

  • Re-process all data с начала топика для recompute (Kappa Architecture).
  • Batch ETL из Kafka в lakehouse.
  • Тестирование на финитном куске данных.

Поведение: source читает до bounded offset на каждой партиции. Когда все партиции достигли своих bounded offsets — source закрывается, job переходит в FINISHED state.


Watermarks: event time strategy

Если ваш job работает в event time (что для большинства production случаев так), вы должны настроить watermark strategy на source.

Что такое watermark

Watermark — это monotonic timestamp, который Flink интерпретирует как “я уже видел все события с timestamp <= W”. Watermark распространяется через pipeline и триггерит windowing.

Без watermarks event time не работает — Flink не знает, когда window “закрыть”.

Базовые стратегии

No watermarks (только для processing time или watermark-agnostic):

WatermarkStrategy<MyEvent> ws = WatermarkStrategy.noWatermarks();

Monotonic timestamps (когда события приходят строго в order):

WatermarkStrategy<MyEvent> ws = WatermarkStrategy
    .<MyEvent>forMonotonousTimestamps()
    .withTimestampAssigner((event, ts) -> event.getEventTime());

Bounded out-of-orderness (типичный production случай):

WatermarkStrategy<MyEvent> ws = WatermarkStrategy
    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getEventTime());

Это значит: “Я допускаю out-of-orderness до 5 секунд”. Watermark = max_seen_timestamp - 5s. События позже watermark считаются late.

Полный пример с KafkaSource

KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
    .setBootstrapServers("kafka:9092")
    .setTopics("user-events")
    .setGroupId("event-aggregator")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(
        OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new UserEventAvroDeserializer())
    .build();

WatermarkStrategy<UserEvent> ws = WatermarkStrategy
    .<UserEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getTimestampMillis())
    .withIdleness(Duration.ofMinutes(1));  // если партиция не получает события 1 минуту,
                                            // не блокируем watermark из-за неё

DataStream<UserEvent> stream = env.fromSource(
    source,
    ws,
    "User Events from Kafka"
);

withIdleness — критичная фича. Если одна Kafka-партиция не получает события (например, малопопулярные user IDs), watermark “застрянет” — Flink будет ждать события из этой партиции. withIdleness(Duration.ofMinutes(1)) говорит: “Если партиция молчит 1 минуту, считай её idle, не учитывай при расчёте watermark.”

TIP

Без withIdleness в production вы регулярно столкнётесь с проблемой “windows не закрываются”. Симптом: watermark на одной партиции стоит на месте, общий watermark = min(per-partition) = тоже стоит, окна не emit’ятся. withIdleness — must-have в KafkaSource для большинства cases.


Альтернативные sources

Помимо KafkaSource, Flink имеет другие официальные sources:

FileSource — для чтения файлов (один раз или continuous):

FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), 
                           new Path("s3://my-bucket/events/"))
    .monitorContinuously(Duration.ofSeconds(10))  // continuous mode
    .build();

JdbcSource — для чтения из RDBMS (bounded, для batch):

JdbcSource<Row> source = JdbcSource.<Row>builder()
    .setDBUrl("jdbc:postgresql://localhost:5432/mydb")
    .setQuery("SELECT id, name FROM users")
    .setRowConverter(...)
    .build();

PulsarSource — для Apache Pulsar.

KinesisSource — для AWS Kinesis.

Custom source — если ничего из готового не подходит, можно написать свой через Source V2 interface. Но обычно это редко нужно — community/vendor connectors покрывают все основные сценарии.


Production checklist для KafkaSource

Перед деплоем в production проверьте:

  1. Bootstrap servers — production cluster (не dev). Несколько брокеров для failover.
  2. groupId уникальный для этого job.
  3. startingOffsets = committedOffsets(EARLIEST) — для безопасного start.
  4. commit.offsets.on.checkpoint = true — для observability в Kafka (увидите Flink consumer в kafka-consumer-groups).
  5. withIdleness настроен (1-5 минут типично) — иначе watermarks “застревают”.
  6. forBoundedOutOfOrderness с разумным интервалом (5-30 секунд для большинства cases).
  7. Deserializer — без NPE на malformed messages. Лучше — schema-aware (Avro, Protobuf).
  8. partition.discovery.interval.ms — 30-60 секунд, если ожидаете добавление партиций.
  9. SSL/SASL конфиги — если Kafka secured (модуль 16).

Попробуй сам

Расширьте WordCount из урока 00.3:

  1. Замените SQL-based WordCount на Java DataStream API с KafkaSource. Используйте код из примера выше как стартовую точку.
  2. Добавьте watermark strategy с forBoundedOutOfOrderness(5s) и withIdleness(1m).
  3. Запустите. Проверьте Web UI — Source оператор должен показывать watermark в Watermarks tab. Какое значение?
  4. Отправьте 100 событий с фиксированным timestamp. Что показывает watermark? Что если отправите событие с timestamp на 10 секунд раньше предыдущего?
  5. Bonus: создайте топик с 6 партициями, но отправляйте события только в 2 из них. Без withIdleness — что произойдёт с watermark? С withIdleness — что?
Проверка знанийKnowledge check
KafkaSource читает топик с 8 партициями, parallelism=4. Какие splits получит каждый subtask, и что произойдёт при rescaling до parallelism=2?
ОтветAnswer
Изначально SplitEnumerator распределит 8 партиций между 4 subtasks: каждый получит 2 партиции. При rescaling до parallelism=2 происходит restart job с savepoint. Enumerator перераспределит splits: каждый из 2 subtasks получит 4 партиции. State savepoint содержит per-split offsets, поэтому после restart каждая партиция возобновляется с правильной позиции. Если bы Source V2 не разделял enumerator и reader (как старый SourceFunction), rescaling был бы намного сложнее. Это одна из главных причин, почему Source V2 пришёл на замену старому API: built-in поддержка rescaling через splits.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Архитектура Source V2 разделена на три компонента. Где работает SplitEnumerator?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5