Learning Platform
Глоссарий Troubleshooting
Урок 04.03 · 18 мин
Средний
Sink V2KafkaSinkFileSinkDelivery Guarantees

Sink V2: FileSink, KafkaSink

Sink — точка выхода данных из Flink. В Flink 1.x был SinkFunction; в 2.x он удалён. Единственный путь — Sink V2 API. Этот урок про практическое использование Sink V2 на двух самых частых примерах: KafkaSink и FileSink.

К концу урока вы будете уметь правильно настраивать sinks с нужным delivery guarantee, понимать, что такое two-phase commit под капотом, и избегать типичных ошибок.


Архитектура Sink V2

Sink V2 разделён на несколько компонентов, что позволяет реализовать сложную семантику доставки.

Sink V2: компоненты
Sink (factory)Top-level interface. Создаёт SinkWriter'ы и опционально Committer'ы. Это то, что пользователь видит в API: builder pattern, конфигурация.

SinkWriter (per-subtask, write data)

SinkWriter (per-subtask): получает события из upstream, буферизирует и пишет во внешнюю систему. Для transactional sinks — открывает transaction, накапливает записи.
(только для exactly-once)

Committer (commit на checkpoint)

Committer: финализирует transactions при checkpoint. Для exactly-once sinks: SinkWriter создаёт committable, Committer делает commit при successful checkpoint. Two-phase commit.

Для at-least-once sinks (например, basic file write) Committer не нужен — SinkWriter просто пишет.

Для exactly-once sinks Committer необходим — он завершает transactions только когда checkpoint успешен. Это two-phase commit.


Delivery guarantees

Sink имеет три возможных уровня гарантии:

Delivery guarantees: trade-offs
NONE (at-most-once)Fire-and-forget. Sink пишет события асинхронно, не ждёт ACK. Самая высокая производительность, но при failure события теряются. Используется редко — только когда потери допустимы (например, debug logs).
AT_LEAST_ONCESink гарантирует, что каждое событие записано хотя бы один раз. При failure — возможны дубликаты (записали, не подтвердили, restart, записали повторно). Most common выбор для non-critical pipelines.
EXACTLY_ONCESink гарантирует ровно одну запись каждого события. Реализуется через two-phase commit с Committer'ом или idempotent writes. Самая дорогая семантика, но necessary для critical data.

Какой выбирать:

  • EXACTLY_ONCE: financial data, billing events, critical CDC. Дороже (transactions), но абсолютно корректно.
  • AT_LEAST_ONCE: большинство дашбордов, метрики, обогащение данных. Дубликаты приемлемы (особенно если downstream идемпотентен).
  • NONE: только debug или fire-and-forget logging.

Уровень настраивается на каждом sink через setDeliveryGuarantee(...).


KafkaSink: production sink номер 1

Most common sink — KafkaSink. Пишет в Kafka topic, поддерживает все три уровня доставки.

Базовый пример (at-least-once)

import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<String>builder()
            .setTopic("output-events")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

stream.sinkTo(sink).name("Kafka Sink");

Это минимальная конфигурация. Каждое событие сериализуется в String, отправляется в output-events топик.

С ключом и кастомным partitioner

KafkaSink<Transaction> sink = KafkaSink.<Transaction>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<Transaction>builder()
            .setTopic("processed-txns")
            // Ключ — userId (для partitioning по user)
            .setKeySerializationSchema(txn -> txn.getUserId().getBytes(StandardCharsets.UTF_8))
            // Value — JSON
            .setValueSerializationSchema(txn -> objectMapper.writeValueAsBytes(txn))
            .setPartitioner(new FlinkFixedPartitioner<>())  // round-robin или fixed
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    .build();

Ключ важен: Kafka партиционирует записи по hash(key) % partitions. Если key = userId, все события одного пользователя в одну партицию = гарантия order.

Exactly-once с transactional producer

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(
        KafkaRecordSerializationSchema.<String>builder()
            .setTopic("critical-output")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("my-flink-job")  // обязательно для EXACTLY_ONCE
    .build();

Что происходит под капотом:

  1. SinkWriter каждой subtask открывает Kafka transaction.
  2. На каждое событие — producer.send() внутри transaction (не committed).
  3. При checkpoint — committable (transaction ID) сохраняется в Flink state.
  4. После checkpoint complete — Committer выполняет producer.commitTransaction().
  5. Если failover между шагами 3 и 4 — после restart Flink восстанавливает transactions и commits/aborts на основе сохранённого state.

Read isolation: consumers, читающие из Kafka, должны быть настроены на isolation.level=read_committed, иначе они увидят uncommitted transactions.

Kafka transactions: как работает two-phase commit
WARNING

EXACTLY_ONCE с Kafka требует transaction.timeout.ms на брокере >= ваш checkpoint interval + ваш max checkpoint duration + transactional overhead. Если transaction.timeout слишком короткий — transactions abort’ятся, exactly-once нарушается. Production setup: transaction.timeout.ms = 15min для checkpoint interval 1 min.


FileSink: запись в файлы

FileSink пишет в файловую систему (local, S3, GCS, Azure Blob, HDFS) с rolling — новые файлы создаются периодически.

Базовый пример

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

FileSink<String> sink = FileSink
    .<String>forRowFormat(
        new Path("s3://my-bucket/events/"),
        new SimpleStringEncoder<>("UTF-8")
    )
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd/HH"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15))
            .withInactivityInterval(Duration.ofMinutes(5))
            .withMaxPartSize(MemorySize.ofMebiBytes(128))
            .build()
    )
    .withOutputFileConfig(
        OutputFileConfig.builder()
            .withPartPrefix("part")
            .withPartSuffix(".log")
            .build()
    )
    .build();

stream.sinkTo(sink).name("S3 File Sink");

Что это делает:

  • Записывает в s3://my-bucket/events/2026-05-19/14/part-0-0.log (например).
  • BucketAssigner разбивает файлы по datetime (“yyyy-MM-dd/HH” — папка час).
  • RollingPolicy ротирует файлы: каждые 15 минут, или после 5 минут idle, или после 128 MB.
  • Two-phase commit под капотом: файл сначала пишется как .inprogress.XXX, при checkpoint — .pending.XXX, после checkpoint complete — final name. Это даёт exactly-once.

Bulk-encoded форматы (Parquet, ORC, Avro)

Для analytics workloads чаще используется bulk format — Parquet:

FileSink<Transaction> sink = FileSink
    .forBulkFormat(
        new Path("s3://my-bucket/transactions-parquet/"),
        ParquetAvroWriters.forReflectRecord(Transaction.class)
    )
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
    .withRollingPolicy(OnCheckpointRollingPolicy.build())  // обязательно для bulk
    .build();

Особенности bulk format:

  • Rolling policy обязательно OnCheckpointRollingPolicy — файлы закрываются только при checkpoint (bulk форматам нужно атомарно закрыться).
  • Часто используется для streaming ingestion в lakehouse (Iceberg, Delta, Hudi).

Sink в Iceberg / Delta / Hudi

Для lakehouse architectures есть отдельные connectors (отдельные JAR-ы):

// Iceberg sink (упрощённо)
FlinkSink.forRowData(stream)
    .table(icebergTable)
    .tableLoader(tableLoader)
    .writeParallelism(4)
    .build();

Это покрывается deeply в модуле 13 (Connectors в production). Сейчас важно знать: для lakehouse есть готовые sinks.


JdbcSink: запись в реляционную БД

JdbcSink (с Sink V2 API) для JDBC sources:

import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

JdbcSink<Transaction> sink = JdbcSink.<Transaction>builder()
    .withQueryStatement(
        "INSERT INTO transactions (id, user_id, amount, ts) VALUES (?, ?, ?, ?) " +
        "ON CONFLICT (id) DO UPDATE SET amount = EXCLUDED.amount",
        (PreparedStatement ps, Transaction txn) -> {
            ps.setString(1, txn.getId());
            ps.setString(2, txn.getUserId());
            ps.setBigDecimal(3, txn.getAmount());
            ps.setTimestamp(4, Timestamp.from(Instant.ofEpochMilli(txn.getTimestamp())));
        }
    )
    .withExecutionOptions(JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .withMaxRetries(3)
        .build())
    .withConnectionOptions(...)
    .build();

Idempotency: используйте ON CONFLICT DO UPDATE (Postgres) или INSERT ... ON DUPLICATE KEY UPDATE (MySQL) — это даёт идемпотентность, что в сочетании с at-least-once delivery даёт exactly-once на уровне финального state в БД.

ExactlyOnceJdbcSink существует для XA transactions, но XA сложнее и медленнее — обычно идемпотентность через upsert проще и эффективнее.


Самый простой sink — .print():

stream.print();  // печатает в TaskManager logs (stdout)

Полезно для отладки. НИКОГДА не используйте в production — print blocks pipeline (sync IO в stdout), и логи перегружаются.

Variant: .printToErr() — то же, но stderr.


Custom sink: когда готового нет

Если ни один готовый sink не подходит, можно написать свой через Sink V2 API. Это не простой переход с SinkFunction (legacy) — Sink V2 имеет более сложный contract.

Минимальный custom sink (at-least-once):

public class HttpSink<T> implements Sink<T> {
    @Override
    public SinkWriter<T> createWriter(InitContext context) {
        return new HttpSinkWriter<>(httpClient, endpoint);
    }
}

class HttpSinkWriter<T> implements SinkWriter<T> {
    @Override
    public void write(T element, Context context) throws IOException {
        httpClient.post(endpoint, serialize(element));
    }
    
    @Override
    public void flush(boolean endOfInput) {
        // ничего не буферизировано
    }
    
    @Override
    public void close() {
        // cleanup
    }
}

Для exactly-once нужно реализовать также Committer interface — это нетривиально. Большинство production случаев решается готовыми sinks.

TIP

Прежде чем писать custom sink, проверьте Flink ecosystem — для большинства известных систем уже есть готовые sinks (Elasticsearch, MongoDB, Cassandra, Redis, RabbitMQ, и многие другие).


Sink production checklist

Для KafkaSink:

  1. DeliveryGuarantee правильный для use case (EXACTLY_ONCE для критичных).
  2. transactional.id prefix уникальный для job (если EXACTLY_ONCE).
  3. transaction.timeout.ms на Kafka брокере >= checkpoint interval + buffer.
  4. Topic ACL — Flink user имеет права write + transactional (если EXACTLY_ONCE).
  5. Partitioner правильный — для ordered events используйте key-based partitioning.

Для FileSink:

  1. BucketAssigner даёт корректную партиционность (дата/час, не один большой bucket).
  2. RollingPolicy настроена — не получится 1 GB файлов или 1 KB файлов (оба плохо).
  3. OnCheckpointRollingPolicy для bulk форматов (Parquet/ORC).
  4. S3 / GCS endpoint правильный, IAM permissions есть.
  5. In-progress / pending файлы не валяются после job stop — это либо bug, либо нужен cleanup в Operator.

Попробуй сам

Расширьте WordCount pipeline:

  1. Добавьте KafkaSink с AT_LEAST_ONCE. Подтвердите, что записи появляются в Kafka топике.
  2. Добавьте FileSink в local file (например, file:///tmp/wordcount-output). Проверьте файлы — какие имена? Когда они rolling’уются?
  3. Поменяйте на EXACTLY_ONCE. Что нужно настроить? Что произошло с performance?
  4. Симулируйте failure: убейте TaskManager во время работы. После recovery — есть ли дубликаты в Kafka? В файлах?
  5. Bonus: настройте isolation.level=read_committed на kafka-console-consumer и сравните с default read_uncommitted — что увидите при EXACTLY_ONCE sink?
Проверка знанийKnowledge check
KafkaSink с EXACTLY_ONCE не commitит transactions, и через 15 минут Flink логи показывают TransactionTimeoutException. Что вероятная причина и как исправить?
ОтветAnswer
Причина: 'transaction.timeout.ms' на Kafka брокере слишком короткий относительно интервала между Flink checkpoints (или checkpoint duration). Транзакция открывается в начале checkpoint period (когда sink начинает писать в transaction), и должна быть committed после успешного checkpoint. Если checkpoint занимает 5 минут + checkpoint interval 10 минут, в худшем случае транзакция открыта 15 минут. Если 'transaction.timeout.ms = 10min' — будет timeout. Решение: (1) На Kafka брокере увеличить 'transaction.max.timeout.ms' (default 15min, можно поставить 1hour для критичных систем). (2) В KafkaSink (или общих properties) установить 'transaction.timeout.ms' >= checkpoint interval + max checkpoint duration + safety margin (например, 30min для checkpoint interval 1min с current sizing). (3) Параллельно — уменьшить checkpoint duration через настройку state backend (incremental checkpoints в RocksDB, ForSt). Если timeout уже произошёл — данные транзакции abort'нуты, при restart с предыдущего checkpoint они будут переотправлены — exactly-once сохранён, но job постоянно failover'ится.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём смысл Committer компонента в Sink V2?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5