Sink V2: FileSink, KafkaSink
Sink — точка выхода данных из Flink. В Flink 1.x был SinkFunction; в 2.x он удалён. Единственный путь — Sink V2 API. Этот урок про практическое использование Sink V2 на двух самых частых примерах: KafkaSink и FileSink.
К концу урока вы будете уметь правильно настраивать sinks с нужным delivery guarantee, понимать, что такое two-phase commit под капотом, и избегать типичных ошибок.
Архитектура Sink V2
Sink V2 разделён на несколько компонентов, что позволяет реализовать сложную семантику доставки.
SinkWriter (per-subtask, write data)
SinkWriter (per-subtask): получает события из upstream, буферизирует и пишет во внешнюю систему. Для transactional sinks — открывает transaction, накапливает записи.Committer (commit на checkpoint)
Committer: финализирует transactions при checkpoint. Для exactly-once sinks: SinkWriter создаёт committable, Committer делает commit при successful checkpoint. Two-phase commit.Для at-least-once sinks (например, basic file write) Committer не нужен — SinkWriter просто пишет.
Для exactly-once sinks Committer необходим — он завершает transactions только когда checkpoint успешен. Это two-phase commit.
Delivery guarantees
Sink имеет три возможных уровня гарантии:
Какой выбирать:
- EXACTLY_ONCE: financial data, billing events, critical CDC. Дороже (transactions), но абсолютно корректно.
- AT_LEAST_ONCE: большинство дашбордов, метрики, обогащение данных. Дубликаты приемлемы (особенно если downstream идемпотентен).
- NONE: только debug или fire-and-forget logging.
Уровень настраивается на каждом sink через setDeliveryGuarantee(...).
KafkaSink: production sink номер 1
Most common sink — KafkaSink. Пишет в Kafka topic, поддерживает все три уровня доставки.
Базовый пример (at-least-once)
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("output-events")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink).name("Kafka Sink");
Это минимальная конфигурация. Каждое событие сериализуется в String, отправляется в output-events топик.
С ключом и кастомным partitioner
KafkaSink<Transaction> sink = KafkaSink.<Transaction>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<Transaction>builder()
.setTopic("processed-txns")
// Ключ — userId (для partitioning по user)
.setKeySerializationSchema(txn -> txn.getUserId().getBytes(StandardCharsets.UTF_8))
// Value — JSON
.setValueSerializationSchema(txn -> objectMapper.writeValueAsBytes(txn))
.setPartitioner(new FlinkFixedPartitioner<>()) // round-robin или fixed
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
Ключ важен: Kafka партиционирует записи по hash(key) % partitions. Если key = userId, все события одного пользователя в одну партицию = гарантия order.
Exactly-once с transactional producer
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.<String>builder()
.setTopic("critical-output")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("my-flink-job") // обязательно для EXACTLY_ONCE
.build();
Что происходит под капотом:
- SinkWriter каждой subtask открывает Kafka transaction.
- На каждое событие —
producer.send()внутри transaction (не committed). - При checkpoint — committable (transaction ID) сохраняется в Flink state.
- После checkpoint complete — Committer выполняет
producer.commitTransaction(). - Если failover между шагами 3 и 4 — после restart Flink восстанавливает transactions и commits/aborts на основе сохранённого state.
Read isolation: consumers, читающие из Kafka, должны быть настроены на isolation.level=read_committed, иначе они увидят uncommitted transactions.
EXACTLY_ONCE с Kafka требует transaction.timeout.ms на брокере >= ваш checkpoint interval + ваш max checkpoint duration + transactional overhead. Если transaction.timeout слишком короткий — transactions abort’ятся, exactly-once нарушается. Production setup: transaction.timeout.ms = 15min для checkpoint interval 1 min.
FileSink: запись в файлы
FileSink пишет в файловую систему (local, S3, GCS, Azure Blob, HDFS) с rolling — новые файлы создаются периодически.
Базовый пример
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
FileSink<String> sink = FileSink
.<String>forRowFormat(
new Path("s3://my-bucket/events/"),
new SimpleStringEncoder<>("UTF-8")
)
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15))
.withInactivityInterval(Duration.ofMinutes(5))
.withMaxPartSize(MemorySize.ofMebiBytes(128))
.build()
)
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("part")
.withPartSuffix(".log")
.build()
)
.build();
stream.sinkTo(sink).name("S3 File Sink");
Что это делает:
- Записывает в
s3://my-bucket/events/2026-05-19/14/part-0-0.log(например). - BucketAssigner разбивает файлы по datetime (“yyyy-MM-dd/HH” — папка час).
- RollingPolicy ротирует файлы: каждые 15 минут, или после 5 минут idle, или после 128 MB.
- Two-phase commit под капотом: файл сначала пишется как
.inprogress.XXX, при checkpoint —.pending.XXX, после checkpoint complete — final name. Это даёт exactly-once.
Bulk-encoded форматы (Parquet, ORC, Avro)
Для analytics workloads чаще используется bulk format — Parquet:
FileSink<Transaction> sink = FileSink
.forBulkFormat(
new Path("s3://my-bucket/transactions-parquet/"),
ParquetAvroWriters.forReflectRecord(Transaction.class)
)
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
.withRollingPolicy(OnCheckpointRollingPolicy.build()) // обязательно для bulk
.build();
Особенности bulk format:
- Rolling policy обязательно
OnCheckpointRollingPolicy— файлы закрываются только при checkpoint (bulk форматам нужно атомарно закрыться). - Часто используется для streaming ingestion в lakehouse (Iceberg, Delta, Hudi).
Sink в Iceberg / Delta / Hudi
Для lakehouse architectures есть отдельные connectors (отдельные JAR-ы):
// Iceberg sink (упрощённо)
FlinkSink.forRowData(stream)
.table(icebergTable)
.tableLoader(tableLoader)
.writeParallelism(4)
.build();
Это покрывается deeply в модуле 13 (Connectors в production). Сейчас важно знать: для lakehouse есть готовые sinks.
JdbcSink: запись в реляционную БД
JdbcSink (с Sink V2 API) для JDBC sources:
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
JdbcSink<Transaction> sink = JdbcSink.<Transaction>builder()
.withQueryStatement(
"INSERT INTO transactions (id, user_id, amount, ts) VALUES (?, ?, ?, ?) " +
"ON CONFLICT (id) DO UPDATE SET amount = EXCLUDED.amount",
(PreparedStatement ps, Transaction txn) -> {
ps.setString(1, txn.getId());
ps.setString(2, txn.getUserId());
ps.setBigDecimal(3, txn.getAmount());
ps.setTimestamp(4, Timestamp.from(Instant.ofEpochMilli(txn.getTimestamp())));
}
)
.withExecutionOptions(JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(3)
.build())
.withConnectionOptions(...)
.build();
Idempotency: используйте ON CONFLICT DO UPDATE (Postgres) или INSERT ... ON DUPLICATE KEY UPDATE (MySQL) — это даёт идемпотентность, что в сочетании с at-least-once delivery даёт exactly-once на уровне финального state в БД.
ExactlyOnceJdbcSink существует для XA transactions, но XA сложнее и медленнее — обычно идемпотентность через upsert проще и эффективнее.
Print sink: для отладки
Самый простой sink — .print():
stream.print(); // печатает в TaskManager logs (stdout)
Полезно для отладки. НИКОГДА не используйте в production — print blocks pipeline (sync IO в stdout), и логи перегружаются.
Variant: .printToErr() — то же, но stderr.
Custom sink: когда готового нет
Если ни один готовый sink не подходит, можно написать свой через Sink V2 API. Это не простой переход с SinkFunction (legacy) — Sink V2 имеет более сложный contract.
Минимальный custom sink (at-least-once):
public class HttpSink<T> implements Sink<T> {
@Override
public SinkWriter<T> createWriter(InitContext context) {
return new HttpSinkWriter<>(httpClient, endpoint);
}
}
class HttpSinkWriter<T> implements SinkWriter<T> {
@Override
public void write(T element, Context context) throws IOException {
httpClient.post(endpoint, serialize(element));
}
@Override
public void flush(boolean endOfInput) {
// ничего не буферизировано
}
@Override
public void close() {
// cleanup
}
}
Для exactly-once нужно реализовать также Committer interface — это нетривиально. Большинство production случаев решается готовыми sinks.
Прежде чем писать custom sink, проверьте Flink ecosystem — для большинства известных систем уже есть готовые sinks (Elasticsearch, MongoDB, Cassandra, Redis, RabbitMQ, и многие другие).
Sink production checklist
Для KafkaSink:
- DeliveryGuarantee правильный для use case (EXACTLY_ONCE для критичных).
- transactional.id prefix уникальный для job (если EXACTLY_ONCE).
- transaction.timeout.ms на Kafka брокере >= checkpoint interval + buffer.
- Topic ACL — Flink user имеет права write + transactional (если EXACTLY_ONCE).
- Partitioner правильный — для ordered events используйте key-based partitioning.
Для FileSink:
- BucketAssigner даёт корректную партиционность (дата/час, не один большой bucket).
- RollingPolicy настроена — не получится 1 GB файлов или 1 KB файлов (оба плохо).
- OnCheckpointRollingPolicy для bulk форматов (Parquet/ORC).
- S3 / GCS endpoint правильный, IAM permissions есть.
- In-progress / pending файлы не валяются после job stop — это либо bug, либо нужен cleanup в Operator.
Попробуй сам
Расширьте WordCount pipeline:
- Добавьте KafkaSink с AT_LEAST_ONCE. Подтвердите, что записи появляются в Kafka топике.
- Добавьте FileSink в local file (например,
file:///tmp/wordcount-output). Проверьте файлы — какие имена? Когда они rolling’уются? - Поменяйте на EXACTLY_ONCE. Что нужно настроить? Что произошло с performance?
- Симулируйте failure: убейте TaskManager во время работы. После recovery — есть ли дубликаты в Kafka? В файлах?
- Bonus: настройте
isolation.level=read_committedна kafka-console-consumer и сравните с defaultread_uncommitted— что увидите при EXACTLY_ONCE sink?