Materialized tables: declarative freshness in Flink 2.x
Flink 2.0 ships materialized tables (first introduced in Flink 1.20): tables that declaratively describe the result of a query, while Flink itself decides how to keep them up to date. Flink 2.2 extended them with a new DISTRIBUTED BY/INTO syntax for bucketing.
This tackles one of the long-standing problems of streaming SQL. When you write "SELECT count(*) FROM orders GROUP BY date", is that a batch query or a streaming one? How should the GROUP BY key be partitioned? Do you need a continuous job or a nightly batch? Materialized tables offer a single declarative API for all of these cases.
In this lesson: what materialized tables are, how FRESHNESS works, how Flink chooses between streaming and batch, and the Flink 2.2 enhancements.
The problem: streaming vs batch
Scenario: you have an orders table (Paimon, continuously updated from Kafka). You need a daily summary:
SELECT date, count(*) AS order_count, sum(amount) AS total
FROM orders
GROUP BY date;
How do you keep this result up to date? The options:
Option 1: Continuous streaming job.
- A Flink streaming job consumes orders, aggregates, and writes to the order_summary table.
- Result: always current (sub-second latency).
- Cost: a full streaming pipeline running at all times, state for the group-by, RocksDB usage.
Option 2: Scheduled batch.
- Every night: INSERT OVERWRITE order_summary SELECT … FROM orders.
- Result: up to 1 day of staleness.
- Cost: one batch job per night.
Option 3: Hybrid micro-batch.
- Every hour: a scheduled batch over the incremental data.
- Result: up to 1 hour of staleness.
- Cost: 24 times the batch overhead per day.
The choice depends on the freshness the business requires. If "data must be current within 5 minutes", use streaming. If "next day is OK", use batch. If "within an hour is OK", use an hourly micro-batch.
Each of these options calls for different infrastructure (a Flink streaming job vs an Airflow schedule vs custom logic). Materialized tables abstract this away: you state the freshness, and Flink handles the rest.
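The trade-off between the three options can be sketched as a small lookup. This is only an illustration of the reasoning, using the example thresholds from the text (5 minutes, 1 hour, 1 day); the names `Option` and `choose_option` are hypothetical and nothing here is Flink code.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class Option:
    name: str
    runs_per_day: float  # float("inf") stands for "always running"


STREAMING = Option("continuous streaming job", float("inf"))
HOURLY = Option("hourly micro-batch", 24)
NIGHTLY = Option("nightly batch", 1)


def choose_option(max_staleness: timedelta) -> Option:
    """Pick the cheapest option whose staleness still fits the requirement."""
    if max_staleness >= timedelta(days=1):
        return NIGHTLY
    if max_staleness >= timedelta(hours=1):
        return HOURLY
    return STREAMING


if __name__ == "__main__":
    for requirement in (timedelta(minutes=5), timedelta(hours=1), timedelta(days=1)):
        print(requirement, "->", choose_option(requirement).name)
```

The function checks the cheapest option first, which mirrors the idea that freshness is the only input a user should have to state.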
CREATE MATERIALIZED TABLE syntax
CREATE MATERIALIZED TABLE order_summary
PARTITIONED BY (date)
FRESHNESS = INTERVAL '5' MINUTE
-- REFRESH_MODE is omitted on purpose: Flink then derives the mode from FRESHNESS (the AUTO behavior)
AS
SELECT date, count(*) AS order_count, sum(amount) AS total
FROM orders
GROUP BY date;
Parameters:
- FRESHNESS: maximum staleness. INTERVAL '5' MINUTE means the data is at most 5 minutes old.
- REFRESH MODE: AUTO (Flink decides), CONTINUOUS (force streaming), FULL (force batch).
- PARTITIONED BY: same as for a regular table, it defines the partition layout.
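To make "maximum staleness" concrete, here is a minimal sketch of the check that FRESHNESS implies. The function names are hypothetical, and the sketch assumes staleness is measured from the time of the last completed refresh.

```python
from datetime import datetime, timedelta


def staleness(last_refresh: datetime, now: datetime) -> timedelta:
    """How old the materialized result is at time `now`."""
    return now - last_refresh


def violates_freshness(last_refresh: datetime, now: datetime,
                       freshness: timedelta) -> bool:
    """True when the result is older than the declared FRESHNESS."""
    return staleness(last_refresh, now) > freshness


if __name__ == "__main__":
    last = datetime(2026, 5, 19, 12, 0)
    five_minutes = timedelta(minutes=5)
    print(violates_freshness(last, datetime(2026, 5, 19, 12, 4), five_minutes))
    print(violates_freshness(last, datetime(2026, 5, 19, 12, 6), five_minutes))
```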
On CREATE, Flink:
- Analyzes the query.
- Analyzes FRESHNESS.
- Analyzes the source tables (streaming-compatible? CDC-compatible?).
- Chooses a refresh strategy.
- If CONTINUOUS, launches a long-running Flink job.
- If FULL, sets up a schedule (through an external scheduler such as an Airflow integration, or through the Flink scheduler).
The table's storage lives in the lakehouse (usually Paimon, but not limited to it). It is visible through a regular SELECT:
SELECT * FROM order_summary;
-- reads the Paimon table, latest snapshot
REFRESH MODE = AUTO: how Flink decides
AUTO leaves the decision to the Flink planner. The decision rules:
| Condition | Result |
|---|---|
| FRESHNESS under 1 minute | CONTINUOUS (streaming required) |
| FRESHNESS >= 1 hour AND source supports batch read | FULL (scheduled batch) |
| FRESHNESS between 1 minute and 1 hour | usually CONTINUOUS, but it depends |
| Source not streaming-compatible | FULL only |
The heuristic is not fixed and may evolve. The idea: Flink picks the cheapest option that meets the requirement.
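The decision table can be read as a short function. This is a sketch of the rules as listed in this lesson, not Flink's planner code: `pick_refresh_mode` and its parameters are hypothetical, and the "it depends" row is resolved to CONTINUOUS purely as an assumption.

```python
from datetime import timedelta


def pick_refresh_mode(freshness: timedelta,
                      source_streamable: bool = True,
                      source_batch_readable: bool = True) -> str:
    """Model of the decision table: returns 'CONTINUOUS' or 'FULL'."""
    # A source that cannot be read as a stream leaves FULL as the only choice.
    if not source_streamable:
        return "FULL"
    # Under one minute only a streaming job can keep up.
    if freshness < timedelta(minutes=1):
        return "CONTINUOUS"
    # One hour or more: a scheduled batch is enough, if the source allows it.
    if freshness >= timedelta(hours=1) and source_batch_readable:
        return "FULL"
    # Middle band ("usually CONTINUOUS, but it depends"): assumed CONTINUOUS here.
    return "CONTINUOUS"


if __name__ == "__main__":
    print(pick_refresh_mode(timedelta(seconds=30)))
    print(pick_refresh_mode(timedelta(minutes=5)))
    print(pick_refresh_mode(timedelta(days=1)))
```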
Example of CONTINUOUS:
CREATE MATERIALIZED TABLE realtime_dashboard
FRESHNESS = INTERVAL '30' SECOND
AS
SELECT category, count(*), sum(amount)
FROM orders
WHERE event_time > NOW() - INTERVAL '1' HOUR
GROUP BY category;
-- Flink: continuous streaming job, sub-second latency
Example of FULL:
CREATE MATERIALIZED TABLE daily_report
FRESHNESS = INTERVAL '1' DAY
AS
SELECT DATE(event_time), category, count(*)
FROM orders
WHERE event_time > NOW() - INTERVAL '90' DAY
GROUP BY DATE(event_time), category;
-- Flink: scheduled batch job, runs nightly
Lifecycle and monitoring
After CREATE, Flink starts maintaining the materialized table:
-- View status
SHOW MATERIALIZED TABLES;
SELECT * FROM information_schema.materialized_tables;
/*
table_name | refresh_mode | last_refresh | next_refresh
order_summary | CONTINUOUS | (running) | -
daily_report | FULL | 2026-05-19 00:00:00 | 2026-05-20 00:00:00
*/
-- View lineage and dependencies
SHOW MATERIALIZED TABLE order_summary;
Management:
-- Pause refresh (e.g., during maintenance)
ALTER MATERIALIZED TABLE order_summary SUSPEND;
-- Resume
ALTER MATERIALIZED TABLE order_summary RESUME;
-- Refresh on-demand
ALTER MATERIALIZED TABLE order_summary REFRESH;
-- Refresh specific partition
ALTER MATERIALIZED TABLE order_summary REFRESH PARTITION (date = '2026-05-19');
-- Change FRESHNESS
ALTER MATERIALIZED TABLE order_summary
SET FRESHNESS = INTERVAL '1' MINUTE;
-- Drop
DROP MATERIALIZED TABLE order_summary;
REFRESH PARTITION is especially useful in FULL mode: if one partition of a daily report failed (a data issue), you recompute only that partition, not the whole table.
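When several partitions fail at once, the per-partition statements can be generated instead of typed by hand. The helper below is a hypothetical sketch that only builds strings in the REFRESH PARTITION form shown above. It does not talk to Flink, and it assumes the partition column holds string values.

```python
def refresh_partition_sql(table: str, partition_column: str,
                          values: list[str]) -> list[str]:
    """Build one ALTER ... REFRESH PARTITION statement per failed partition."""
    statements = []
    for value in values:
        escaped = value.replace("'", "''")  # escape single quotes in the literal
        statements.append(
            f"ALTER MATERIALIZED TABLE {table} "
            f"REFRESH PARTITION ({partition_column} = '{escaped}');"
        )
    return statements


if __name__ == "__main__":
    for stmt in refresh_partition_sql("order_summary", "date",
                                      ["2026-05-18", "2026-05-19"]):
        print(stmt)
```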
What's under the hood for CONTINUOUS
A CONTINUOUS materialized table is a managed Flink streaming job. What Flink does:
If the streaming job fails, the Flink scheduler restarts it. If the failure pattern persists, alerting kicks in. A materialized table is an abstraction over the lifecycle of a managed job.
What's under the hood for FULL
A FULL refresh is a scheduled batch job. Flink offers two interfaces:
- Internal scheduler: Flink has built-in, cron-like scheduling that can be used without external tools.
- External integration: Airflow, Dagster, and so on. Flink exposes a refresh API, and the scheduler triggers it.
Flow:
- The scheduler triggers a refresh (on the FRESHNESS interval or manually).
- Flink starts a batch job: INSERT OVERWRITE materialized_table_storage SELECT ... FROM source.
- The batch job sweeps the full source, computes the aggregations, and writes the new state.
- After it completes, a new snapshot becomes visible in the materialized table.
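The flow above can be mimicked with a toy in-memory model, which makes the last step explicit: readers keep seeing the old snapshot until the batch job has finished and published the new one. `MaterializedTableModel` is a hypothetical stand-in, not a Flink or Paimon class.

```python
from collections import Counter


class MaterializedTableModel:
    """Toy in-memory stand-in for a FULL-mode materialized table."""

    def __init__(self):
        self._snapshots = []  # committed snapshots, oldest first

    def read(self):
        # Readers only ever see the latest committed snapshot.
        return self._snapshots[-1] if self._snapshots else {}

    def compute(self, source_rows):
        # The batch job sweeps the whole source and recomputes the aggregate
        # (here: order count per date), like INSERT OVERWRITE ... SELECT.
        return dict(Counter(row["date"] for row in source_rows))

    def commit(self, new_state):
        # Publishing the snapshot is the final step of a refresh.
        self._snapshots.append(new_state)

    def full_refresh(self, source_rows):
        self.commit(self.compute(source_rows))


if __name__ == "__main__":
    orders = [{"date": "2026-05-18"}, {"date": "2026-05-19"}]
    table = MaterializedTableModel()
    table.full_refresh(orders)
    orders.append({"date": "2026-05-19"})
    print(table.read())  # still the old counts until the next refresh
    table.full_refresh(orders)
    print(table.read())
```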
Variants:
- Full refresh: recompute the whole table. Simple and accurate.
- Incremental refresh (a Flink 2.x feature): recompute only the partitions that changed. For a daily report, for example, only today's partition needs recomputing if only today's data was updated. Cheaper, but it requires Flink to track changes.
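The difference between the two variants comes down to which partitions get recomputed. A minimal sketch, assuming the change tracking hands us the changed rows and that the table is partitioned by a `date` field; the function name and row shape are hypothetical.

```python
def partitions_to_refresh(existing_partitions, changed_rows, incremental):
    """Return the sorted list of partitions a refresh has to recompute."""
    changed = {row["date"] for row in changed_rows}
    if incremental:
        # Only partitions touched by changes since the last refresh.
        return sorted(changed)
    # Full refresh: everything that exists, plus any brand-new partitions.
    return sorted(set(existing_partitions) | changed)


if __name__ == "__main__":
    existing = ["2026-05-17", "2026-05-18", "2026-05-19"]
    changes = [{"date": "2026-05-19"}, {"date": "2026-05-19"}]
    print(partitions_to_refresh(existing, changes, incremental=True))
    print(partitions_to_refresh(existing, changes, incremental=False))
```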
Configuration:
CREATE MATERIALIZED TABLE daily_report
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = FULL
AS
SELECT ...;
-- Incremental refresh (via a property)
ALTER MATERIALIZED TABLE daily_report SET (
'refresh.incremental' = 'true'
);
Incremental refresh requires the source to be Paimon (or another format with change tracking). Iceberg does not support this trivially (no native changelog).
Flink 2.2: DISTRIBUTED BY / INTO
Flink 2.2 added new syntax for bucketing materialized tables:
CREATE MATERIALIZED TABLE order_summary
DISTRIBUTED BY HASH(date) INTO 32 BUCKETS
PARTITIONED BY (date)
FRESHNESS = INTERVAL '5' MINUTE
AS
SELECT date, count(*), sum(amount)
FROM orders
GROUP BY date;
DISTRIBUTED BY HASH(date) INTO 32 BUCKETS:
- HASH(date): the bucket key is date. Records with the same date go to the same bucket, which gives co-locality for aggregations.
- INTO 32 BUCKETS: a fixed bucket count.
This is a direct extension of Paimon bucketing to materialized tables. The effect:
- Writer parallelism = 32 (one writer per bucket).
- Reads with a WHERE date = ... query are efficient (only the relevant buckets are read).
- A group-by on the bucket key happens in-bucket (no cross-bucket shuffle).
Without DISTRIBUTED BY, bucketing falls back to the default (1 bucket when there is no PK, or PK-based). An explicit DISTRIBUTED BY lets you tune it.
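Why hash bucketing gives co-locality can be shown in a few lines. This is a sketch of the general technique only: it uses CRC32 as a stand-in hash, which is an assumption and not the hash function Paimon or Flink actually applies, and the helper names are hypothetical.

```python
import zlib
from collections import defaultdict


def bucket_for(key: str, num_buckets: int) -> int:
    """Map a bucket key to a bucket index (CRC32 used as a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


def assign_buckets(rows, num_buckets):
    """Group (date, amount) rows by the bucket their date hashes to."""
    buckets = defaultdict(list)
    for date_key, amount in rows:
        buckets[bucket_for(date_key, num_buckets)].append((date_key, amount))
    return dict(buckets)


if __name__ == "__main__":
    rows = [("2026-05-18", 10.0), ("2026-05-19", 5.0),
            ("2026-05-18", 7.5), ("2026-05-20", 3.0)]
    for bucket, content in sorted(assign_buckets(rows, 32).items()):
        print(bucket, content)
```

Because the bucket depends only on the key, every row for a given date lands in one bucket, so a per-date aggregate never needs data from another bucket.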
An alternative is DISTRIBUTED BY RANGE (for time series):
CREATE MATERIALIZED TABLE order_summary
DISTRIBUTED BY RANGE(date) INTO 32 BUCKETS
...
Range distribution spreads the data across time buckets. This is useful for temporal queries (range scans), but the trade-off is hot buckets for recent data.
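The range trade-off can be illustrated with a sketch in which buckets are defined by sorted date boundaries. The boundaries and function names are made up for the example, and real engines choose boundaries in their own way.

```python
from bisect import bisect_right
from datetime import date, timedelta

# Hypothetical boundaries: bucket 0 is everything before 2026-02-01,
# bucket 1 is February, bucket 2 is March, bucket 3 is April,
# bucket 4 is 2026-05-01 and later.
BOUNDARIES = [date(2026, 2, 1), date(2026, 3, 1), date(2026, 4, 1), date(2026, 5, 1)]


def range_bucket(day: date, boundaries=BOUNDARIES) -> int:
    """Index of the range bucket that holds `day`."""
    return bisect_right(boundaries, day)


def buckets_for_scan(start: date, end: date, boundaries=BOUNDARIES) -> list:
    """Buckets a range scan over [start, end] has to read."""
    return list(range(range_bucket(start, boundaries),
                      range_bucket(end, boundaries) + 1))


if __name__ == "__main__":
    # A scan over March touches a single bucket.
    print(buckets_for_scan(date(2026, 3, 1), date(2026, 3, 31)))
    # Ten days of recent writes all hit the same, newest bucket.
    recent = [date(2026, 5, 10) + timedelta(days=i) for i in range(10)]
    print({range_bucket(d) for d in recent})
```

The same property that keeps a range scan narrow also funnels every fresh write into the newest bucket, which is the hot-bucket effect described above.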
Real example: dashboard backing
Scenario: a real-time business dashboard for a company. The needs:
- Order volume / revenue last 24 hours (refresh 1 min).
- Order trends last 30 days (refresh 1 hour).
- Daily summary last 90 days (refresh daily).
-- Hot, real-time
CREATE MATERIALIZED TABLE dashboard_realtime
DISTRIBUTED BY HASH(category) INTO 16 BUCKETS
FRESHNESS = INTERVAL '1' MINUTE
AS
SELECT category, count(*) AS orders, sum(amount) AS revenue
FROM orders
WHERE event_time > NOW() - INTERVAL '24' HOUR
GROUP BY category;
-- Flink: CONTINUOUS streaming job
-- Mid, hourly batch
CREATE MATERIALIZED TABLE dashboard_trends_30d
DISTRIBUTED BY HASH(date) INTO 32 BUCKETS
PARTITIONED BY (date)
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT date, category, count(*), sum(amount)
FROM orders
WHERE event_time > NOW() - INTERVAL '30' DAY
GROUP BY date, category;
-- Flink: FULL incremental refresh every hour
-- Cold, daily batch
CREATE MATERIALIZED TABLE dashboard_summary_90d
DISTRIBUTED BY HASH(date) INTO 8 BUCKETS
PARTITIONED BY (date)
FRESHNESS = INTERVAL '1' DAY
AS
SELECT date, count(*), sum(amount), avg(amount)
FROM orders
WHERE event_time > NOW() - INTERVAL '90' DAY
GROUP BY date;
-- Flink: FULL refresh, nightly
The dashboard backend reads from all three for different time horizons. The real-time panel reads dashboard_realtime, the history charts read dashboard_trends_30d, and the monthly summary reads dashboard_summary_90d. Each is optimized for cost (continuous streaming is expensive, batch is cheaper).
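On the backend side, the routing between the three tables might look like the sketch below. The table names come from the example above, while `table_for` and the idea of routing purely by time horizon are assumptions made for illustration.

```python
from datetime import timedelta

# (longest horizon the table covers, table name), from hottest to coldest
TIERS = [
    (timedelta(hours=24), "dashboard_realtime"),
    (timedelta(days=30), "dashboard_trends_30d"),
    (timedelta(days=90), "dashboard_summary_90d"),
]


def table_for(horizon: timedelta) -> str:
    """Pick the freshest table that still covers the requested horizon."""
    for max_horizon, table in TIERS:
        if horizon <= max_horizon:
            return table
    raise ValueError(f"no materialized table covers a horizon of {horizon}")


if __name__ == "__main__":
    for horizon in (timedelta(hours=6), timedelta(days=7), timedelta(days=90)):
        print(horizon, "->", table_for(horizon))
```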
Limitations and caveats
Materialized tables are still young (evolving across Flink 2.0, 2.1, and 2.2). Caveats:
1. Streaming refresh requires a streaming-compatible query.
Not all SQL constructs work in streaming. Window functions, certain types of joins, and subqueries may force FULL. If a query is incompatible with streaming, Flink either falls back to FULL or raises an error.
2. Storage = lakehouse compatibility.
Only Paimon is mature; other storages are experimental. If you use Iceberg or Hudi, materialized tables may not work seamlessly.
3. Catalog integration.
A materialized table is a catalog object. You need a catalog that supports them (the Paimon catalog has full support, others only partial).
4. Cost monitoring.
CONTINUOUS jobs are long-running and easy to forget about. Run SHOW MATERIALIZED TABLES regularly to see what is running and whether you still need all of it.
-- Audit: which continuous MVs are running
SELECT
table_name,
refresh_mode,
job_id,
job_running_time
FROM information_schema.materialized_tables
WHERE refresh_mode = 'CONTINUOUS';
5. Schema changes are complicated.
Using ALTER MATERIALIZED TABLE to change the query means a full refresh from scratch. It is a heavy operation and can take hours.
A materialized table is a "managed Flink job" behind a storage abstraction. If you create 50 materialized tables with CONTINUOUS refresh, you create 50 running Flink jobs. JM resources, TM slots, RocksDB state: it all adds up. A single dashboard can easily spawn 20-30 MVs. Plan for this, and do not assume that an MV is a "free SQL view". It is a full pipeline under the hood.
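A back-of-the-envelope estimate helps with that planning. The sketch below assumes each CONTINUOUS table runs as its own job and needs as many task slots as its parallelism (default slot sharing). The `MaterializedTable` record and the parallelism value of 4 are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class MaterializedTable:
    name: str
    refresh_mode: str  # "CONTINUOUS" or "FULL"
    parallelism: int


def continuous_footprint(tables):
    """Count always-on jobs and the task slots they hold."""
    running = [t for t in tables if t.refresh_mode == "CONTINUOUS"]
    return {
        "jobs": len(running),
        "slots": sum(t.parallelism for t in running),
    }


if __name__ == "__main__":
    tables = [MaterializedTable(f"mv_{i}", "CONTINUOUS", 4) for i in range(50)]
    tables.append(MaterializedTable("daily_report", "FULL", 8))
    print(continuous_footprint(tables))
```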
Comparison: MV vs a regular streaming job
The manual approach without an MV:
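// Java Flink job, manually authored.
// paimonSource, paimonSink, watermarkStrategy and OrderAggregator are
// placeholders that the surrounding project is assumed to define.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(paimonSource("orders"), watermarkStrategy, "orders-source")
    .keyBy(row -> row.getDate())
    // Flink 2.x window assigners take java.time.Duration instead of the old Time class
    .window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(new OrderAggregator())
    .sinkTo(paimonSink("order_summary"));
env.execute("order-summary-job");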
Equivalent MV:
CREATE MATERIALIZED TABLE order_summary
FRESHNESS = INTERVAL '1' MINUTE
AS
SELECT date, count(*), sum(amount)
FROM orders
GROUP BY date, TUMBLE(event_time, INTERVAL '1' MINUTE);
Trade-offs:
- MV: declarative, simpler, managed by the Flink scheduler.
- Manual: more control, can use any Java/Python features, separate lifecycle.
For simple aggregations, use an MV. For complex transformations with custom logic, use a manual streaming job.