Learning Platform
Глоссарий Troubleshooting
Урок 13.05 · 18 мин
Средний
Table APIDataStream APIConversiontoDataStreamtoChangelogStreamfromDataStream

Table и DataStream — конвертации и выбор API

Flink — это редкий случай движка, где живут два разных программных интерфейса: DataStream API (низкоуровневый, императивный) и Table API/SQL (высокоуровневый, декларативный). Они работают внутри одного job, делят общий StreamExecutionEnvironment, общий state backend, общий checkpoint. Между ними есть мосты, и понимание этих мостов — ключ к продуктивному использованию обоих API.

В этом уроке разберём, как конвертировать туда-обратно, какие ограничения есть в каждом направлении, и как выбирать API под задачу.


Зачем смешивать

Большинство production jobs не выбирают между Table и DataStream — они используют оба. Типичная картина:

Аналогия в Spark: DataFrame vs RDD API
  • Ingestion в DataStream — гибкое управление source, custom deserializers, exotic-форматы.
  • Бизнес-логика в SQL — лаконично, проверяется автоматическим оптимизатором, легко модифицируется.
  • Custom операторы (ProcessFunction, KeyedCoProcessFunction) — там, где SQL не выражает (например, complex ML inference, custom state).
  • Sink часто снова в DataStream — для нестандартных sinks или fine-grained контроля.
// Pseudo-flow одного job:
DataStream<RawEvent> raw = env.fromSource(kafka, ...);                       // DataStream
DataStream<NormalizedEvent> normalized = raw.process(new Normalizer());      // DataStream
Table enriched = tEnv.fromDataStream(normalized).join(rates).select(...);    // Table
DataStream<EnrichedRecord> changelog = tEnv.toChangelogStream(enriched);     // DataStream
changelog.sinkTo(customSink);                                                // DataStream

fromDataStream

fromDataStream(DataStream<T>) превращает DataStream в append-only Table. Каждое событие в потоке становится строкой таблицы.

DataStream<Click> clicks = env.fromSource(kafkaSource, WatermarkStrategy.<Click>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((c, t) -> c.getEventTime().toEpochMilli()), "clicks");

Table clicksTable = tEnv.fromDataStream(clicks);
tEnv.createTemporaryView("clicks", clicksTable);

Table topUrls = tEnv.sqlQuery("""
    SELECT url, COUNT(*) AS cnt
    FROM clicks
    GROUP BY url
    """);

По умолчанию схема инферится из POJO/Tuple/Row класса. Event-time attribute извлекается из WatermarkStrategy потока.

Explicit Schema:

Table clicksTable = tEnv.fromDataStream(
    clicks,
    Schema.newBuilder()
        .column("userId", DataTypes.STRING())
        .column("url", DataTypes.STRING())
        .columnByExpression("eventTime", "CAST(eventTime AS TIMESTAMP_LTZ(3))")
        .watermark("eventTime", "eventTime - INTERVAL '5' SECOND")
        .build()
);

Полезно для:

  • Кастомных type mappings (Java BigDecimal -> Flink DECIMAL(10,2)).
  • Явного объявления event-time/watermark.
  • Добавления computed columns.

toDataStream

toDataStream(Table) обратная операция. Важно: работает только если Table даёт append-only changelog. Если запрос порождает retract или upsert — Flink выкинет compile-time error.

// OK: simple projection
Table filtered = tEnv.sqlQuery("SELECT user_id, url FROM clicks WHERE url LIKE '/product/%'");
DataStream<Row> filteredStream = tEnv.toDataStream(filtered);

// FAIL: SQL exception на compile-time
Table aggregated = tEnv.sqlQuery("SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id");
// tEnv.toDataStream(aggregated);  // Throws TableException — not append-only

Для retract/upsert используется toChangelogStream.

Row — это generic row container. Если хочется типизированный POJO:

DataStream<MyPojo> typed = tEnv.toDataStream(filtered, MyPojo.class);

Flink сделает field-mapping по именам.


toChangelogStream

toChangelogStream(Table) работает для любого changelog-режима: append, upsert, retract. Возвращает DataStream<Row> где каждый Row содержит RowKind в дополнение к колонкам.

Table aggregated = tEnv.sqlQuery("SELECT user_id, COUNT(*) AS cnt FROM clicks GROUP BY user_id");
DataStream<Row> changelog = tEnv.toChangelogStream(aggregated);

changelog.print();
// Вывод:
// +I[alice, 1]
// +I[bob, 1]
// -U[alice, 1]
// +U[alice, 2]
// -U[bob, 1]
// +U[bob, 2]

row.getKind() возвращает RowKind.INSERT, UPDATE_BEFORE, UPDATE_AFTER, или DELETE.

TIP

Если downstream sink не умеет работать с changelog (например, Kafka топик в append-only режиме) — нужно ВРУЧНУЮ обработать changelog. Например, отфильтровать +U (новые значения) и игнорировать -U, -D, +I. Это даёт upsert-семантику для downstream, но теряется delete. Альтернатива: использовать sink, который понимает changelog (upsert-Kafka, Paimon, Iceberg с CDC).


Конкретная схема conversions

Conversion mosty между DataStream и Table

DataStream

DataStream — поток объектов. Append-only по природе. Имеет watermark если настроен WatermarkStrategy.
fromDataStream(): append

Append Table

Table в append-only режиме. Можно делать SELECT/WHERE/projection без агрегаций.

Append Table

Append Table — без агрегаций или с time-window. Результат может быть конвертирован обратно в DataStream через toDataStream.
toDataStream()

DataStream Row

DataStream Row, готов к sink. Подходит для custom-sink, для downstream-DataStream-операций.

Retract/Upsert Table

Retract/Upsert Table — после GROUP BY, JOIN без time-bound, и т.д. Содержит updates и deletes.
toChangelogStream()

DataStream changelog

DataStream Row с RowKind. Каждая строка имеет +I/-U/+U/-D метку. Downstream должен обрабатывать changelog.

Когда какой API выбрать

Берите Table API/SQL когда:

  • Запрос декларативный: проекции, фильтрация, агрегации, joins.
  • Хотите простоту поддержки: SQL понимают все, оптимизатор сделает что нужно.
  • Нужны time-window операции (TUMBLE, HOP, CUMULATE — лаконично в SQL).
  • Хотите явно интегрироваться с lakehouse (Iceberg, Paimon — нативный SQL).
  • Команда смешанная и не вся знает Java/Scala — SQL-запросы могут писать аналитики.

Берите DataStream API когда:

  • Нужен fine-grained state управление: TTL по конкретному ключу, conditional cleanup.
  • Custom timers — отправить событие через 5 минут после получения, если ничего не пришло.
  • Side-outputs — разделить поток на несколько по сложным условиям.
  • ProcessFunction — нужна полная контроль над event + timer + state.
  • Бизнес-логика естественно императивная: state machines, custom protocols.

Смешивайте когда:

  • Custom ingestion (parsing, normalization) + SQL для бизнес-логики + custom sink.
  • 90% — SQL, но один шаг (например, ML inference) требует кастомного оператора.
  • Job начат на SQL, потом одна сложная часть переписана в DataStream — это нормально.

Гибрид: SQL + ProcessFunction

// Шаг 1: Custom DataStream — parsing
DataStream<NormalizedClick> parsed = env.fromSource(kafkaSource, ...)
    .process(new CustomParser());

// Шаг 2: Table API — джойн с rate-каталогом + агрегация
tEnv.createTemporaryView("parsed_clicks", parsed);
Table enriched = tEnv.sqlQuery("""
    SELECT
      window_start,
      window_end,
      pc.user_id,
      COUNT(*) AS cnt
    FROM TABLE(
      TUMBLE(TABLE parsed_clicks, DESCRIPTOR(event_time), INTERVAL '1' MINUTE)
    ) AS pc
    GROUP BY window_start, window_end, pc.user_id
    """);

// Шаг 3: Conversion в DataStream — append-only результат
DataStream<Row> windowResults = tEnv.toDataStream(enriched);

// Шаг 4: Custom DataStream — anomaly detection через ProcessFunction
DataStream<Alert> alerts = windowResults
    .keyBy(r -> r.getFieldAs("user_id"))
    .process(new AnomalyDetector());

// Шаг 5: Sink через DataStream
alerts.sinkTo(alertsKafkaSink);

Эта pattern (parse в DataStream -> agg в SQL -> custom logic в DataStream -> sink в DataStream) — очень распространённая в production. SQL даёт декларативную бизнес-логику, DataStream даёт гибкость на краях.


Попробуй сам

  1. Возьми существующий чисто-DataStream job и попробуй переписать одну агрегацию (GROUP BY) на SQL через fromDataStream -> SQL -> toChangelogStream. Сравни кода.
  2. Что произойдёт, если попытаться toDataStream(table) на результат SELECT ... FROM TABLE(TUMBLE(...)) — append-only TUMBLE? А что для unbounded GROUP BY?
  3. Подумай: какая часть твоего production job была бы более читаемой в SQL, а какая — наоборот в DataStream? Где SQL добавил бы performance-overhead?
Проверка знанийKnowledge check
Команда переписывает Flink job с DataStream API на SQL. После рефакторинга job выдаёт ошибку при попытке tEnv.toDataStream(result), где result = tEnv.sqlQuery("SELECT user_id, SUM(amount) FROM transactions GROUP BY user_id"). Почему ошибка и как её правильно исправить?
ОтветAnswer
Ошибка возникает потому, что SELECT с GROUP BY без time-window порождает retract-changelog (-U/+U события для обновлений), а toDataStream работает только с append-only Table. Compile-time exception: "Table sink does not support consuming update changes". Правильное решение — использовать tEnv.toChangelogStream(result). Это вернёт DataStream Row с явными RowKind (+I/-U/+U/-D). Downstream-код должен учитывать RowKind: либо отфильтровать только +I/+U для upsert-семантики (теряя deletes), либо корректно обрабатывать retract (например, если sink — Paimon или upsert-Kafka, который сам понимает changelog). Альтернатива — переписать запрос так, чтобы он был append-only: добавить TUMBLE окно (SELECT ..., SUM(amount) FROM TABLE(TUMBLE(...)) GROUP BY window_start, ..., user_id) — результат будет emit только при закрытии окна, без updates.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Какая команда правильно конвертирует результат SELECT user_id, COUNT(*) FROM clicks GROUP BY user_id в DataStream?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5