Lesson 15.04 · 24 min
Advanced
Tags: Materialized tables · FRESHNESS · Streaming refresh · Batch refresh · DISTRIBUTED BY · Flink 2.0 · Flink 2.2

Materialized tables: declarative freshness in Flink 2.x

Flink introduced materialized tables in 1.20 (FLIP-435), and they keep evolving through the 2.x line. A materialized table declaratively describes the result of a query, and Flink decides how to keep it up to date. Flink 2.2 adds DISTRIBUTED BY/INTO syntax for bucketing.

This addresses a long-standing problem in streaming SQL. You write “SELECT count(*) FROM orders GROUP BY date”: is that a batch query or a streaming one? How should the GROUP BY key be partitioned? Do you need a continuous job or a nightly batch? Materialized tables offer a single declarative API.

In this lesson: what materialized tables are, how FRESHNESS works, how Flink chooses between streaming and batch refresh, and the Flink 2.2 enhancements.


The problem: streaming vs batch

Scenario: you have an orders table (Paimon, continuously updated from Kafka). You need a daily summary:

SELECT date, count(*) AS order_count, sum(amount) AS total
FROM orders
GROUP BY date;

How do you keep this result up to date? Options:

Option 1: A continuous streaming job.

  • A Flink streaming job consumes orders, aggregates them, and writes to an order_summary table.
  • Result: always current (near-real-time latency).
  • Cost: a full streaming pipeline running around the clock, with group-by state (typically in RocksDB).

Option 2: A scheduled batch job.

  • Every night: INSERT OVERWRITE order_summary SELECT … FROM orders.
  • Result: up to one day of staleness.
  • Cost: one batch job per night.

Option 3: Hybrid micro-batch.

  • A scheduled batch every hour over the new data.
  • Result: up to one hour of staleness.
  • Cost: 24 batch runs per day, each paying its own job startup overhead.

The choice depends on the business freshness requirement. If “data must be current within 5 minutes”, use streaming. If “next day is OK”, use batch. If “within an hour is OK”, use an hourly micro-batch.

Each of these options usually runs on different infrastructure (a Flink streaming job, an Airflow schedule, or custom logic). Materialized tables abstract this away: you specify the freshness, and Flink takes care of the rest.


CREATE MATERIALIZED TABLE syntax

CREATE MATERIALIZED TABLE order_summary
  PARTITIONED BY (date)
  FRESHNESS = INTERVAL '5' MINUTE
  -- REFRESH_MODE omitted: Flink infers CONTINUOUS or FULL from FRESHNESS
AS
SELECT date, count(*) AS order_count, sum(amount) AS total
FROM orders
GROUP BY date;

Parameters:

  • FRESHNESS: the target maximum staleness. INTERVAL '5' MINUTE means the data should be no more than about 5 minutes behind the source.
  • REFRESH_MODE: CONTINUOUS (streaming job) or FULL (scheduled batch). If omitted, Flink infers the mode from FRESHNESS.
  • PARTITIONED BY: same as for a regular table; defines the partition layout.

On CREATE, Flink:

  1. Analyzes the query.
  2. Analyzes FRESHNESS.
  3. Analyzes the source tables (can they be read as a stream? do they provide CDC?).
  4. Chooses a refresh strategy.
  5. If CONTINUOUS, starts a long-running Flink streaming job.
  6. If FULL, sets up a schedule (with Flink's built-in scheduler or, through a pluggable interface, an external workflow scheduler).

The table's data lives in lakehouse storage (usually Paimon, though not limited to it) and is visible through a regular SELECT:

SELECT * FROM order_summary;
-- reads the latest snapshot of the Paimon table

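If REFRESH_MODE is omitted, Flink infers the mode by comparing FRESHNESS with a configurable threshold, materialized-table.refresh-mode.freshness-threshold (30 minutes by default in the Flink 1.20 documentation):

Condition                  | Inferred refresh mode
FRESHNESS < threshold      | CONTINUOUS (streaming job)
FRESHNESS >= threshold     | FULL (scheduled batch job)

An explicit REFRESH_MODE overrides the inference, and a query or source that cannot run as a streaming job needs FULL regardless of freshness. The threshold is configurable and the rule may evolve between releases; the intent is that Flink picks the cheaper option that still meets the freshness target.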
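A minimal Java sketch of refresh-mode inference, assuming the rule documented for Flink 1.20: without an explicit REFRESH_MODE, a FRESHNESS below materialized-table.refresh-mode.freshness-threshold (30 minutes by default) yields CONTINUOUS, and anything else yields FULL. The class and method names (RefreshModeInference, parseInterval, infer) are hypothetical; this models the rule, not Flink's planner code.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical model of how a refresh mode could be inferred from FRESHNESS.
// The 30-minute default mirrors Flink's documented freshness threshold;
// check the docs for your version.
final class RefreshModeInference {

    enum Mode { CONTINUOUS, FULL }

    static final Duration DEFAULT_THRESHOLD = Duration.ofMinutes(30);

    // Matches literals such as: INTERVAL '5' MINUTE
    private static final Pattern INTERVAL = Pattern.compile(
            "INTERVAL\\s+'(\\d+)'\\s+(SECOND|MINUTE|HOUR|DAY)", Pattern.CASE_INSENSITIVE);

    // Parses a FRESHNESS literal into a java.time.Duration.
    static Duration parseInterval(String literal) {
        Matcher m = INTERVAL.matcher(literal.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unsupported interval literal: " + literal);
        }
        long n = Long.parseLong(m.group(1));
        switch (m.group(2).toUpperCase(Locale.ROOT)) {
            case "SECOND": return Duration.ofSeconds(n);
            case "MINUTE": return Duration.ofMinutes(n);
            case "HOUR":   return Duration.ofHours(n);
            default:       return Duration.ofDays(n); // DAY
        }
    }

    // An explicit REFRESH_MODE wins; otherwise a freshness below the
    // threshold means CONTINUOUS and anything else means FULL.
    static Mode infer(Duration freshness, Mode explicit, Duration threshold) {
        if (explicit != null) {
            return explicit;
        }
        return freshness.compareTo(threshold) < 0 ? Mode.CONTINUOUS : Mode.FULL;
    }

    static Mode infer(String freshnessLiteral) {
        return infer(parseInterval(freshnessLiteral), null, DEFAULT_THRESHOLD);
    }

    public static void main(String[] args) {
        System.out.println(infer("INTERVAL '30' SECOND")); // CONTINUOUS
        System.out.println(infer("INTERVAL '5' MINUTE"));  // CONTINUOUS
        System.out.println(infer("INTERVAL '1' DAY"));     // FULL
    }
}
```

Because the threshold is a configuration option, the same FRESHNESS can map to different modes on differently configured clusters; setting REFRESH_MODE explicitly removes that ambiguity.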

CONTINUOUS example:

CREATE MATERIALIZED TABLE realtime_dashboard
  FRESHNESS = INTERVAL '30' SECOND
AS
SELECT category, count(*), sum(amount)
FROM orders
WHERE event_time > NOW() - INTERVAL '1' HOUR
GROUP BY category;
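-- Inferred: CONTINUOUS. Results become visible at each checkpoint, and the
-- checkpoint interval is derived from FRESHNESS (30 s here), so not sub-second.
-- Caveat: in streaming, NOW() is evaluated as each row arrives, so rows older
-- than an hour are never subtracted again; this is not a sliding 1-hour window.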

FULL example:

CREATE MATERIALIZED TABLE daily_report
  FRESHNESS = INTERVAL '1' DAY
AS
SELECT CAST(event_time AS DATE) AS order_date, category, count(*) AS order_count
FROM orders
WHERE event_time > NOW() - INTERVAL '90' DAY
GROUP BY CAST(event_time AS DATE), category;
-- Inferred: FULL (1 day >= 30 min threshold); a scheduled batch job runs once a day

Lifecycle and monitoring

After CREATE, Flink starts maintaining the materialized table:

-- View status
SHOW MATERIALIZED TABLES;

-- Illustrative only: Flink SQL has no information_schema; this view and its
-- columns stand in for whatever metadata your catalog or platform exposes
SELECT * FROM information_schema.materialized_tables;
/*
table_name        | refresh_mode | last_refresh           | next_refresh
order_summary     | CONTINUOUS   | (running)              | -
daily_report      | FULL         | 2026-05-19 00:00:00   | 2026-05-20 00:00:00
*/

-- Inspect the table (lineage and dependency views are catalog/platform-specific)
DESCRIBE order_summary;

Management:

-- Pause refresh (e.g., during maintenance)
ALTER MATERIALIZED TABLE order_summary SUSPEND;

-- Resume
ALTER MATERIALIZED TABLE order_summary RESUME;

-- Refresh on-demand
ALTER MATERIALIZED TABLE order_summary REFRESH;

-- Refresh specific partition
ALTER MATERIALIZED TABLE order_summary REFRESH PARTITION (date = '2026-05-19');

-- Change FRESHNESS (availability of this statement depends on the Flink version)
ALTER MATERIALIZED TABLE order_summary
  SET FRESHNESS = INTERVAL '1' MINUTE;

-- Drop
DROP MATERIALIZED TABLE order_summary;

REFRESH PARTITION is especially useful in FULL mode: if one partition of a daily report is wrong (for example, because of an upstream data issue), you recompute only that partition instead of the whole table.


Under the hood: CONTINUOUS

A CONTINUOUS materialized table is a managed Flink streaming job. What Flink does:

CONTINUOUS materialized table lifecycle:

  1. CREATE MV: the user runs CREATE MATERIALIZED TABLE, and the Flink SQL parser builds a query plan.
  2. Planner: the Flink planner analyzes the query, the source tables, and FRESHNESS, chooses CONTINUOUS, generates a streaming job DAG, and submits the job.
  3. Streaming job: an ordinary Flink job. It reads from the source tables (streaming reads for Paimon, or Kafka directly) and continuously writes into the materialized table's storage, usually a Paimon primary-key table (changelog-producer = lookup gives downstream streaming readers a complete changelog of updates).
  4. MV storage: the Paimon table. Each completed Flink checkpoint commits a snapshot, and SELECT queries from users and BI tools read the latest snapshot.
  5. Consumers: users, BI tools, and dashboards query the materialized table like any other table. They do not know a streaming job sits underneath; they just see fresh data.

If the streaming job fails, Flink restarts it according to its restart strategy. If failures keep recurring, you still need alerting of your own. A materialized table is an abstraction over a managed job lifecycle.
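To make the checkpoint/visibility relationship concrete, here is a minimal Java sketch. It assumes, as described for Flink's CONTINUOUS mode, that the checkpoint interval is derived from FRESHNESS and that the Paimon sink commits one snapshot per completed checkpoint. It ignores checkpoint duration, commit time, and reader polling, so its numbers are lower bounds; Paimon can also add extra snapshots (for example for compaction). The class and method names are hypothetical.

```java
import java.time.Duration;

// Toy model of CONTINUOUS-mode visibility, assuming checkpoint interval == FRESHNESS
// and one committed snapshot per completed checkpoint. Checkpoint duration, commit
// time and reader polling are ignored, so results are lower bounds.
final class CheckpointVisibility {

    private static long seconds(Duration interval) {
        long s = interval.getSeconds();
        if (s <= 0) {
            throw new IllegalArgumentException("interval must be at least 1 second");
        }
        return s;
    }

    // A row written at writeSecond (seconds since job start) becomes readable
    // at the first checkpoint boundary strictly after it.
    static long visibleAtSeconds(long writeSecond, Duration checkpointInterval) {
        long s = seconds(checkpointInterval);
        return (writeSecond / s + 1) * s;
    }

    // Snapshots committed over a time window: one per checkpoint.
    static long snapshotsPer(Duration window, Duration checkpointInterval) {
        return window.getSeconds() / seconds(checkpointInterval);
    }

    public static void main(String[] args) {
        Duration freshness = Duration.ofSeconds(30);
        System.out.println(visibleAtSeconds(12, freshness));                           // 30
        System.out.println(snapshotsPer(Duration.ofHours(1), freshness));              // 120
        System.out.println(snapshotsPer(Duration.ofHours(1), Duration.ofSeconds(5)));  // 720
    }
}
```

The second number is why very small FRESHNESS values are expensive: snapshot count, and with it metadata, grows linearly with 1 / FRESHNESS.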


Under the hood: FULL

A FULL refresh is a scheduled batch job. Scheduling options:

  1. Built-in scheduler: Flink has built-in, cron-like scheduling (an embedded workflow scheduler in the SQL Gateway), usable without external tools.
  2. External integration: Airflow, Dagster, etc. The workflow scheduler is pluggable, so an external orchestrator can trigger refreshes; which integrations exist depends on your Flink version and platform.

Flow:

  1. The scheduler triggers a refresh (on the schedule derived from FRESHNESS, or manually).
  2. Flink starts a batch job: INSERT OVERWRITE materialized_table_storage SELECT ... FROM source.
  3. The batch job scans the entire source, computes the aggregations, and writes the new result.
  4. Once it completes, a new snapshot becomes visible in the materialized table.
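A FULL-mode scheduler needs a periodic schedule derived from FRESHNESS. The sketch below maps an interval to a cron expression in Quartz-style syntax (seconds, minutes, hours, day-of-month, month, day-of-week), used here only as a familiar notation. It is an illustrative mapping, not Flink's conversion code, and it assumes the interval divides its enclosing unit evenly (15 minutes divides an hour, 6 hours divides a day) so runs align to wall-clock boundaries. FreshnessToCron is a hypothetical name.

```java
import java.time.Duration;

// Illustrative mapping from a FRESHNESS interval to a Quartz-style cron expression
// ("sec min hour day-of-month month day-of-week"). Assumes the interval divides
// its enclosing unit evenly; anything else is rejected.
final class FreshnessToCron {

    static String toCron(Duration freshness) {
        long s = freshness.getSeconds();
        if (s <= 0) {
            throw new IllegalArgumentException("FRESHNESS must be positive");
        }
        if (s == 86_400) {
            return "0 0 0 * * ?";                      // once a day at 00:00
        }
        if (s % 3_600 == 0 && 86_400 % s == 0) {
            return "0 0 0/" + (s / 3_600) + " * * ?";  // every N hours
        }
        if (s % 60 == 0 && 3_600 % s == 0) {
            return "0 0/" + (s / 60) + " * * * ?";     // every N minutes
        }
        if (60 % s == 0) {
            return "0/" + s + " * * * * ?";            // every N seconds
        }
        throw new IllegalArgumentException(
                "Interval does not align to a cron boundary: " + freshness);
    }

    public static void main(String[] args) {
        System.out.println(toCron(Duration.ofDays(1)));     // 0 0 0 * * ?
        System.out.println(toCron(Duration.ofHours(1)));    // 0 0 0/1 * * ?
        System.out.println(toCron(Duration.ofMinutes(15))); // 0 0/15 * * * ?
    }
}
```

Rejecting intervals such as 7 hours is a design choice of this sketch: a schedule that does not divide the day evenly would drift relative to midnight and partition boundaries.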

Variants:

  • Full refresh: recompute the entire table. Simple and accurate.
  • Incremental (partition-scoped) refresh: recompute only the partitions that changed (for example, in a daily report only today's partition needs recomputing if only today's data was updated). Cheaper, but Flink has to know what changed; this part of the feature set is still evolving in 2.x.

Configuration:

CREATE MATERIALIZED TABLE daily_report
  FRESHNESS = INTERVAL '1' HOUR
  REFRESH_MODE = FULL
  AS
  SELECT ...

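-- Partition-scoped FULL refresh: each run overwrites only the latest partition (Flink 1.20 option)
CREATE MATERIALIZED TABLE daily_report
  PARTITIONED BY (ds)
  WITH ('partition.fields.ds.date-formatter' = 'yyyy-MM-dd')
  FRESHNESS = INTERVAL '1' HOUR
  REFRESH_MODE = FULL
AS
SELECT ...

Genuinely incremental recomputation needs a source with change tracking, such as Paimon and its changelog. Iceberg has no equivalent native changelog, so support there is more limited.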
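A minimal in-memory sketch of why partition-scoped refresh is cheaper and why it depends on change tracking: only partitions flagged as changed are recomputed, and everything else keeps its previous result. It assumes the set of changed partitions is known; PartitionRefresh and its methods are hypothetical and unrelated to Flink's internal implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Toy daily-sum table: full refresh vs. refreshing only changed partitions.
final class PartitionRefresh {

    static final class Order {
        final String date;
        final long amount;
        Order(String date, long amount) { this.date = date; this.amount = amount; }
    }

    // Full refresh: recompute every partition from all source rows.
    static Map<String, Long> fullRefresh(List<Order> source) {
        Map<String, Long> result = new TreeMap<>();
        for (Order o : source) {
            result.merge(o.date, o.amount, Long::sum);
        }
        return result;
    }

    // Partition-scoped refresh: keep the previous result and overwrite only the
    // changed partitions. (A real engine would prune the source by partition
    // instead of filtering every row as done here.)
    static Map<String, Long> refreshPartitions(Map<String, Long> previous,
                                               List<Order> source,
                                               Set<String> changedPartitions) {
        Map<String, Long> result = new TreeMap<>(previous);
        Map<String, Long> recomputed = new HashMap<>();
        for (Order o : source) {
            if (changedPartitions.contains(o.date)) {
                recomputed.merge(o.date, o.amount, Long::sum);
            }
        }
        for (String p : changedPartitions) {
            Long v = recomputed.get(p);
            if (v == null) {
                result.remove(p); // overwrite semantics: an emptied partition disappears
            } else {
                result.put(p, v);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> day1 = Arrays.asList(new Order("2026-05-18", 10), new Order("2026-05-19", 5));
        Map<String, Long> snapshot = fullRefresh(day1);
        List<Order> day2 = new ArrayList<>(day1);
        day2.add(new Order("2026-05-19", 7)); // a new order for "today"
        Map<String, Long> next =
                refreshPartitions(snapshot, day2, Collections.singleton("2026-05-19"));
        System.out.println(next);                           // {2026-05-18=10, 2026-05-19=12}
        System.out.println(next.equals(fullRefresh(day2))); // true
    }
}
```

The flip side: if a partition changes but is not flagged, its old value survives, which is exactly why incremental refresh needs reliable change tracking on the source.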


Flink 2.2 adds new syntax for bucketing materialized tables:

CREATE MATERIALIZED TABLE order_summary
  DISTRIBUTED BY HASH(date) INTO 32 BUCKETS
  PARTITIONED BY (date)
  FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT date, count(*), sum(amount)
FROM orders
GROUP BY date;

DISTRIBUTED BY HASH(date) INTO 32 BUCKETS:

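  • HASH(date): the bucket key is date. Records with the same date go to the same bucket, which co-locates them for aggregation.
  • INTO 32 BUCKETS: a fixed bucket count.

This carries Paimon-style bucketing over to materialized tables. Effects:

  • Up to 32 writers: each bucket is written by a single writer task.
  • Queries with WHERE date = ... can be pruned to the relevant bucket.
  • Rows with the same bucket key are co-located, so a group-by on that key can in principle be computed per bucket without a cross-bucket shuffle.

Without DISTRIBUTED BY, the storage default applies (for Paimon, it depends on the version and on whether the table has a primary key). An explicit DISTRIBUTED BY lets you tune it. One caution about the example above: it is also PARTITIONED BY (date), so inside each partition every row has the same date and hashes to the same bucket, leaving the other 31 empty. Choose a bucket key that differs from the partition key, or use only one of the two.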
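A minimal sketch of hash bucketing, assuming bucket = floorMod(hash(key), numBuckets). Java's String.hashCode stands in for the storage's real hash function (Paimon hashes the serialized key differently), so the actual bucket numbers differ; the distribution behaviour is the point, including the degenerate case of bucketing on the partition column. HashBuckets is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy hash bucketing: bucket = floorMod(key.hashCode(), numBuckets).
// hashCode() is a stand-in for the storage's own hash function.
final class HashBuckets {

    static int bucketOf(Object key, int numBuckets) {
        // floorMod keeps the result in [0, numBuckets) even for negative hashes.
        return Math.floorMod(key.hashCode(), numBuckets);
    }

    // Which buckets receive at least one row for the given bucket-key values?
    static Set<Integer> bucketsUsed(List<?> keys, int numBuckets) {
        Set<Integer> used = new TreeSet<>();
        for (Object k : keys) {
            used.add(bucketOf(k, numBuckets));
        }
        return used;
    }

    public static void main(String[] args) {
        // Inside one date partition every row carries the same date value ...
        List<String> datesInPartition = Arrays.asList("2026-05-19", "2026-05-19", "2026-05-19");
        System.out.println(bucketsUsed(datesInPartition, 32).size()); // 1
        // ... whereas another column (here: category) can spread rows across buckets.
        List<String> categories = Arrays.asList("books", "toys", "garden", "music", "food");
        System.out.println(bucketsUsed(categories, 32));
    }
}
```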

An alternative is DISTRIBUTED BY RANGE (for time series); whether RANGE is supported depends on the storage connector:

CREATE MATERIALIZED TABLE order_summary
  DISTRIBUTED BY RANGE(date) INTO 32 BUCKETS
  ...

Range distribution assigns dates to buckets by time range. That helps temporal queries (range scans), but the trade-off is hot buckets: all recent data lands in the same bucket.
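A minimal sketch of range bucketing on a date, assuming fixed, pre-computed boundaries (how a connector chooses boundaries, and whether it supports RANGE at all, is connector-specific). A range scan over April touches one bucket, but every write for the current date lands in the last bucket: the hot-bucket trade-off. RangeBuckets is a hypothetical name.

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy range bucketing: bucket 0 holds keys before bounds[0];
// bucket i (i >= 1) holds keys in [bounds[i-1], bounds[i]).
final class RangeBuckets {

    private final List<LocalDate> bounds; // sorted lower bounds of buckets 1..n-1

    RangeBuckets(List<LocalDate> sortedBounds) {
        this.bounds = new ArrayList<>(sortedBounds);
    }

    int bucketOf(LocalDate key) {
        int bucket = 0;
        for (LocalDate b : bounds) {
            if (key.isBefore(b)) {
                break;
            }
            bucket++;
        }
        return bucket;
    }

    public static void main(String[] args) {
        // Four buckets split at month boundaries.
        RangeBuckets rb = new RangeBuckets(Arrays.asList(
                LocalDate.of(2026, 3, 1), LocalDate.of(2026, 4, 1), LocalDate.of(2026, 5, 1)));
        System.out.println(rb.bucketOf(LocalDate.of(2026, 2, 14))); // 0
        System.out.println(rb.bucketOf(LocalDate.of(2026, 4, 20))); // 2
        // All of "today's" writes go to the last bucket: a hot spot.
        System.out.println(rb.bucketOf(LocalDate.of(2026, 5, 19))); // 3
    }
}
```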


Real example: backing a dashboard

Scenario: a real-time business dashboard for the company. It needs:

  • Order volume and revenue for the last 24 hours (refresh every minute).
  • Order trends for the last 30 days (refresh hourly).
  • A daily summary for the last 90 days (refresh daily).

-- Hot, real-time
CREATE MATERIALIZED TABLE dashboard_realtime
  DISTRIBUTED BY HASH(category) INTO 16 BUCKETS
  FRESHNESS = INTERVAL '1' MINUTE
AS
SELECT category, count(*) AS orders, sum(amount) AS revenue
FROM orders
WHERE event_time > NOW() - INTERVAL '24' HOUR
GROUP BY category;
-- Flink: CONTINUOUS streaming job

-- Mid, hourly batch
CREATE MATERIALIZED TABLE dashboard_trends_30d
  DISTRIBUTED BY HASH(category) INTO 32 BUCKETS
  PARTITIONED BY (date)
  FRESHNESS = INTERVAL '1' HOUR
AS
SELECT date, category, count(*), sum(amount)
FROM orders
WHERE event_time > NOW() - INTERVAL '30' DAY
GROUP BY date, category;
-- Inferred: FULL (1 hour >= 30 min threshold), refreshed every hour

-- Cold, daily batch
CREATE MATERIALIZED TABLE dashboard_summary_90d
  DISTRIBUTED BY HASH(date) INTO 8 BUCKETS
  PARTITIONED BY (date)
  FRESHNESS = INTERVAL '1' DAY
AS
SELECT date, count(*), sum(amount), avg(amount)
FROM orders
WHERE event_time > NOW() - INTERVAL '90' DAY
GROUP BY date;
-- Flink: FULL refresh, nightly

The dashboard backend reads from all three tables for different time horizons: the real-time panel reads dashboard_realtime, the history charts read dashboard_trends_30d, and the monthly summary reads dashboard_summary_90d. Each table is optimized for cost: continuous streaming is expensive, batch is cheaper.
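The backend's routing rule can be as simple as picking the cheapest table whose horizon covers the requested look-back. A hypothetical sketch (DashboardRouter is not part of any real framework; the thresholds come from the three tables above). Note that each table answers only at its own grain: dashboard_realtime holds 24-hour totals per category, not arbitrary shorter windows.

```java
import java.time.Duration;

// Hypothetical dashboard-backend helper: choose which materialized table to
// query for a requested look-back, preferring the table with the shortest horizon.
final class DashboardRouter {

    static String tableFor(Duration lookBack) {
        if (lookBack.isNegative() || lookBack.isZero()) {
            throw new IllegalArgumentException("look-back must be positive");
        }
        if (lookBack.compareTo(Duration.ofHours(24)) <= 0) {
            return "dashboard_realtime";     // per-category totals, CONTINUOUS
        }
        if (lookBack.compareTo(Duration.ofDays(30)) <= 0) {
            return "dashboard_trends_30d";   // per-day, per-category, hourly refresh
        }
        if (lookBack.compareTo(Duration.ofDays(90)) <= 0) {
            return "dashboard_summary_90d";  // per-day totals, daily refresh
        }
        throw new IllegalArgumentException("no materialized table covers " + lookBack);
    }

    public static void main(String[] args) {
        System.out.println(tableFor(Duration.ofHours(24))); // dashboard_realtime
        System.out.println(tableFor(Duration.ofDays(7)));   // dashboard_trends_30d
        System.out.println(tableFor(Duration.ofDays(60)));  // dashboard_summary_90d
    }
}
```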


Limitations and caveats

Materialized tables are still young and keep evolving (from Flink 1.20 through 2.0, 2.1, and 2.2). Caveats:

1. Streaming refresh requires a streaming-compatible query.

Not every SQL construct works in streaming mode; certain joins, subqueries, and other operators can force FULL. If the query is incompatible with streaming, you have to use FULL (or rewrite the query), otherwise Flink rejects it.

2. Storage must be a compatible lakehouse format.

Paimon is the mature option; other storage is experimental. If you use Iceberg or Hudi, materialized tables may not work seamlessly.

3. Catalog integration.

A materialized table is a catalog object, so you need a catalog that supports it (the Paimon catalog has full support; others are partial).

4. Cost monitoring.

CONTINUOUS jobs run indefinitely and are easy to forget. Run SHOW MATERIALIZED TABLES regularly: check what is running and whether you still need all of it.

-- Audit: which CONTINUOUS MVs are running
-- (illustrative: information_schema is not part of Flink SQL; use your catalog's metadata)
SELECT
  table_name,
  refresh_mode,
  job_id,
  job_running_time
FROM information_schema.materialized_tables
WHERE refresh_mode = 'CONTINUOUS';

5. Schema changes are complicated.

Changing the query with ALTER MATERIALIZED TABLE typically means recomputing the result from scratch. It is a heavy operation that can take hours.

WARNING

A materialized table is a managed Flink job wrapped in a storage abstraction. If you create 50 materialized tables with CONTINUOUS refresh, you create 50 running Flink jobs. JobManager resources, TaskManager slots, and RocksDB state all accumulate. A single dashboard can easily spawn 20-30 MVs. Plan for this, and do not assume an MV is a “free SQL view”: it is a full pipeline under the hood.
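A back-of-the-envelope sketch of that accumulation: given a list of materialized tables with their FRESHNESS and an assumed job parallelism, count which would run as CONTINUOUS jobs and how many task slots they pin permanently. It assumes Flink's documented default freshness threshold of 30 minutes for inferring CONTINUOUS vs FULL, and that each job occupies as many slots as its parallelism (default slot sharing); real usage depends on the job topology. ContinuousFootprint and MvSpec are hypothetical names.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

// Rough footprint of CONTINUOUS materialized tables. Assumptions: a 30-minute
// freshness threshold, and each job needs as many slots as its parallelism.
final class ContinuousFootprint {

    static final Duration THRESHOLD = Duration.ofMinutes(30);

    static final class MvSpec {
        final String name;
        final Duration freshness;
        final int parallelism;

        MvSpec(String name, Duration freshness, int parallelism) {
            this.name = name;
            this.freshness = freshness;
            this.parallelism = parallelism;
        }

        boolean continuous() {
            return freshness.compareTo(THRESHOLD) < 0;
        }
    }

    static int continuousJobs(List<MvSpec> mvs) {
        int n = 0;
        for (MvSpec mv : mvs) {
            if (mv.continuous()) {
                n++;
            }
        }
        return n;
    }

    static int pinnedSlots(List<MvSpec> mvs) {
        int slots = 0;
        for (MvSpec mv : mvs) {
            if (mv.continuous()) {
                slots += mv.parallelism;
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        List<MvSpec> mvs = Arrays.asList(
                new MvSpec("dashboard_realtime", Duration.ofMinutes(1), 4),
                new MvSpec("dashboard_trends_30d", Duration.ofHours(1), 8),
                new MvSpec("dashboard_summary_90d", Duration.ofDays(1), 2));
        System.out.println(continuousJobs(mvs)); // 1
        System.out.println(pinnedSlots(mvs));    // 4
    }
}
```

Scale the first entry to 20 similar one-minute MVs and the same arithmetic gives 80 permanently occupied slots, before counting the batch jobs.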


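Comparison: MV vs a regular streaming job

Manual approach without an MV:

// Hand-written Flink 2.x DataStream job. paimonSource(...), paimonSink(...),
// watermarkStrategy and OrderAggregator are placeholders defined elsewhere.
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

public class OrderSummaryJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(paimonSource("orders"), watermarkStrategy, "orders-source")
       .keyBy(order -> order.getDate())
       // Flink 2.0 removed the Time class; window assigners take java.time.Duration
       .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
       .aggregate(new OrderAggregator())
       .sinkTo(paimonSink("order_summary"));
    env.execute("order-summary-job");
  }
}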

Equivalent MV:

CREATE MATERIALIZED TABLE order_summary
  FRESHNESS = INTERVAL '1' MINUTE
AS
-- 1-minute tumbling windows via a window TVF; event_time must be a time attribute
SELECT window_start, window_end, date, count(*) AS order_count, sum(amount) AS total
FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '1' MINUTE))
GROUP BY window_start, window_end, date;

Trade-offs:

  • MV: declarative, simpler, managed by Flink scheduler.
  • Manual: more control, can use any Java/Python features, separate lifecycle.

For simple aggregations, use an MV. For complex transformations with custom logic, write a streaming job by hand.


Knowledge check

A team wants a real-time dashboard for a key business metric. They already have an orders Paimon table (kept up to date by a streaming Flink job). They create a materialized table with FRESHNESS = INTERVAL '5' SECOND. An hour later they find that Flink cluster resource usage has tripled, JobManager metadata has grown considerably, and the materialized table shows data with 30 s latency instead of 5 s. What happened?

Answer

Several problems compound.

(1) FRESHNESS = 5 s is a very aggressive target. Under the hood, a CONTINUOUS materialized table is a full Flink streaming job whose checkpoint interval is derived from FRESHNESS, so it checkpoints every 5 s or so. The usual practical minimum is around 5-10 s; below that, checkpoint overhead dominates the real work. RocksDB checkpoints at this rate mean constant disk I/O and snapshot-lifecycle overhead, and pushing the interval lower still (say 2 s) only makes it worse.

(2) Every checkpoint commits a Paimon snapshot: 720 snapshots per hour at 5 s, 1800 at 2 s. Manifests grow and query planning slows down.

(3) Resource usage triples because the streaming job runs non-stop and most likely does not share slots with other jobs. Each materialized table is an independent job, not shared infrastructure.

(4) The observed 30 s latency versus the 5 s target most likely comes from checkpoints that cannot finish within their interval under this load, plus the time to commit the Paimon snapshot and for readers to pick it up.

Fixes:

(1) Relax FRESHNESS to 30 s-1 min, which is realistic for streaming. 5 s is for in-memory aggregations or a custom Java job, not a SQL MV.
(2) If you really need 5 s, use Fluss as the storage (sub-second visibility) instead of Paimon, or build a direct streaming pipeline into Redis or another cache.
(3) Raise the checkpoint interval to 10 s (for an MV, by raising FRESHNESS) and accept latency of around 30 s.
(4) Group several MVs into one job if they share dependencies. That reduces resource usage, but MVs do not give you this control automatically.

Key lesson: an MV is a convenient abstraction, not magic. The performance constraints of the underlying streaming pipeline still apply, and 5 s freshness is not free.
