Capstone: MySQL CDC -> Flink enrichment -> Paimon + Kafka
Все 16 модулей курса сводятся к этому уроку. Реальный production-grade пайплайн, который вы могли бы развернуть завтра в боевом контуре. Цель не “продемонстрировать features”, а собрать всё вместе: source, state, exactly-once, async I/O, dual sinks, deploy на K8s через Operator.
Сценарий: финтех-компания. У них MySQL c таблицами orders и payments. Нужно: real-time enrichment событий через lookup в customers (другая MySQL, медленная), и dual sink — данные в Apache Paimon для аналитики (BI dashboards) И в Kafka для downstream notifications (SMS, email, fraud detection). End-to-end exactly-once обязателен (это финансы — дубликаты SMS-уведомлений недопустимы).
Архитектура пайплайна
Это не игрушка. Каждый блок применим к реальному production.
Шаг 1: Flink CDC source для двух MySQL
Используем Flink CDC connector — он обёртка над Debezium, читает MySQL binlog, эмитит CDC events.
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
MySqlSource<Order> ordersSource = MySqlSource.<Order>builder()
.hostname("mysql-orders.internal")
.port(3306)
.databaseList("orders_db")
.tableList("orders_db.orders")
.username("flink_cdc_user")
.password("...")
.serverId("5400-5404") // unique per parallel source instance
.startupOptions(StartupOptions.initial()) // snapshot then binlog
.deserializer(new OrderDeserializationSchema())
.build();
MySqlSource<Payment> paymentsSource = MySqlSource.<Payment>builder()
.hostname("mysql-payments.internal")
.port(3306)
.databaseList("payments_db")
.tableList("payments_db.payments")
.username("flink_cdc_user")
.password("...")
.serverId("5500-5504") // отличный от orders
.startupOptions(StartupOptions.initial())
.deserializer(new PaymentDeserializationSchema())
.build();
DataStream<Order> orders = env.fromSource(
ordersSource, WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((o, ts) -> o.timestamp),
"orders-cdc"
).uid("orders-cdc-source");
DataStream<Payment> payments = env.fromSource(
paymentsSource, WatermarkStrategy.<Payment>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((p, ts) -> p.timestamp),
"payments-cdc"
).uid("payments-cdc-source");
Ключевые моменты:
- serverId уникальный per Source instance. MySQL не разрешает несколько consumers с одним serverId на binlog. Диапазон
5400-5404для parallelism 4-5. - StartupOptions.initial() — сначала snapshot существующих данных, потом binlog. Альтернатива
latest()— только новые события с момента старта. - UID для source обязателен.
- Watermark на event_time из MySQL row timestamp.
Шаг 2: Join orders + payments через keyed state
Бизнес-логика: order считается “обработанным”, когда есть и order, и payment. Joining через CoProcessFunction по order_id.
public class OrderPaymentJoin extends KeyedCoProcessFunction<Long, Order, Payment, EnrichedOrder> {
private transient ValueState<Order> pendingOrder;
private transient ValueState<Payment> pendingPayment;
@Override
public void open(Configuration parameters) {
StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.hours(24))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.build();
ValueStateDescriptor<Order> orderDesc =
new ValueStateDescriptor<>("pending-order", Order.class);
orderDesc.enableTimeToLive(ttl);
pendingOrder = getRuntimeContext().getState(orderDesc);
ValueStateDescriptor<Payment> paymentDesc =
new ValueStateDescriptor<>("pending-payment", Payment.class);
paymentDesc.enableTimeToLive(ttl);
pendingPayment = getRuntimeContext().getState(paymentDesc);
}
@Override
public void processElement1(Order order, Context ctx, Collector<EnrichedOrder> out) throws Exception {
Payment payment = pendingPayment.value();
if (payment != null) {
out.collect(new EnrichedOrder(order, payment));
pendingPayment.clear();
} else {
pendingOrder.update(order);
}
}
@Override
public void processElement2(Payment payment, Context ctx, Collector<EnrichedOrder> out) throws Exception {
Order order = pendingOrder.value();
if (order != null) {
out.collect(new EnrichedOrder(order, payment));
pendingOrder.clear();
} else {
pendingPayment.update(payment);
}
}
}
DataStream<EnrichedOrder> joined = orders
.keyBy(o -> o.id)
.connect(payments.keyBy(p -> p.orderId))
.process(new OrderPaymentJoin())
.uid("order-payment-join")
.name("OrderPaymentJoin");
- StateTtlConfig: pending orders/payments удаляются через 24 часа (избегаем unbounded state, если payment никогда не придёт).
- Two ValueStates — потому что событие может прийти первым с любой стороны.
- state.clear() после match — освобождаем место.
Шаг 3: Async I/O enrichment через REST customers-service
Каждый EnrichedOrder нужно обогатить данными клиента. Customers-service - медленный REST API (avg 50ms). Sync lookup бы зарезал throughput. Async I/O решает.
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
public class CustomerEnrichment extends RichAsyncFunction<EnrichedOrder, FullOrder> {
private transient HttpClient httpClient;
@Override
public void open(Configuration parameters) {
httpClient = HttpClient.newBuilder()
.executor(Executors.newFixedThreadPool(50))
.build();
}
@Override
public void asyncInvoke(EnrichedOrder order, ResultFuture<FullOrder> resultFuture) {
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create("http://customers-service/v1/customer/" + order.customerId))
.timeout(Duration.ofSeconds(2))
.build();
httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenApply(response -> parseCustomer(response.body()))
.thenAccept(customer -> resultFuture.complete(
Collections.singletonList(new FullOrder(order, customer))
))
.exceptionally(ex -> {
// Не блокируем pipeline, шлём с null customer
resultFuture.complete(Collections.singletonList(
new FullOrder(order, null)
));
return null;
});
}
@Override
public void timeout(EnrichedOrder order, ResultFuture<FullOrder> resultFuture) {
// Default 3 sec timeout - emit с null customer
resultFuture.complete(Collections.singletonList(new FullOrder(order, null)));
}
}
DataStream<FullOrder> enriched = AsyncDataStream.orderedWait(
joined,
new CustomerEnrichment(),
3, TimeUnit.SECONDS, // timeout per request
100 // capacity (concurrent in-flight)
).uid("customer-enrichment").name("CustomerEnrichment");
- orderedWait сохраняет порядок (важно для финансовых событий).
unorderedWaitбыстрее, если порядок не нужен. - capacity: 100 — до 100 одновременных запросов per task. Throughput = 100 / 0.05s = 2000/sec per slot.
- exceptionally + timeout — fallback, чтобы один медленный API не блокировал пайплайн.
Шаг 4: Split -> Paimon (всё) + Kafka (только алерты)
Раздваиваем поток через side output: основной поток в Paimon (все события для analytics), side output в Kafka (только high-priority alerts для notifications).
import org.apache.flink.util.OutputTag;
OutputTag<FullOrder> highPriorityTag = new OutputTag<FullOrder>("high-priority"){};
SingleOutputStreamOperator<FullOrder> split = enriched
.process(new ProcessFunction<FullOrder, FullOrder>() {
@Override
public void processElement(FullOrder order, Context ctx, Collector<FullOrder> out) {
out.collect(order); // main stream -> Paimon
if (order.amount > 10000 || isHighRiskUser(order.customer)) {
ctx.output(highPriorityTag, order); // side -> Kafka
}
}
})
.uid("paimon-kafka-split")
.name("PaimonKafkaSplit");
DataStream<FullOrder> alerts = split.getSideOutput(highPriorityTag);
Main stream split -> Paimon. Side output alerts -> Kafka.
Шаг 5: Paimon sink с EXACTLY_ONCE
Apache Paimon — lakehouse-формат для streaming-write workloads. Имеет встроенную поддержку EXACTLY_ONCE через snapshot commits, работает с S3/HDFS.
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.catalog.CatalogContext;
Configuration paimonConf = new Configuration();
paimonConf.set("warehouse", "s3://lakehouse/paimon");
Catalog catalog = CatalogFactory.createCatalog(CatalogContext.create(paimonConf));
Table ordersTable = catalog.getTable(Identifier.create("default", "orders_enriched"));
split.sinkTo(
new FlinkSinkBuilder()
.forRow(/* row data type */)
.toTable(ordersTable)
.build()
).uid("paimon-sink").name("PaimonSink");
Paimon делает snapshot per checkpoint. EXACTLY_ONCE: либо весь checkpoint фиксируется (snapshot commit succeeds), либо весь откатывается.
Шаг 6: Kafka sink с EXACTLY_ONCE для алертов
import org.apache.flink.connector.kafka.sink.*;
KafkaSink<FullOrder> kafkaSink = KafkaSink.<FullOrder>builder()
.setBootstrapServers("kafka.internal:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.<FullOrder>builder()
.setTopic("high-priority-alerts")
.setValueSerializationSchema(new FullOrderAvroSerializer())
.build())
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-orders-pipeline-")
.setProperty("transaction.timeout.ms", "900000") // 15 min
.build();
alerts.sinkTo(kafkaSink).uid("kafka-alerts-sink").name("KafkaAlertsSink");
- EXACTLY_ONCE через транзакционный producer.
- transactionalIdPrefix — уникальный per job.
- transaction.timeout.ms > execution.checkpointing.timeout — иначе транзакции таймаутят раньше checkpoint commit.
Полный main() метод
public class OrdersCapstoneJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Source-ы
DataStream<Order> orders = env.fromSource(...).uid("orders-cdc-source");
DataStream<Payment> payments = env.fromSource(...).uid("payments-cdc-source");
// Join
DataStream<EnrichedOrder> joined = orders
.keyBy(o -> o.id)
.connect(payments.keyBy(p -> p.orderId))
.process(new OrderPaymentJoin())
.uid("order-payment-join");
// Async enrichment
DataStream<FullOrder> enriched = AsyncDataStream.orderedWait(
joined, new CustomerEnrichment(), 3, TimeUnit.SECONDS, 100
).uid("customer-enrichment");
// Split
OutputTag<FullOrder> highPriorityTag = new OutputTag<FullOrder>("high-priority"){};
SingleOutputStreamOperator<FullOrder> split = enriched
.process(new SplitFunction(highPriorityTag))
.uid("paimon-kafka-split");
// Paimon sink
split.sinkTo(buildPaimonSink()).uid("paimon-sink");
// Kafka sink (side output)
DataStream<FullOrder> alerts = split.getSideOutput(highPriorityTag);
alerts.sinkTo(buildKafkaSink()).uid("kafka-alerts-sink");
env.execute("orders-capstone");
}
}
Все stateful операторы имеют UID. Очень важно — без UID любое изменение графа потеряет state (см. урок 1).
Шаг 7: FlinkDeployment для production
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: orders-capstone
namespace: flink-prod
spec:
image: my-registry/orders-capstone:0.1.0
flinkVersion: v2_2
serviceAccount: flink
flinkConfiguration:
# State + HA
state.backend.type: rocksdb
state.checkpoints.dir: s3://flink-state/orders-capstone/checkpoints
state.savepoints.dir: s3://flink-state/orders-capstone/savepoints
state.savepoints.format: native
high-availability.type: kubernetes
high-availability.storageDir: s3://flink-state/orders-capstone/ha
kubernetes.cluster-id: orders-capstone-cluster
# Checkpointing
execution.checkpointing.interval: "30s"
execution.checkpointing.min-pause: "15s"
execution.checkpointing.timeout: "10min"
execution.checkpointing.unaligned: "true"
execution.checkpointing.mode: "EXACTLY_ONCE"
# Performance / safety
pipeline.disable-generic-types: "true" # ловить Kryo fallback
restart-strategy.type: "fixed-delay"
restart-strategy.fixed-delay.attempts: "5"
restart-strategy.fixed-delay.delay: "30s"
# Metrics
metrics.reporters: "prom"
metrics.reporter.prom.factory.class: "org.apache.flink.metrics.prometheus.PrometheusReporterFactory"
metrics.reporter.prom.port: "9249"
# RocksDB tuning
taskmanager.memory.process.size: "4096m"
taskmanager.memory.managed.fraction: "0.4"
state.backend.rocksdb.memory.managed: "true"
# S3 параллелизм для checkpoint
s3.upload.max.concurrent.uploads: "32"
jobManager:
replicas: 2
resource:
memory: "2048m"
cpu: 1
podTemplate:
spec:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchLabels:
component: jobmanager
topologyKey: kubernetes.io/hostname
taskManager:
replicas: 4
resource:
memory: "4096m"
cpu: 2
job:
jarURI: local:///opt/flink/usrlib/orders-capstone.jar
entryClass: com.acme.flink.OrdersCapstoneJob
parallelism: 8
upgradeMode: savepoint
state: running
Это всё в одном YAML — HA, checkpoints, metrics, tuning, restart strategy. Применили и забыли.
Шаг 8: Production checklist
Прежде чем катить в прод этот capstone — пройдите чек-лист:
- Все stateful операторы имеют explicit UID.
- StateTtlConfig на ValueStates в OrderPaymentJoin (24h для pending).
-
pipeline.disable-generic-types: true— Kryo fallback запрещён. - Integration test с MiniCluster + Testcontainers (MySQL + Kafka) проходит.
-
transaction.timeout.ms>execution.checkpointing.timeoutдля Kafka sink. - Paimon snapshot retention настроен (иначе бесконечный snapshots в S3).
- MySQL binlog includes the right tables, retention > recovery window (например, 7 дней).
- HA: JM replicas: 2, storageDir в S3, PodAntiAffinity.
- Prometheus ServiceMonitor + Grafana dashboard (14911 + custom panels).
- Alerts: SLO-based + 8 базовых + runbook URL.
- CI/CD pipeline: integration tests на PR, savepoint-driven deploy на tag.
- Rollback workflow готов и протестирован на stage.
Что вы научились за весь курс
Запуская этот capstone, вы используете:
- DataStream API (модуль 3): main API для пайплайна.
- Keyed state (модуль 4): ValueState в OrderPaymentJoin с TTL.
- Event-time + watermarks (модуль 6): WatermarkStrategy в source.
- ProcessFunction (модуль 7): SplitFunction, OrderPaymentJoin.
- Async I/O (модуль 9): customer enrichment через REST.
- Checkpoints + savepoints (модуль 10): EXACTLY_ONCE через checkpoint barriers.
- End-to-end exactly-once (модуль 11): Kafka transactional + Paimon snapshot.
- Flink CDC (модуль 14): MySqlSource для двух OLTP БД.
- Kubernetes deploy (модуль 15): FlinkDeployment с HA и monitoring.
- Production essentials (модуль 16): UID, state TTL, observability, CI/CD.
Это полный production-grade pipeline. Применимо в любой команде, которая делает real-time data processing в большом масштабе.
Ключевые выводы
-
Полный production-grade пайплайн включает source (CDC), join (stateful), enrichment (async I/O), split (side output), dual sink (Paimon + Kafka) — все на EXACTLY_ONCE.
-
Flink CDC для MySQL требует уникальных serverId per parallel source, StartupOptions.initial() для snapshot+binlog.
-
CoProcessFunction для join с двумя ValueStates и StateTtlConfig для cleanup pending state.
-
Async I/O для slow lookups: orderedWait сохраняет порядок, capacity управляет concurrency, exceptionally/timeout fallback.
-
Side outputs для split — один пайплайн -> разные sink-и без дублирования.
-
Paimon sink для lakehouse analytics, Kafka sink для notifications. Оба EXACTLY_ONCE.
-
FlinkDeployment YAML включает HA, RocksDB tuning, Prometheus metrics, restart strategy, S3 параллелизм — всё в одном манифесте.
-
Production checklist — обязательная проверка перед первым деплоем.
-
Этот пайплайн использует всё, чему вы научились в курсе. Это и есть mastery — не отдельные фичи, а их слияние в работающий production-system.