Table и DataStream — конвертации и выбор API
Flink — это редкий случай движка, где живут два разных программных интерфейса: DataStream API (низкоуровневый, императивный) и Table API/SQL (высокоуровневый, декларативный). Они работают внутри одного job, делят общий StreamExecutionEnvironment, общий state backend, общий checkpoint. Между ними есть мосты, и понимание этих мостов — ключ к продуктивному использованию обоих API.
В этом уроке разберём, как конвертировать туда-обратно, какие ограничения есть в каждом направлении, и как выбирать API под задачу.
Зачем смешивать
Большинство production jobs не выбирают между Table и DataStream — они используют оба. Типичная картина:
Аналогия в Spark: DataFrame vs RDD API- Ingestion в DataStream — гибкое управление source, custom deserializers, exotic-форматы.
- Бизнес-логика в SQL — лаконично, проверяется автоматическим оптимизатором, легко модифицируется.
- Custom операторы (ProcessFunction, KeyedCoProcessFunction) — там, где SQL не выражает (например, complex ML inference, custom state).
- Sink часто снова в DataStream — для нестандартных sinks или fine-grained контроля.
// Pseudo-flow одного job:
DataStream<RawEvent> raw = env.fromSource(kafka, ...); // DataStream
DataStream<NormalizedEvent> normalized = raw.process(new Normalizer()); // DataStream
Table enriched = tEnv.fromDataStream(normalized).join(rates).select(...); // Table
DataStream<EnrichedRecord> changelog = tEnv.toChangelogStream(enriched); // DataStream
changelog.sinkTo(customSink); // DataStream
fromDataStream
fromDataStream(DataStream<T>) превращает DataStream в append-only Table. Каждое событие в потоке становится строкой таблицы.
DataStream<Click> clicks = env.fromSource(kafkaSource, WatermarkStrategy.<Click>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((c, t) -> c.getEventTime().toEpochMilli()), "clicks");
Table clicksTable = tEnv.fromDataStream(clicks);
tEnv.createTemporaryView("clicks", clicksTable);
Table topUrls = tEnv.sqlQuery("""
SELECT url, COUNT(*) AS cnt
FROM clicks
GROUP BY url
""");
По умолчанию схема инферится из POJO/Tuple/Row класса. Event-time attribute извлекается из WatermarkStrategy потока.
Explicit Schema:
Table clicksTable = tEnv.fromDataStream(
clicks,
Schema.newBuilder()
.column("userId", DataTypes.STRING())
.column("url", DataTypes.STRING())
.columnByExpression("eventTime", "CAST(eventTime AS TIMESTAMP_LTZ(3))")
.watermark("eventTime", "eventTime - INTERVAL '5' SECOND")
.build()
);
Полезно для:
- Кастомных type mappings (Java BigDecimal -> Flink DECIMAL(10,2)).
- Явного объявления event-time/watermark.
- Добавления computed columns.
toDataStream
toDataStream(Table) обратная операция. Важно: работает только если Table даёт append-only changelog. Если запрос порождает retract или upsert — Flink выкинет compile-time error.
// OK: simple projection
Table filtered = tEnv.sqlQuery("SELECT user_id, url FROM clicks WHERE url LIKE '/product/%'");
DataStream<Row> filteredStream = tEnv.toDataStream(filtered);
// FAIL: SQL exception на compile-time
Table aggregated = tEnv.sqlQuery("SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id");
// tEnv.toDataStream(aggregated); // Throws TableException — not append-only
Для retract/upsert используется toChangelogStream.
Row — это generic row container. Если хочется типизированный POJO:
DataStream<MyPojo> typed = tEnv.toDataStream(filtered, MyPojo.class);
Flink сделает field-mapping по именам.
toChangelogStream
toChangelogStream(Table) работает для любого changelog-режима: append, upsert, retract. Возвращает DataStream<Row> где каждый Row содержит RowKind в дополнение к колонкам.
Table aggregated = tEnv.sqlQuery("SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");
DataStream<Row> changelog = tEnv.toChangelogStream(aggregated);
changelog.print();
// Вывод:
// +I[alice, 1]
// +I[bob, 1]
// -U[alice, 1]
// +U[alice, 2]
// -U[bob, 1]
// +U[bob, 2]
row.getKind() возвращает RowKind.INSERT, UPDATE_BEFORE, UPDATE_AFTER, или DELETE.
Если downstream sink не умеет работать с changelog (например, Kafka топик в append-only режиме) — нужно ВРУЧНУЮ обработать changelog. Например, отфильтровать +U (новые значения) и игнорировать -U, -D, +I. Это даёт upsert-семантику для downstream, но теряется delete. Альтернатива: использовать sink, который понимает changelog (upsert-Kafka, Paimon, Iceberg с CDC).
Конкретная схема conversions
DataStream
DataStream — поток объектов. Append-only по природе. Имеет watermark если настроен WatermarkStrategy.Append Table
Table в append-only режиме. Можно делать SELECT/WHERE/projection без агрегаций.Append Table
Append Table — без агрегаций или с time-window. Результат может быть конвертирован обратно в DataStream через toDataStream.DataStream Row
DataStream Row, готов к sink. Подходит для custom-sink, для downstream-DataStream-операций.Retract/Upsert Table
Retract/Upsert Table — после GROUP BY, JOIN без time-bound, и т.д. Содержит updates и deletes.DataStream changelog
DataStream Row с RowKind. Каждая строка имеет +I/-U/+U/-D метку. Downstream должен обрабатывать changelog.Когда какой API выбрать
Берите Table API/SQL когда:
- Запрос декларативный: проекции, фильтрация, агрегации, joins.
- Хотите простоту поддержки: SQL понимают все, оптимизатор сделает что нужно.
- Нужны time-window операции (TUMBLE, HOP, CUMULATE — лаконично в SQL).
- Хотите явно интегрироваться с lakehouse (Iceberg, Paimon — нативный SQL).
- Команда смешанная и не вся знает Java/Scala — SQL-запросы могут писать аналитики.
Берите DataStream API когда:
- Нужен fine-grained state управление: TTL по конкретному ключу, conditional cleanup.
- Custom timers — отправить событие через 5 минут после получения, если ничего не пришло.
- Side-outputs — разделить поток на несколько по сложным условиям.
- ProcessFunction — нужна полная контроль над event + timer + state.
- Бизнес-логика естественно императивная: state machines, custom protocols.
Смешивайте когда:
- Custom ingestion (parsing, normalization) + SQL для бизнес-логики + custom sink.
- 90% — SQL, но один шаг (например, ML inference) требует кастомного оператора.
- Job начат на SQL, потом одна сложная часть переписана в DataStream — это нормально.
Гибрид: SQL + ProcessFunction
// Шаг 1: Custom DataStream — parsing
DataStream<NormalizedClick> parsed = env.fromSource(kafkaSource, ...)
.process(new CustomParser());
// Шаг 2: Table API — джойн с rate-каталогом + агрегация
tEnv.createTemporaryView("parsed_clicks", parsed);
Table enriched = tEnv.sqlQuery("""
SELECT
window_start,
window_end,
pc.user_id,
COUNT(*) AS cnt
FROM TABLE(
TUMBLE(TABLE parsed_clicks, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
) AS pc
GROUP BY window_start, window_end, pc.user_id
""");
// Шаг 3: Conversion в DataStream — append-only результат
DataStream<Row> windowResults = tEnv.toDataStream(enriched);
// Шаг 4: Custom DataStream — anomaly detection через ProcessFunction
DataStream<Alert> alerts = windowResults
.keyBy(r -> r.getFieldAs("user_id"))
.process(new AnomalyDetector());
// Шаг 5: Sink через DataStream
alerts.sinkTo(alertsKafkaSink);
Эта pattern (parse в DataStream -> agg в SQL -> custom logic в DataStream -> sink в DataStream) — очень распространённая в production. SQL даёт декларативную бизнес-логику, DataStream даёт гибкость на краях.
Попробуй сам
- Возьми существующий чисто-DataStream job и попробуй переписать одну агрегацию (GROUP BY) на SQL через
fromDataStream-> SQL ->toChangelogStream. Сравни кода. - Что произойдёт, если попытаться
toDataStream(table)на результатSELECT ... FROM TABLE(TUMBLE(...))— append-only TUMBLE? А что для unbounded GROUP BY? - Подумай: какая часть твоего production job была бы более читаемой в SQL, а какая — наоборот в DataStream? Где SQL добавил бы performance-overhead?