Prerequisites:
- module-5/03-pyflink-cdc-connector
Stateful Stream Processing в PyFlink
Вы научились читать CDC события из Kafka в PyFlink. Но пока мы только читали поток — никаких агрегаций, никаких joins. Настоящая сила Flink раскрывается в stateful processing — способности хранить состояние между событиями и выполнять сложные операции: windowed aggregations, temporal joins, sessionization.
В этом уроке мы изучим, как PyFlink управляет состоянием, как настроить watermarks для late data, как использовать tumbling/sliding/session windows для агрегаций, и как выполнять temporal joins между CDC streams.
Зачем нужно состояние в потоковой обработке?
Streaming vs Batch: Фундаментальная разница
В batch обработке (Pandas, Spark batch) у вас есть завершённый dataset:
# Batch: у вас ВСЕ данные сразу
df = pd.read_csv('orders.csv')
result = df.groupby('customer_id').agg({'total': 'sum'})
В streaming обработке данные непрерывно прибывают:
# Streaming: данные прибывают бесконечно
# Когда остановить агрегацию? Как понять, что "все данные прибыли"?
stream.group_by('customer_id').sum('total') # ???
Проблема: Как агрегировать бесконечный поток? Как понять, когда group “завершилась”?
Решение: State (состояние) + Windows (временные окна) + Watermarks (маркеры прогресса).
Что нужно состояние для?
Операции требующие состояние
(COUNT, SUM, AVG)
Как Flink управляет состоянием
в памяти или RocksDB
Примеры:
-
Агрегации: “Сколько заказов на каждого customer за последние 5 минут?”
- State: Счётчик для каждого customer_id
-
Joins: “Обогатить заказ информацией о продукте”
- State: Версионированная таблица products (temporal join)
-
Deduplication: “Отбросить повторные события с одинаковым ID”
- State: Set из уже обработанных ID
-
Sessionization: “Группировать клики пользователя в сессии (gap = 30 минут)”
- State: Timestamp последнего события для каждого user
Production истина: Stateful processing — это суть Flink. Без state можно было бы использовать stateless map/filter. State позволяет Flink выполнять сложные аналитические операции в реальном времени.
Watermarks и обработка поздних данных
Проблема: Out-of-Order Events
В distributed systems события не приходят в порядке event time:
Event time порядок ≠ Processing time порядок
Processing time порядок: E2 → E3 → E1 (порядок прибытия в Flink)
Event time порядок: E1 → E2 → E3 (порядок генерации событий)
Причины: network latency в distributed Kafka partitions, replication lag, connector restarts, backpressure.
Почему это происходит:
- Network latency в distributed Kafka partitions
- Replication lag между PostgreSQL primary и standby
- Connector restarts после failures
- Backpressure в downstream processing
Проблема для window aggregations:
-- Window 10:00-10:05, текущее время 10:06
-- Flink уже закрыл окно и выдал результат: COUNT=100
-- Но потом прибыл late event с event_time=10:03!
-- Что делать? Пересчитывать окно? Отбросить событие?
Что такое Watermark?
Watermark — это маркер, который говорит Flink: “Все события с event_time < W уже прибыли (скорее всего)”.
Watermark отслеживает прогресс event time
Watermark — это маркер, который говорит Flink: "Все события с event_time < W уже прибыли (скорее всего)". Конфигурация: WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
Как работает:
- Flink отслеживает max event_time среди прибывших событий
- Watermark = max_event_time - allowed_lateness
- Когда watermark проходит window end time → окно закрывается и выдаёт результат
Конфигурация watermark в PyFlink:
CREATE TABLE orders (
id INT,
customer_id INT,
total DECIMAL(10,2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (...)
Что это означает:
event_time - INTERVAL '5' SECONDS— допускаем задержку до 5 секунд- Если max event_time = 10:00:30, то watermark = 10:00:25
- Все окна с end_time ≤ 10:00:25 будут закрыты
Trade-off: Lateness vs Completeness
Маленький allowed lateness (1 секунда):
- ✅ Низкая latency (окна закрываются быстро)
- ❌ Больше late events пропускается (data loss)
Большой allowed lateness (1 минута):
- ✅ Меньше late events (больше полноты данных)
- ❌ Высокая latency (ждём 1 минуту перед закрытием окна)
Production рекомендация: Для CDC pipeline используйте 5-10 секунд allowed lateness. Этого достаточно для network jitter, но не слишком много для latency.
Проверка знанийWatermark = max_event_time - INTERVAL 5 SECONDS. Текущий max event_time = 10:00:30. Событие с event_time = 10:00:24 прибывает в этот момент. Будет ли оно обработано или отброшено?
Обработка Late Data
Даже с watermark некоторые события прибудут позже allowed lateness. Что с ними делать?
Опция 1: Drop late events (по умолчанию)
-- Late events молча отбрасываются
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
Опция 2: Allowed lateness (дополнительное время)
# В DataStream API (Table API не поддерживает)
stream.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.allowedLateness(Time.minutes(1)) # Ещё 1 минута после watermark
Опция 3: Side outputs (для audit)
# Late events отправляются в отдельный output
late_tag = OutputTag("late-data")
stream.window(...).sideOutputLateData(late_tag)
Примечание: Table API (SQL) поддерживает только drop late events. Для advanced lateness handling используйте DataStream API.
Tumbling Windows — Неперекрывающиеся окна
Tumbling window — это fixed-size, non-overlapping временные окна.
Концепция
Неперекрывающиеся окна фиксированного размера
Window 1
Window 2
Window 3
Window 4
Неперекрывающиеся окна фиксированного размера. Каждое событие попадает ровно в одно окно. Use case: hourly/daily aggregations, periodic metrics.
SQL: TUMBLE(event_time, INTERVAL '5' MINUTES)
Характеристики:
- Размер: Фиксированный (например, 5 минут)
- Перекрытие: Нет (каждое событие в одном окне)
- Use case: Hourly/daily aggregations, periodic metrics
SQL Syntax
table_env.execute_sql("""
CREATE VIEW hourly_revenue AS
SELECT
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
customer_id,
COUNT(*) AS order_count,
SUM(total) AS total_revenue,
AVG(total) AS avg_order_value
FROM orders_current
GROUP BY
TUMBLE(event_time, INTERVAL '1' HOUR),
customer_id
""")
Функции:
- TUMBLE(event_time, INTERVAL ‘1’ HOUR): Определяет tumbling window размером 1 час
- TUMBLE_START(…): Timestamp начала окна
- TUMBLE_END(…): Timestamp конца окна
GROUP BY: Обязательно группировать по TUMBLE(...) + ваши dimensions (customer_id)
Полный пример
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Kafka source с watermark
table_env.execute_sql("""
CREATE TABLE orders_cdc (
payload ROW<
after ROW<
id INT,
customer_id INT,
total DECIMAL(10,2),
created_at TIMESTAMP(3)
>,
op STRING,
ts_ms BIGINT
>
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'pyflink-windows',
'format' = 'json'
)
""")
# VIEW с watermark
table_env.execute_sql("""
CREATE VIEW orders_current AS
SELECT
payload.after.id AS id,
payload.after.customer_id AS customer_id,
payload.after.total AS total,
TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time,
WATERMARK FOR TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS TO_TIMESTAMP_LTZ(payload.ts_ms, 3) - INTERVAL '5' SECONDS
FROM orders_cdc
WHERE payload.op IN ('c', 'u', 'r')
""")
# Tumbling window aggregation
result = table_env.sql_query("""
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTES) AS window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTES) AS window_end,
customer_id,
COUNT(*) AS order_count,
SUM(total) AS total_revenue
FROM orders_current
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTES),
customer_id
""")
result.execute().print()
Output:
+I[2024-02-01 10:00:00.000, 2024-02-01 10:05:00.000, 1, 3, 450.00]
+I[2024-02-01 10:00:00.000, 2024-02-01 10:05:00.000, 2, 2, 300.00]
+I[2024-02-01 10:05:00.000, 2024-02-01 10:10:00.000, 1, 5, 750.00]
Важно: Watermark обязателен для window aggregations. Без watermark окна никогда не закроются.
Sliding Windows — Скользящие окна
Sliding window (также называется Hopping window) — это overlapping окна с slide interval.
Концепция
Перекрывающиеся окна. Событие может попасть в несколько окон.
Window 1
Window 2
Window 3
Перекрывающиеся окна. Событие может попасть в несколько окон. Use case: moving averages, rolling metrics, trend analysis.
SQL: HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)
Warning: Малый slide создаёт много окон → больше state → больше памяти
Характеристики:
- Размер: Window size (например, 10 минут)
- Slide: Как часто создаётся новое окно (например, 5 минут)
- Перекрытие: Да (событие может попасть в несколько окон)
- Use case: Moving averages, rolling metrics, trend analysis
SQL Syntax
SELECT
HOP_START(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS window_start,
HOP_END(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS window_end,
status,
COUNT(*) AS status_count
FROM orders_current
GROUP BY
HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES),
status
Параметры HOP:
- event_time: Time attribute (с watermark)
- INTERVAL ‘5’ MINUTES: Slide interval (как часто сдвигается окно)
- INTERVAL ‘10’ MINUTES: Window size (размер окна)
Пример: Moving Average
# 10-минутное скользящее окно для moving average order value
result = table_env.sql_query("""
SELECT
HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTES) AS window_start,
AVG(total) AS avg_order_value_10min
FROM orders_current
GROUP BY HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTES)
""")
Что происходит:
- Каждую минуту создаётся новое 10-минутное окно
- Событие в 10:05:30 попадёт в 10 окон: [09:56-10:06], [09:57-10:07], …, [10:05-10:15]
Warning: Sliding windows с малым slide создают много окон → больше state → больше памяти. Мониторьте state size.
Session Windows — Сессионные окна
Session window — это динамические окна, которые закрываются после gap of inactivity.
Концепция
Динамические окна. Закрываются после периода неактивности (gap).
Session 1 (3 события)
(inactivity)
Session 2 (3 события)
Динамические окна. Размер зависит от событий. Закрываются после gap of inactivity. Use case: user session analysis, clickstream analytics, activity tracking.
SQL: SESSION(event_time, INTERVAL '30' MINUTES)
Note: Session windows сложнее для state management, т.к. session границы неизвестны заранее
Характеристики:
- Размер: Динамический (зависит от событий)
- Gap: Inactivity timeout (например, 30 минут)
- Перекрытие: Нет (сессии разделены по gap)
- Use case: User session analysis, clickstream analytics, activity tracking
SQL Syntax
SELECT
SESSION_START(event_time, INTERVAL '30' MINUTES) AS session_start,
SESSION_END(event_time, INTERVAL '30' MINUTES) AS session_end,
customer_id,
COUNT(*) AS events_in_session,
SUM(total) AS session_revenue
FROM orders_current
GROUP BY
SESSION(event_time, INTERVAL '30' MINUTES),
customer_id
Параметры SESSION:
- event_time: Time attribute
- INTERVAL ‘30’ MINUTES: Gap (если нет событий 30 минут → сессия закрывается)
Пример: Customer Session Analysis
# Анализ покупательских сессий
result = table_env.sql_query("""
SELECT
SESSION_START(event_time, INTERVAL '30' MINUTES) AS session_start,
SESSION_END(event_time, INTERVAL '30' MINUTES) AS session_end,
customer_id,
COUNT(*) AS orders_per_session,
SUM(total) AS session_total,
MAX(total) AS max_order_in_session
FROM orders_current
GROUP BY
SESSION(event_time, INTERVAL '30' MINUTES),
customer_id
""")
Use case:
- Пользователь делает заказ в 10:00, 10:10, 10:20 → одна сессия
- Затем gap 30 минут (нет заказов)
- Следующий заказ в 11:00 → новая сессия
Production примечание: Session windows сложнее для state management, т.к. session границы неизвестны заранее. Используйте только когда действительно нужна session semantics.
Temporal Joins — Временные соединения
Temporal join — это join между stream и versioned dimension table, где join выполняется “as of” определённого времени.
Проблема обычного join в streaming
Представьте, что вы хотите обогатить заказы информацией о продукте:
-- Обычный join (НЕ работает правильно в streaming)
SELECT
o.order_id,
o.customer_id,
p.product_name,
p.price
FROM orders o
JOIN products p ON o.product_id = p.product_id
Проблема:
- Продукт обновляется в базе (цена изменилась)
- Заказ был сделан вчера, когда цена была другой
- Обычный join вернёт текущую цену, а не цену на момент заказа
Решение: FOR SYSTEM_TIME AS OF
FOR SYSTEM_TIME AS OF — join "as of" определённого времени
Temporal join использует event time из orders для поиска правильной версии product. Если цена изменилась между заказами — каждый заказ получит цену, актуальную на момент его создания.
Требования: PRIMARY KEY на dimension table, WATERMARK на обеих таблицах
Temporal join использует event time из orders для поиска правильной версии product.
SQL Syntax
# Orders stream (левая сторона join)
table_env.execute_sql("""
CREATE TABLE orders (
order_id INT,
customer_id INT,
product_id INT,
quantity INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
""")
# Products dimension table (правая сторона join)
table_env.execute_sql("""
CREATE TABLE products (
product_id INT,
product_name STRING,
price DECIMAL(10,2),
category STRING,
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time - INTERVAL '5' SECONDS,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.products',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
)
""")
# Temporal join
result = table_env.sql_query("""
SELECT
o.order_id,
o.customer_id,
o.quantity,
p.product_name,
p.price,
p.category,
o.quantity * p.price AS total_amount,
o.order_time
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id
""")
Ключевые требования
- PRIMARY KEY на dimension table (products): Обязателен для temporal join
- WATERMARK на обеих таблицах: Для event time semantics
- FOR SYSTEM_TIME AS OF: Специальный синтаксис для temporal join
- LEFT JOIN: Обычно используется, чтобы не терять orders если product не найден
Внутренняя реализация
Flink хранит версионированную state для products table:
product_id=101:
[09:00] → {name: "Widget", price: 100.00}
[10:00] → {name: "Widget", price: 120.00}
[11:00] → {name: "Widget Pro", price: 150.00}
Когда приходит order с order_time=09:30, Flink ищет версию product ≤ 09:30 → price=100.00.
Warning: Temporal joins создают большой state (вся история dimension table). Используйте state TTL для cleanup старых версий.
Управление размером состояния
Проблема: State Size Explosion
Stateful operations создают managed state в памяти:
- Window aggregations: State для каждого ключа в каждом окне
- Temporal joins: Версионированная история dimension tables
- Deduplication: Set всех обработанных IDs
Проблема: State растёт unbounded → OutOfMemoryError → job failure.
State растёт unbounded без TTL конфигурации
- State TTL: Автоматический cleanup старого state (например, TTL = 7 дней)
- RocksDB State Backend: On-disk state вместо in-memory (терабайты state)
- Session windows: Вместо global aggregations (state cleanup после session end)
- Мониторинг: Alert на state size > 80% heap memory
Решение 1: State TTL (Time-To-Live)
Настройте automatic cleanup для старого state:
# В Table API TTL конфигурируется через table hints (Flink 1.13+)
table_env.execute_sql("""
CREATE TABLE orders (
customer_id INT,
total DECIMAL(10,2),
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092'
)
""")
# Для DataStream API используйте StateTtlConfig
from pyflink.datastream.state import StateTtlConfig
from pyflink.common.time import Time
ttl_config = StateTtlConfig \
.new_builder(Time.days(7)) \
.set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
.set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
.build()
Параметры:
- TTL duration: Как долго хранить state (например, 7 дней)
- Update type: Когда обновлять TTL (при записи или при чтении)
- State visibility: Возвращать ли expired state до cleanup
Решение 2: RocksDB State Backend
По умолчанию Flink хранит state in-memory. Для большого state используйте RocksDB (on-disk):
# Конфигурация RocksDB state backend
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import RocksDBStateBackend
env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(RocksDBStateBackend("file:///tmp/flink-checkpoints", True))
Плюсы:
- ✅ Поддерживает state размером в терабайты (ограничение = disk size)
- ✅ Checkpoints быстрее (incremental checkpoints)
Минусы:
- ❌ Медленнее доступ (disk I/O вместо memory)
- ❌ Больше CPU для serialization/deserialization
Решение 3: Мониторинг State Size
Настройте metrics для отслеживания state size:
# Flink предоставляет метрики state size
# Доступны через Flink Web UI или Prometheus
Метрики для мониторинга:
taskmanager_job_task_operator_state_size_bytestaskmanager_job_task_operator_rocksdb_estimate_num_keysflink_taskmanager_Status_JVM_Memory_Heap_Used
Alert thresholds:
- Warning: State size > 80% heap memory
- Critical: State size > 95% heap memory
Решение 4: Design Patterns
Паттерн 1: Use session windows вместо global aggregations
-- Плохо: global aggregation (unbounded state)
SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id
-- Хорошо: session window (state cleanup after session end)
SELECT
customer_id,
COUNT(*) AS orders_in_session
FROM orders
GROUP BY SESSION(event_time, INTERVAL '30' MINUTES), customer_id
Паттерн 2: Filter before stateful operations
-- Сначала отфильтровать (уменьшить объём для state)
SELECT ... FROM (
SELECT * FROM orders WHERE total > 100
)
GROUP BY ...
Production рекомендация: Всегда конфигурируйте state TTL для production jobs. Мониторьте state size metrics. Используйте RocksDB для state > 10 GB.
Проверка знанийПочему global aggregation (SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id) в streaming mode приводит к unbounded state growth? Как session windows решают эту проблему?
Лабораторная работа: Real-Time Dashboard
Давайте создадим real-time dashboard с tumbling window aggregations и sliding window moving averages.
Задание
-
Откройте JupyterLab и создайте новый notebook
module5/pyflink_stateful_processing.ipynb -
Создайте Kafka source для
dbserver1.public.orders(как в предыдущем уроке) -
Создайте VIEW с watermark:
table_env.execute_sql("""
CREATE VIEW orders_with_watermark AS
SELECT
payload.after.id AS id,
payload.after.customer_id AS customer_id,
payload.after.total AS total,
payload.after.status AS status,
TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time
FROM orders_cdc
WHERE payload.op IN ('c', 'u', 'r')
WATERMARK FOR TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS TO_TIMESTAMP_LTZ(payload.ts_ms, 3) - INTERVAL '5' SECONDS
""")
- Tumbling window: Orders per customer (5 minutes)
tumbling_result = table_env.sql_query("""
SELECT
TUMBLE_START(event_time, INTERVAL '5' MINUTES) AS window_start,
customer_id,
COUNT(*) AS order_count,
SUM(total) AS total_revenue
FROM orders_with_watermark
GROUP BY
TUMBLE(event_time, INTERVAL '5' MINUTES),
customer_id
""")
tumbling_result.execute().print()
- Sliding window: Moving average order value (10-minute window, 2-minute slide)
sliding_result = table_env.sql_query("""
SELECT
HOP_START(event_time, INTERVAL '2' MINUTES, INTERVAL '10' MINUTES) AS window_start,
AVG(total) AS avg_order_value
FROM orders_with_watermark
GROUP BY HOP(event_time, INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
""")
sliding_result.execute().print()
- В отдельном терминале вставьте несколько заказов с разными timestamp:
INSERT INTO orders (customer_id, product_id, quantity, total, status, created_at)
VALUES
(1, 101, 2, 150.00, 'pending', NOW()),
(1, 102, 1, 200.00, 'pending', NOW() + INTERVAL '1 minute'),
(2, 103, 3, 450.00, 'shipped', NOW() + INTERVAL '2 minutes'),
(1, 104, 1, 100.00, 'pending', NOW() + INTERVAL '6 minutes');
- Наблюдайте результаты в PyFlink output
Ожидаемый результат
Tumbling window output:
+I[2024-02-01 10:00:00.000, 1, 3, 450.00] -- Customer 1, окно 10:00-10:05
+I[2024-02-01 10:00:00.000, 2, 1, 450.00] -- Customer 2, окно 10:00-10:05
+I[2024-02-01 10:05:00.000, 1, 1, 100.00] -- Customer 1, окно 10:05-10:10
Sliding window output:
+I[2024-02-01 10:00:00.000, 266.67] -- Avg for 10:00-10:10 window
+I[2024-02-01 10:02:00.000, 250.00] -- Avg for 10:02-10:12 window (overlap)
Bonus: Temporal Join
Если успеете, добавьте temporal join с products table:
# Создайте products source и выполните temporal join
# (код аналогичен примеру выше)
Ключевые выводы
- Stateful processing — это ядро Flink: агрегации, joins, sessionization требуют managed state
- Watermarks определяют progress event time и когда закрывать окна
- WATERMARK FOR event_time AS event_time - INTERVAL ‘X’ SECONDS — конфигурация allowed lateness
- Tumbling windows — fixed-size, non-overlapping (hourly/daily metrics)
- Sliding windows (HOP) — overlapping windows для moving averages
- Session windows — динамические окна с inactivity gap для session analysis
- Temporal joins используют FOR SYSTEM_TIME AS OF для versioned dimension enrichment
- State size management критичен: конфигурируйте TTL, используйте RocksDB, мониторьте metrics
- PRIMARY KEY обязателен для temporal join на dimension table
- Late events по умолчанию отбрасываются после watermark; для advanced handling используйте DataStream API
Что дальше?
Мы освоили stateful stream processing в PyFlink: windows, watermarks, temporal joins. Но PyFlink — это один из многих движков для stream processing.
В следующем уроке мы переключимся на PySpark Structured Streaming — альтернативу PyFlink для batch + streaming unified processing. Мы сравним PySpark и PyFlink, изучим PySpark CDC patterns, и поймём, когда использовать каждый фреймворк.
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress