Kafka source и sink
Kafka — наиболее популярный источник и sink для Flink jobs. Это не случайность: Kafka даёт persistent log с replay-ability, что прекрасно ложится на checkpoint-механизм Flink. Большинство production stream-processing-стэков выглядят как “Kafka -> Flink -> Kafka”: один топик ingest-данных, Flink делает stateful processing, другой топик — обогащённый результат.
В этом уроке разберём production-параметры обоих API: KafkaSource (modern unified API) в DataStream и 'connector' = 'kafka' в SQL. Поговорим про offsets, group.id, isolation level и подводные камни.
KafkaSource в DataStream
KafkaSource (Flink 1.14+) — это unified-source API, заменивший устаревший FlinkKafkaConsumer. Если вы видите в кодовой базе FlinkKafkaConsumer — это legacy, надо мигрировать.
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.common.IsolationLevel;
KafkaSource<Click> source = KafkaSource.<Click>builder()
.setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
.setTopics("clicks")
.setGroupId("flink-clicks-consumer")
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new ClickDeserializer())
.setProperty("isolation.level", "read_committed")
.setProperty("partition.discovery.interval.ms", "30000")
.build();
DataStream<Click> clicks = env.fromSource(
source,
WatermarkStrategy.<Click>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((c, t) -> c.eventTime().toEpochMilli()),
"clicks-kafka"
);
Разберём ключевые параметры.
Bootstrap servers и group.id
bootstrap.servers — стандартный Kafka client property. Указывайте минимум 3 broker для отказоустойчивости. Список используется только для discovery — реальные подключения к leader-партициям делаются после metadata request.
group.id — consumer group ID. Влияет на:
- Committed offsets — Flink commit-ит offsets в Kafka только для мониторинга (lag в Kafka tools). Реальное восстановление позиции делается из Flink checkpoint, не из Kafka group offsets.
- Mutually exclusive consumers — если другой Flink job или внешний consumer использует тот же group.id, Kafka попытается распределить партиции между ними (что почти всегда баг для Flink).
Никогда не делите group.id между двумя независимыми Flink jobs. Flink не использует Kafka consumer group для координации (он сам управляет partition assignment), но Kafka может вмешаться, если видит конфликтующих consumers. Используйте уникальный group.id на каждый job (flink-<job-name>-<env>). Это не теряет offsets — позиция восстанавливается из checkpoint.
OffsetsInitializer
OffsetsInitializer определяет, с какого offset начать, если у job нет checkpoint (cold start). Варианты:
OffsetsInitializer.earliest()— с самого начала топика. Хорошо для full-replay, плохо для топиков с многомесячным retention (job полтора месяца догоняет).OffsetsInitializer.latest()— только новые события. Хорошо для real-time job, где исторические данные неинтересны.OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)— если есть commit’ы под этим group.id — оттуда; иначе fallback на EARLIEST. Самый production-friendly default.OffsetsInitializer.timestamp(epochMs)— с конкретного timestamp. Полезно для replay-back-fill.OffsetsInitializer.offsets(Map<TopicPartition, Long>)— точное задание per-partition.
При наличии Flink checkpoint OffsetsInitializer игнорируется — позиция берётся из checkpoint. Initializer работает только на cold-start (первый запуск, deleted checkpoint).
Partition discovery
Topics в production иногда расширяются: новые партиции добавляются для горизонтального scaling. По умолчанию Flink не видит новых партиций — он зафиксировал список при старте.
partition.discovery.interval.ms (по умолчанию -1, выключено) — интервал, через который Flink перечитает метаданные топика. Рекомендуется выставлять (30000 — 5 минут), особенно если динамическое scaling топиков — часть операций.
.setProperty("partition.discovery.interval.ms", "30000") // 30 секунд
При обнаружении новой партиции Flink rebalance’ит assignment между source-subtasks. Это не триггерит full restart — assignment меняется на ходу.
Isolation level
Kafka поддерживает транзакционные сообщения (atomic write across partitions). Consumer может выбрать:
Kafka транзакции: как работает transactional producerread_uncommitted(default) — видит все сообщения, включая из abort’нутых транзакций.read_committed— видит только committed сообщения, игнорирует abort.
.setProperty("isolation.level", "read_committed")
Для production exactly-once pipeline это обязательно. Если upstream producer пишет транзакционно (например, другой Flink job с EXACTLY_ONCE sink), Flink consumer должен читать read_committed, иначе он увидит дубликаты от aborted transactions.
KafkaSink
KafkaSink — современный sink-API (Flink 1.14+, заменяет FlinkKafkaProducer):
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.base.DeliveryGuarantee;
KafkaSink<EnrichedOrder> sink = KafkaSink.<EnrichedOrder>builder()
.setBootstrapServers("kafka-1:9092,kafka-2:9092,kafka-3:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<EnrichedOrder>builder()
.setTopic("enriched-orders")
.setKeySerializationSchema(e -> e.orderId().toString().getBytes())
.setValueSerializationSchema(new EnrichedOrderSerializer())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-enricher-job")
.setProperty("transaction.timeout.ms", "900000") // 15 минут
.build();
stream.sinkTo(sink);
Главные параметры:
-
DeliveryGuarantee:
NONE— fire-and-forget, нет гарантий.AT_LEAST_ONCE— гарантирует не-потерю, могут быть дубликаты после failure.EXACTLY_ONCE— транзакционная запись, синхронизированная с Flink checkpoint.
-
transactionalIdPrefix (только для EXACTLY_ONCE) — Flink использует Kafka transactions. Каждая subtask требует уникального transactional.id. Prefix должен быть уникальным на каждый Flink job, иначе разные job’ы будут конфликтовать за transactions.
-
transaction.timeout.ms — должен быть больше, чем максимальное время между checkpoints + время восстановления. Если Flink timer’ы не успели pre-commit за timeout — Kafka abort’нет транзакцию, потеряется exactly-once. Дефолт 1 час; для checkpoint-интервалов > 30 мин надо увеличивать.
Архитектура exactly-once
Flink Sink
Flink job обрабатывает события и пишет в Kafka в рамках открытой транзакции. Сообщения в Kafka, но isolation_level=read_uncommitted скрывает их от consumers.Kafka (uncommitted)
Kafka хранит сообщения с tx-marker (uncommitted). Consumers с read_committed не видят их пока tx не закоммичена.Checkpoint Coordinator
Flink триггерит checkpoint. Sink делает pre-commit Kafka транзакции (flush, ready to commit, но не committed еще).Sink pre-commit
Sink делает Kafka pre-commit: данные на диске в Kafka, но транзакция в состоянии PREPARED. Если crash сейчас — recovery commit-нет транзакцию.Checkpoint complete
Checkpoint завершён (saved в durable storage). Job manager сообщает sinks о completion.Sink Kafka.commit()
Sink делает Kafka commitTransaction. Теперь read_committed consumers видят данные. Атомарно с Flink checkpoint.SQL DDL для Kafka
-- Source
CREATE TABLE clicks (
user_id STRING,
url STRING,
event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
'properties.group.id' = 'flink-clicks',
'properties.isolation.level' = 'read_committed',
'scan.startup.mode' = 'group-offsets',
'scan.topic-partition-discovery.interval' = '30 s',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
);
-- Sink (append-only)
CREATE TABLE enriched_orders (
order_id BIGINT,
customer_name STRING,
amount_usd DECIMAL(10, 2),
enriched_at TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'kafka',
'topic' = 'enriched-orders',
'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092',
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'flink-enricher',
'properties.transaction.timeout.ms' = '900000',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
);
-- Upsert Kafka sink (для retract changelog с PK)
CREATE TABLE user_metrics (
user_id STRING,
click_count BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user-metrics',
'properties.bootstrap.servers' = 'kafka-1:9092',
'key.format' = 'json',
'value.format' = 'json'
);
Различайте обычный kafka connector и upsert-kafka. Первый — append-only sink, не принимает retract/upsert changelog. Второй — конвертирует upsert-changelog в Kafka сообщения: insert/update = новое сообщение с тем же ключом (Kafka log compaction почистит старые версии), delete = tombstone (null value). Для агрегаций с GROUP BY PK всегда нужен upsert-kafka.
Common pitfalls
Несовпадение subtask и партиций. Если у топика 4 партиции, а source parallelism = 8 — половина subtasks idle. И наоборот: 4 subtasks на 16 партиций — каждый subtask читает 4 партиции, повышенная нагрузка. Идеально: subtasks = partitions или partitions кратно subtasks.
Watermarks с idle partitions. Если одна партиция не получает events какое-то время, watermark не продвигается. Решение: WatermarkStrategy.withIdleness(Duration.ofMinutes(1)) — после 1 минуты idle партиция помечается как inactive и не блокирует watermark.
Producer transaction.id collisions. Если два Flink jobs используют один transactionalIdPrefix (например, копипаст в конфиге) — они будут zombie-fenced друг друга. Дайте уникальный префикс на каждый job.
Попробуй сам
- Настрой KafkaSource на топик с 16 партициями и source parallelism = 4. Сколько партиций будет назначено каждой subtask?
- Запусти 2 Flink job на один топик с разными group.id. Они будут конкурировать или читать независимо?
- Симулируй failure после pre-commit Kafka транзакции: убей TaskManager в момент checkpoint. Какие операции произойдут при recovery?