Learning Platform
Глоссарий Troubleshooting
Урок 14.01 · 22 мин
Средний
KafkaKafkaSourceKafkaSinkOffsetsExactly-onceConsumer Group

Kafka source и sink

Kafka — наиболее популярный источник и sink для Flink jobs. Это не случайность: Kafka даёт persistent log с replay-ability, что прекрасно ложится на checkpoint-механизм Flink. Большинство production stream-processing-стэков выглядят как “Kafka -> Flink -> Kafka”: один топик ingest-данных, Flink делает stateful processing, другой топик — обогащённый результат.

В этом уроке разберём production-параметры обоих API: KafkaSource (modern unified API) в DataStream и 'connector' = 'kafka' в SQL. Поговорим про offsets, group.id, isolation level и подводные камни.


KafkaSource в DataStream

KafkaSource (Flink 1.14+) — это unified-source API, заменивший устаревший FlinkKafkaConsumer. Если вы видите в кодовой базе FlinkKafkaConsumer — это legacy, надо мигрировать.

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.IsolationLevel;

KafkaSource<Click> source = KafkaSource.<Click>builder()
    .setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
    .setTopics("clicks")
    .setGroupId("flink-clicks-consumer")
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    .setValueOnlyDeserializer(new ClickDeserializer())
    .setProperty("isolation.level", "read_committed")
    .setProperty("partition.discovery.interval.ms", "30000")
    .build();

DataStream<Click> clicks = env.fromSource(
    source,
    WatermarkStrategy.<Click>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((c, t) -> c.eventTime().toEpochMilli()),
    "clicks-kafka"
);

Разберём ключевые параметры.


Bootstrap servers и group.id

bootstrap.servers — стандартный Kafka client property. Указывайте минимум 3 broker для отказоустойчивости. Список используется только для discovery — реальные подключения к leader-партициям делаются после metadata request.

group.id — consumer group ID. Влияет на:

  • Committed offsets — Flink commit-ит offsets в Kafka только для мониторинга (lag в Kafka tools). Реальное восстановление позиции делается из Flink checkpoint, не из Kafka group offsets.
  • Mutually exclusive consumers — если другой Flink job или внешний consumer использует тот же group.id, Kafka попытается распределить партиции между ними (что почти всегда баг для Flink).
WARNING

Никогда не делите group.id между двумя независимыми Flink jobs. Flink не использует Kafka consumer group для координации (он сам управляет partition assignment), но Kafka может вмешаться, если видит конфликтующих consumers. Используйте уникальный group.id на каждый job (flink-<job-name>-<env>). Это не теряет offsets — позиция восстанавливается из checkpoint.


OffsetsInitializer

OffsetsInitializer определяет, с какого offset начать, если у job нет checkpoint (cold start). Варианты:

  • OffsetsInitializer.earliest() — с самого начала топика. Хорошо для full-replay, плохо для топиков с многомесячным retention (job полтора месяца догоняет).
  • OffsetsInitializer.latest() — только новые события. Хорошо для real-time job, где исторические данные неинтересны.
  • OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST) — если есть commit’ы под этим group.id — оттуда; иначе fallback на EARLIEST. Самый production-friendly default.
  • OffsetsInitializer.timestamp(epochMs) — с конкретного timestamp. Полезно для replay-back-fill.
  • OffsetsInitializer.offsets(Map<TopicPartition, Long>) — точное задание per-partition.

При наличии Flink checkpoint OffsetsInitializer игнорируется — позиция берётся из checkpoint. Initializer работает только на cold-start (первый запуск, deleted checkpoint).


Partition discovery

Topics в production иногда расширяются: новые партиции добавляются для горизонтального scaling. По умолчанию Flink не видит новых партиций — он зафиксировал список при старте.

partition.discovery.interval.ms (по умолчанию -1, выключено) — интервал, через который Flink перечитает метаданные топика. Рекомендуется выставлять (30000 — 5 минут), особенно если динамическое scaling топиков — часть операций.

.setProperty("partition.discovery.interval.ms", "30000")  // 30 секунд

При обнаружении новой партиции Flink rebalance’ит assignment между source-subtasks. Это не триггерит full restart — assignment меняется на ходу.


Isolation level

Kafka поддерживает транзакционные сообщения (atomic write across partitions). Consumer может выбрать:

Kafka транзакции: как работает transactional producer
  • read_uncommitted (default) — видит все сообщения, включая из abort’нутых транзакций.
  • read_committed — видит только committed сообщения, игнорирует abort.
.setProperty("isolation.level", "read_committed")

Для production exactly-once pipeline это обязательно. Если upstream producer пишет транзакционно (например, другой Flink job с EXACTLY_ONCE sink), Flink consumer должен читать read_committed, иначе он увидит дубликаты от aborted transactions.


KafkaSink

KafkaSink — современный sink-API (Flink 1.14+, заменяет FlinkKafkaProducer):

import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.base.DeliveryGuarantee;

KafkaSink<EnrichedOrder> sink = KafkaSink.<EnrichedOrder>builder()
    .setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<EnrichedOrder>builder()
            .setTopic("enriched-orders")
            .setKeySerializationSchema(e -> e.orderId().toString().getBytes())
            .setValueSerializationSchema(new EnrichedOrderSerializer())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-enricher-job")
    .setProperty("transaction.timeout.ms", "900000")  // 15 минут
    .build();

stream.sinkTo(sink);

Главные параметры:

  • DeliveryGuarantee:

    • NONE — fire-and-forget, нет гарантий.
    • AT_LEAST_ONCE — гарантирует не-потерю, могут быть дубликаты после failure.
    • EXACTLY_ONCE — транзакционная запись, синхронизированная с Flink checkpoint.
  • transactionalIdPrefix (только для EXACTLY_ONCE) — Flink использует Kafka transactions. Каждая subtask требует уникального transactional.id. Prefix должен быть уникальным на каждый Flink job, иначе разные job’ы будут конфликтовать за transactions.

  • transaction.timeout.ms — должен быть больше, чем максимальное время между checkpoints + время восстановления. Если Flink timer’ы не успели pre-commit за timeout — Kafka abort’нет транзакцию, потеряется exactly-once. Дефолт 1 час; для checkpoint-интервалов > 30 мин надо увеличивать.


Архитектура exactly-once

KafkaSink с EXACTLY_ONCE: two-phase commit

Flink Sink

Flink job обрабатывает события и пишет в Kafka в рамках открытой транзакции. Сообщения в Kafka, но isolation_level=read_uncommitted скрывает их от consumers.
produce in tx N

Kafka (uncommitted)

Kafka хранит сообщения с tx-marker (uncommitted). Consumers с read_committed не видят их пока tx не закоммичена.

Checkpoint Coordinator

Flink триггерит checkpoint. Sink делает pre-commit Kafka транзакции (flush, ready to commit, но не committed еще).
checkpoint barrier

Sink pre-commit

Sink делает Kafka pre-commit: данные на диске в Kafka, но транзакция в состоянии PREPARED. Если crash сейчас — recovery commit-нет транзакцию.

Checkpoint complete

Checkpoint завершён (saved в durable storage). Job manager сообщает sinks о completion.
notify checkpoint complete

Sink Kafka.commit()

Sink делает Kafka commitTransaction. Теперь read_committed consumers видят данные. Атомарно с Flink checkpoint.

SQL DDL для Kafka

-- Source
CREATE TABLE clicks (
  user_id STRING,
  url STRING,
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
  'properties.group.id' = 'flink-clicks',
  'properties.isolation.level' = 'read_committed',
  'scan.startup.mode' = 'group-offsets',
  'scan.topic-partition-discovery.interval' = '30 s',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
);

-- Sink (append-only)
CREATE TABLE enriched_orders (
  order_id BIGINT,
  customer_name STRING,
  amount_usd DECIMAL(10, 2),
  enriched_at TIMESTAMP_LTZ(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'enriched-orders',
  'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
  'sink.delivery-guarantee' = 'exactly-once',
  'sink.transactional-id-prefix' = 'flink-enricher',
  'properties.transaction.timeout.ms' = '900000',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
);

-- Upsert Kafka sink (для retract changelog с PK)
CREATE TABLE user_metrics (
  user_id STRING,
  click_count BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'user-metrics',
  'properties.bootstrap.servers' = 'kafka-1:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);
TIP

Различайте обычный kafka connector и upsert-kafka. Первый — append-only sink, не принимает retract/upsert changelog. Второй — конвертирует upsert-changelog в Kafka сообщения: insert/update = новое сообщение с тем же ключом (Kafka log compaction почистит старые версии), delete = tombstone (null value). Для агрегаций с GROUP BY PK всегда нужен upsert-kafka.


Common pitfalls

Несовпадение subtask и партиций. Если у топика 4 партиции, а source parallelism = 8 — половина subtasks idle. И наоборот: 4 subtasks на 16 партиций — каждый subtask читает 4 партиции, повышенная нагрузка. Идеально: subtasks = partitions или partitions кратно subtasks.

Watermarks с idle partitions. Если одна партиция не получает events какое-то время, watermark не продвигается. Решение: WatermarkStrategy.withIdleness(Duration.ofMinutes(1)) — после 1 минуты idle партиция помечается как inactive и не блокирует watermark.

Producer transaction.id collisions. Если два Flink jobs используют один transactionalIdPrefix (например, копипаст в конфиге) — они будут zombie-fenced друг друга. Дайте уникальный префикс на каждый job.


Попробуй сам

  1. Настрой KafkaSource на топик с 16 партициями и source parallelism = 4. Сколько партиций будет назначено каждой subtask?
  2. Запусти 2 Flink job на один топик с разными group.id. Они будут конкурировать или читать независимо?
  3. Симулируй failure после pre-commit Kafka транзакции: убей TaskManager в момент checkpoint. Какие операции произойдут при recovery?
Проверка знанийKnowledge check
Команда задеплоила Flink job с KafkaSink, DeliveryGuarantee.EXACTLY_ONCE и transactionalIdPrefix='flink-job'. Job работает с checkpoint interval=15min. Через час downstream consumer с isolation.level=read_committed начинает получать batches каждые 15 минут, но иногда они пропадают полностью и появляются дубли. Что может быть не так?
ОтветAnswer
Самая вероятная причина — transaction.timeout.ms (по умолчанию в Kafka 60 секунд для transactional producer, и 15 минут для broker max — но Kafka broker имеет своё transactional.id.expiration.ms 7 дней). При checkpoint interval=15min Kafka транзакция остаётся открытой до 15 минут. Если producer не явно настроил transaction.timeout.ms больше 15 минут — Kafka brokerbort'нет транзакцию по таймауту (transaction.timeout.ms у producer должен быть > checkpoint interval + recovery overhead). Aborted транзакция = read_committed consumer ничего не видит за этот checkpoint (batch потерян). При следующем checkpoint данные могут продьюсериться заново (если Flink перезапустился и переиграл), что даст дубли. Fix: явно установить transaction.timeout.ms = 900000 (15 минут) или больше, и убедиться, что Kafka broker setting transaction.max.timeout.ms тоже это разрешает (обычно ставят 1 час). Также ещё одна возможная проблема — несовпадение transactionalIdPrefix между job restart'ами (если job re-deploy'ится с новым префиксом — старые транзакции не fence'нутся правильно).

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда использует один group.id для двух независимых Flink jobs, читающих из топика clicks. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4