Learning Platform
Глоссарий Troubleshooting
Урок 17.05 · 35 мин
Средний
CapstoneCDCPaimonKafkaAsync I/OEXACTLY_ONCEProduction

Capstone: MySQL CDC -> Flink enrichment -> Paimon + Kafka

Все 16 модулей курса сводятся к этому уроку. Реальный production-grade пайплайн, который вы могли бы развернуть завтра в боевом контуре. Цель не “продемонстрировать features”, а собрать всё вместе: source, state, exactly-once, async I/O, dual sinks, deploy на K8s через Operator.

Сценарий: финтех-компания. У них MySQL c таблицами orders и payments. Нужно: real-time enrichment событий через lookup в customers (другая MySQL, медленная), и dual sink — данные в Apache Paimon для аналитики (BI dashboards) И в Kafka для downstream notifications (SMS, email, fraud detection). End-to-end exactly-once обязателен (это финансы — дубликаты SMS-уведомлений недопустимы).

CDC фундаментальные концепции Apache Paimon internals: LSM-tree и streaming writes

Архитектура пайплайна

Capstone pipeline
MySQL: ordersOLTP БД заказов. Binlog включён. Flink CDC source через Debezium connector читает binlog в реальном времени.
MySQL: paymentsOLTP БД платежей. Тот же подход с binlog.
Flink CDC sourceMySqlSource - читает binlog обеих БД, эмитит CDC events (INSERT/UPDATE/DELETE) в виде RowData.
union
Join orders + paymentsProcessFunction keyed by order_id. Held state: pending orders без payment. Emits only when both arrive.
Async I/O enrichmentAsyncFunction для lookup customer profile через REST API customers-service. Avg 50ms latency, async чтобы не блокировать pipeline.
Side output splitОдин поток - в Paimon (analytics, raw enriched events). Другой - в Kafka (только высокоприоритетные алерты для downstream).
Paimon sinkLakehouse сторона: запись в Apache Paimon (Iceberg-подобный формат, оптимизирован для streaming writes с EXACTLY_ONCE через snapshot commits).
Kafka sinkNotifications: транзакционный Kafka producer с EXACTLY_ONCE. Downstream consumers с read_committed isolation.

Это не игрушка. Каждый блок применим к реальному production.


Используем Flink CDC connector — он обёртка над Debezium, читает MySQL binlog, эмитит CDC events.

import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;

MySqlSource<Order> ordersSource = MySqlSource.<Order>builder()
    .hostname("mysql-orders.internal")
    .port(3306)
    .databaseList("orders_db")
    .tableList("orders_db.orders")
    .username("flink_cdc_user")
    .password("...")
    .serverId("5400-5404")               // unique per parallel source instance
    .startupOptions(StartupOptions.initial())   // snapshot then binlog
    .deserializer(new OrderDeserializationSchema())
    .build();

MySqlSource<Payment> paymentsSource = MySqlSource.<Payment>builder()
    .hostname("mysql-payments.internal")
    .port(3306)
    .databaseList("payments_db")
    .tableList("payments_db.payments")
    .username("flink_cdc_user")
    .password("...")
    .serverId("5500-5504")               // отличный от orders
    .startupOptions(StartupOptions.initial())
    .deserializer(new PaymentDeserializationSchema())
    .build();

DataStream<Order> orders = env.fromSource(
    ordersSource, WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((o, ts) -> o.timestamp),
    "orders-cdc"
).uid("orders-cdc-source");

DataStream<Payment> payments = env.fromSource(
    paymentsSource, WatermarkStrategy.<Payment>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((p, ts) -> p.timestamp),
    "payments-cdc"
).uid("payments-cdc-source");

Ключевые моменты:

  • serverId уникальный per Source instance. MySQL не разрешает несколько consumers с одним serverId на binlog. Диапазон 5400-5404 для parallelism 4-5.
  • StartupOptions.initial() — сначала snapshot существующих данных, потом binlog. Альтернатива latest() — только новые события с момента старта.
  • UID для source обязателен.
  • Watermark на event_time из MySQL row timestamp.

Шаг 2: Join orders + payments через keyed state

Бизнес-логика: order считается “обработанным”, когда есть и order, и payment. Joining через CoProcessFunction по order_id.

public class OrderPaymentJoin extends KeyedCoProcessFunction<Long, Order, Payment, EnrichedOrder> {

    private transient ValueState<Order> pendingOrder;
    private transient ValueState<Payment> pendingPayment;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .build();

        ValueStateDescriptor<Order> orderDesc =
            new ValueStateDescriptor<>("pending-order", Order.class);
        orderDesc.enableTimeToLive(ttl);
        pendingOrder = getRuntimeContext().getState(orderDesc);

        ValueStateDescriptor<Payment> paymentDesc =
            new ValueStateDescriptor<>("pending-payment", Payment.class);
        paymentDesc.enableTimeToLive(ttl);
        pendingPayment = getRuntimeContext().getState(paymentDesc);
    }

    @Override
    public void processElement1(Order order, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        Payment payment = pendingPayment.value();
        if (payment != null) {
            out.collect(new EnrichedOrder(order, payment));
            pendingPayment.clear();
        } else {
            pendingOrder.update(order);
        }
    }

    @Override
    public void processElement2(Payment payment, Context ctx, Collector<EnrichedOrder> out) throws Exception {
        Order order = pendingOrder.value();
        if (order != null) {
            out.collect(new EnrichedOrder(order, payment));
            pendingOrder.clear();
        } else {
            pendingPayment.update(payment);
        }
    }
}

DataStream<EnrichedOrder> joined = orders
    .keyBy(o -> o.id)
    .connect(payments.keyBy(p -> p.orderId))
    .process(new OrderPaymentJoin())
    .uid("order-payment-join")
    .name("OrderPaymentJoin");
  • StateTtlConfig: pending orders/payments удаляются через 24 часа (избегаем unbounded state, если payment никогда не придёт).
  • Two ValueStates — потому что событие может прийти первым с любой стороны.
  • state.clear() после match — освобождаем место.

Шаг 3: Async I/O enrichment через REST customers-service

Каждый EnrichedOrder нужно обогатить данными клиента. Customers-service - медленный REST API (avg 50ms). Sync lookup бы зарезал throughput. Async I/O решает.

import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;

public class CustomerEnrichment extends RichAsyncFunction<EnrichedOrder, FullOrder> {

    private transient HttpClient httpClient;

    @Override
    public void open(Configuration parameters) {
        httpClient = HttpClient.newBuilder()
            .executor(Executors.newFixedThreadPool(50))
            .build();
    }

    @Override
    public void asyncInvoke(EnrichedOrder order, ResultFuture<FullOrder> resultFuture) {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://customers-service/v1/customer/" + order.customerId))
            .timeout(Duration.ofSeconds(2))
            .build();

        httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
            .thenApply(response -> parseCustomer(response.body()))
            .thenAccept(customer -> resultFuture.complete(
                Collections.singletonList(new FullOrder(order, customer))
            ))
            .exceptionally(ex -> {
                // Не блокируем pipeline, шлём с null customer
                resultFuture.complete(Collections.singletonList(
                    new FullOrder(order, null)
                ));
                return null;
            });
    }

    @Override
    public void timeout(EnrichedOrder order, ResultFuture<FullOrder> resultFuture) {
        // Default 3 sec timeout - emit с null customer
        resultFuture.complete(Collections.singletonList(new FullOrder(order, null)));
    }
}

DataStream<FullOrder> enriched = AsyncDataStream.orderedWait(
    joined,
    new CustomerEnrichment(),
    3, TimeUnit.SECONDS,    // timeout per request
    100                      // capacity (concurrent in-flight)
).uid("customer-enrichment").name("CustomerEnrichment");
  • orderedWait сохраняет порядок (важно для финансовых событий). unorderedWait быстрее, если порядок не нужен.
  • capacity: 100 — до 100 одновременных запросов per task. Throughput = 100 / 0.05s = 2000/sec per slot.
  • exceptionally + timeout — fallback, чтобы один медленный API не блокировал пайплайн.

Шаг 4: Split -> Paimon (всё) + Kafka (только алерты)

Раздваиваем поток через side output: основной поток в Paimon (все события для analytics), side output в Kafka (только high-priority alerts для notifications).

import org.apache.flink.util.OutputTag;

OutputTag<FullOrder> highPriorityTag = new OutputTag<FullOrder>("high-priority"){};

SingleOutputStreamOperator<FullOrder> split = enriched
    .process(new ProcessFunction<FullOrder, FullOrder>() {
        @Override
        public void processElement(FullOrder order, Context ctx, Collector<FullOrder> out) {
            out.collect(order);  // main stream -> Paimon
            if (order.amount > 10000 || isHighRiskUser(order.customer)) {
                ctx.output(highPriorityTag, order);  // side -> Kafka
            }
        }
    })
    .uid("paimon-kafka-split")
    .name("PaimonKafkaSplit");

DataStream<FullOrder> alerts = split.getSideOutput(highPriorityTag);

Main stream split -> Paimon. Side output alerts -> Kafka.


Шаг 5: Paimon sink с EXACTLY_ONCE

Apache Paimon — lakehouse-формат для streaming-write workloads. Имеет встроенную поддержку EXACTLY_ONCE через snapshot commits, работает с S3/HDFS.

import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.catalog.CatalogContext;

Configuration paimonConf = new Configuration();
paimonConf.set("warehouse", "s3://lakehouse/paimon");
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(paimonConf));

Table ordersTable = catalog.getTable(Identifier.create("default", "orders_enriched"));

split.sinkTo(
    new FlinkSinkBuilder()
        .forRow(/* row data type */)
        .toTable(ordersTable)
        .build()
).uid("paimon-sink").name("PaimonSink");

Paimon делает snapshot per checkpoint. EXACTLY_ONCE: либо весь checkpoint фиксируется (snapshot commit succeeds), либо весь откатывается.


Шаг 6: Kafka sink с EXACTLY_ONCE для алертов

import org.apache.flink.connector.kafka.sink.*;

KafkaSink<FullOrder> kafkaSink = KafkaSink.<FullOrder>builder()
    .setBootstrapServers("kafka.internal:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.<FullOrder>builder()
        .setTopic("high-priority-alerts")
        .setValueSerializationSchema(new FullOrderAvroSerializer())
        .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("flink-orders-pipeline-")
    .setProperty("transaction.timeout.ms", "900000")   // 15 min
    .build();

alerts.sinkTo(kafkaSink).uid("kafka-alerts-sink").name("KafkaAlertsSink");
  • EXACTLY_ONCE через транзакционный producer.
  • transactionalIdPrefix — уникальный per job.
  • transaction.timeout.ms > execution.checkpointing.timeout — иначе транзакции таймаутят раньше checkpoint commit.

Полный main() метод

public class OrdersCapstoneJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source-ы
        DataStream<Order> orders = env.fromSource(...).uid("orders-cdc-source");
        DataStream<Payment> payments = env.fromSource(...).uid("payments-cdc-source");

        // Join
        DataStream<EnrichedOrder> joined = orders
            .keyBy(o -> o.id)
            .connect(payments.keyBy(p -> p.orderId))
            .process(new OrderPaymentJoin())
            .uid("order-payment-join");

        // Async enrichment
        DataStream<FullOrder> enriched = AsyncDataStream.orderedWait(
            joined, new CustomerEnrichment(), 3, TimeUnit.SECONDS, 100
        ).uid("customer-enrichment");

        // Split
        OutputTag<FullOrder> highPriorityTag = new OutputTag<FullOrder>("high-priority"){};
        SingleOutputStreamOperator<FullOrder> split = enriched
            .process(new SplitFunction(highPriorityTag))
            .uid("paimon-kafka-split");

        // Paimon sink
        split.sinkTo(buildPaimonSink()).uid("paimon-sink");

        // Kafka sink (side output)
        DataStream<FullOrder> alerts = split.getSideOutput(highPriorityTag);
        alerts.sinkTo(buildKafkaSink()).uid("kafka-alerts-sink");

        env.execute("orders-capstone");
    }
}

Все stateful операторы имеют UID. Очень важно — без UID любое изменение графа потеряет state (см. урок 1).


Шаг 7: FlinkDeployment для production

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: orders-capstone
  namespace: flink-prod
spec:
  image: my-registry/orders-capstone:0.1.0
  flinkVersion: v2_2
  serviceAccount: flink

  flinkConfiguration:
    # State + HA
    state.backend.type: rocksdb
    state.checkpoints.dir: s3://flink-state/orders-capstone/checkpoints
    state.savepoints.dir: s3://flink-state/orders-capstone/savepoints
    state.savepoints.format: native
    high-availability.type: kubernetes
    high-availability.storageDir: s3://flink-state/orders-capstone/ha
    kubernetes.cluster-id: orders-capstone-cluster

    # Checkpointing
    execution.checkpointing.interval: "30s"
    execution.checkpointing.min-pause: "15s"
    execution.checkpointing.timeout: "10min"
    execution.checkpointing.unaligned: "true"
    execution.checkpointing.mode: "EXACTLY_ONCE"

    # Performance / safety
    pipeline.disable-generic-types: "true"   # ловить Kryo fallback
    restart-strategy.type: "fixed-delay"
    restart-strategy.fixed-delay.attempts: "5"
    restart-strategy.fixed-delay.delay: "30s"

    # Metrics
    metrics.reporters: "prom"
    metrics.reporter.prom.factory.class: "org.apache.flink.metrics.prometheus.PrometheusReporterFactory"
    metrics.reporter.prom.port: "9249"

    # RocksDB tuning
    taskmanager.memory.process.size: "4096m"
    taskmanager.memory.managed.fraction: "0.4"
    state.backend.rocksdb.memory.managed: "true"

    # S3 параллелизм для checkpoint
    s3.upload.max.concurrent.uploads: "32"

  jobManager:
    replicas: 2
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      spec:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchLabels:
                    component: jobmanager
                topologyKey: kubernetes.io/hostname

  taskManager:
    replicas: 4
    resource:
      memory: "4096m"
      cpu: 2

  job:
    jarURI: local:///opt/flink/usrlib/orders-capstone.jar
    entryClass: com.acme.flink.OrdersCapstoneJob
    parallelism: 8
    upgradeMode: savepoint
    state: running

Это всё в одном YAML — HA, checkpoints, metrics, tuning, restart strategy. Применили и забыли.


Шаг 8: Production checklist

Прежде чем катить в прод этот capstone — пройдите чек-лист:

  • Все stateful операторы имеют explicit UID.
  • StateTtlConfig на ValueStates в OrderPaymentJoin (24h для pending).
  • pipeline.disable-generic-types: true — Kryo fallback запрещён.
  • Integration test с MiniCluster + Testcontainers (MySQL + Kafka) проходит.
  • transaction.timeout.ms > execution.checkpointing.timeout для Kafka sink.
  • Paimon snapshot retention настроен (иначе бесконечный snapshots в S3).
  • MySQL binlog includes the right tables, retention > recovery window (например, 7 дней).
  • HA: JM replicas: 2, storageDir в S3, PodAntiAffinity.
  • Prometheus ServiceMonitor + Grafana dashboard (14911 + custom panels).
  • Alerts: SLO-based + 8 базовых + runbook URL.
  • CI/CD pipeline: integration tests на PR, savepoint-driven deploy на tag.
  • Rollback workflow готов и протестирован на stage.

Что вы научились за весь курс

Запуская этот capstone, вы используете:

  • DataStream API (модуль 3): main API для пайплайна.
  • Keyed state (модуль 4): ValueState в OrderPaymentJoin с TTL.
  • Event-time + watermarks (модуль 6): WatermarkStrategy в source.
  • ProcessFunction (модуль 7): SplitFunction, OrderPaymentJoin.
  • Async I/O (модуль 9): customer enrichment через REST.
  • Checkpoints + savepoints (модуль 10): EXACTLY_ONCE через checkpoint barriers.
  • End-to-end exactly-once (модуль 11): Kafka transactional + Paimon snapshot.
  • Flink CDC (модуль 14): MySqlSource для двух OLTP БД.
  • Kubernetes deploy (модуль 15): FlinkDeployment с HA и monitoring.
  • Production essentials (модуль 16): UID, state TTL, observability, CI/CD.

Это полный production-grade pipeline. Применимо в любой команде, которая делает real-time data processing в большом масштабе.


Ключевые выводы

  1. Полный production-grade пайплайн включает source (CDC), join (stateful), enrichment (async I/O), split (side output), dual sink (Paimon + Kafka) — все на EXACTLY_ONCE.

  2. Flink CDC для MySQL требует уникальных serverId per parallel source, StartupOptions.initial() для snapshot+binlog.

  3. CoProcessFunction для join с двумя ValueStates и StateTtlConfig для cleanup pending state.

  4. Async I/O для slow lookups: orderedWait сохраняет порядок, capacity управляет concurrency, exceptionally/timeout fallback.

  5. Side outputs для split — один пайплайн -> разные sink-и без дублирования.

  6. Paimon sink для lakehouse analytics, Kafka sink для notifications. Оба EXACTLY_ONCE.

  7. FlinkDeployment YAML включает HA, RocksDB tuning, Prometheus metrics, restart strategy, S3 параллелизм — всё в одном манифесте.

  8. Production checklist — обязательная проверка перед первым деплоем.

  9. Этот пайплайн использует всё, чему вы научились в курсе. Это и есть mastery — не отдельные фичи, а их слияние в работающий production-system.

Проверка знанийKnowledge check
Через 2 недели после деплоя capstone в production вы видите: lag в Kafka растёт, throughput упал на 30%, в логах WARN от CustomerEnrichment async function. Метрики: Paimon sink работает нормально, Kafka sink работает нормально, checkpoint duration вырос с 8s до 35s, backpressure на CustomerEnrichment 600 ms/s. Что наиболее вероятно и план действий?
ОтветAnswer
Наиболее вероятная причина: customers-service (REST API для enrichment) деградировал или достиг capacity limit. Это узкое место в пайплайне. План диагностики: 1. (1 минута) Проверить health customers-service: - Метрики самого сервиса (response time p50/p99, error rate). - Если latency p99 поднялся с 100ms до 2s - это причина. Async I/O capacity 100 не помогает, если каждый запрос идёт 2 секунды (effective throughput per slot = 50/sec вместо 2000/sec). - Если error rate высокий - мы видим exceptions в logs CustomerEnrichment. 2. (2 минуты) Проверить network между Flink TM и customers-service: - kubectl exec в TM-под, curl до customers-service - latency проверить. - Возможно, сетевая деградация (cloud provider issue, или NetworkPolicy блокирует часть запросов). 3. (1 минута) Проверить логи WARN: - kubectl logs taskmanager-pod | grep -i "customerEnrichment|timeout" - Если много timeouts - confirms slow external service. 4. Краткосрочные workarounds (на пару часов): a. Увеличить capacity AsyncDataStream с 100 до 200 - больше concurrent requests. Но это нагрузит сам customers-service ещё больше. b. Включить unaligned checkpoints (уже включены в нашей конфигурации) - решает checkpoint duration. c. Поднять parallelism CustomerEnrichment - но если bottleneck в external service, parallelism не поможет. 5. Долгосрочные решения: a. Внедрить кеш в CustomerEnrichment - один и тот же customer часто запрашивается. Использовать MapState с TTL как кеш в TaskManager-е, обращаться к API только при miss. Drastically уменьшит нагрузку на customers-service. b. Добавить fallback: если customers-service down, использовать stale данные из cache + флаг 'enriched_from_cache=true'. c. Связаться с командой customers-service - scale up их сервиса. 6. Если SLA нарушен (lag критичен): - Активировать degraded mode: пропускать enrichment вообще (customer = null), пропускать алерты с full enrichment. Это конфигурируется через feature flag в Flink-конфиге (например, enrichment.enabled). - После восстановления customers-service - снова включить. Превентивно (на будущее): - Циркуит брейкер на async I/O: при error rate > X% временно отключать запросы и использовать fallback. - Monitor + alert на latency customers-service из Flink-job-а: если p99 > 500ms 10 минут - warning. - SLA-based capacity planning: 8 slots × 100 capacity × 0.1s avg = 8000 req/sec capacity. Если customers-service выдерживает только 4000 req/sec - bottleneck предсказуем. Главный урок: external dependencies критичны в streaming pipeline. Async I/O помогает с latency, но не с capacity. Кеш + circuit breaker - production essentials, которые этот capstone не показал, но в проде они должны быть.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. В capstone pipeline AsyncDataStream.orderedWait для customer enrichment установлен с capacity=100 и timeout=3s. customers-service деградировал: response time p50 вырос с 50ms до 1s, p99 с 200ms до 5s. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5