Перейти к содержанию
Learning Platform
Продвинутый
50 минут
PyFlink Windows Aggregations Joins Watermarks State

Требуемые знания:

  • module-5/03-pyflink-cdc-connector

Stateful Stream Processing в PyFlink

Вы научились читать CDC события из Kafka в PyFlink. Но пока мы только читали поток — никаких агрегаций, никаких joins. Настоящая сила Flink раскрывается в stateful processing — способности хранить состояние между событиями и выполнять сложные операции: windowed aggregations, temporal joins, sessionization.

В этом уроке мы изучим, как PyFlink управляет состоянием, как настроить watermarks для late data, как использовать tumbling/sliding/session windows для агрегаций, и как выполнять temporal joins между CDC streams.

Зачем нужно состояние в потоковой обработке?

Streaming vs Batch: Фундаментальная разница

В batch обработке (Pandas, Spark batch) у вас есть завершённый dataset:

# Batch: у вас ВСЕ данные сразу
df = pd.read_csv('orders.csv')
result = df.groupby('customer_id').agg({'total': 'sum'})

В streaming обработке данные непрерывно прибывают:

# Streaming: данные прибывают бесконечно
# Когда остановить агрегацию? Как понять, что "все данные прибыли"?
stream.group_by('customer_id').sum('total')  # ???

Проблема: Как агрегировать бесконечный поток? Как понять, когда group “завершилась”?

Решение: State (состояние) + Windows (временные окна) + Watermarks (маркеры прогресса).

Что нужно состояние для?

Stateful Operations

Операции требующие состояние

Aggregations
(COUNT, SUM, AVG)
Joins
Deduplication
Sessionization
Managed State

Как Flink управляет состоянием

State хранится
в памяти или RocksDB
Периодические checkpoints
Восстановление при сбоях

Примеры:

  1. Агрегации: “Сколько заказов на каждого customer за последние 5 минут?”

    • State: Счётчик для каждого customer_id
  2. Joins: “Обогатить заказ информацией о продукте”

    • State: Версионированная таблица products (temporal join)
  3. Deduplication: “Отбросить повторные события с одинаковым ID”

    • State: Set из уже обработанных ID
  4. Sessionization: “Группировать клики пользователя в сессии (gap = 30 минут)”

    • State: Timestamp последнего события для каждого user

Production истина: Stateful processing — это суть Flink. Без state можно было бы использовать stateless map/filter. State позволяет Flink выполнять сложные аналитические операции в реальном времени.

Watermarks и обработка поздних данных

Проблема: Out-of-Order Events

В distributed systems события не приходят в порядке event time:

Out-of-Order Events Problem

Event time порядок ≠ Processing time порядок

PostgreSQL
Kafka Partition 1
Kafka Partition 2
PyFlink
E1 (event_time=10:00)E2 (event_time=10:01)E3 (event_time=10:02)E2 arrives FIRSTE3 arrives secondE1 arrives LATE!
Почему это происходит:

Processing time порядок: E2 → E3 → E1 (порядок прибытия в Flink)
Event time порядок: E1 → E2 → E3 (порядок генерации событий)

Причины: network latency в distributed Kafka partitions, replication lag, connector restarts, backpressure.

Почему это происходит:

  1. Network latency в distributed Kafka partitions
  2. Replication lag между PostgreSQL primary и standby
  3. Connector restarts после failures
  4. Backpressure в downstream processing

Проблема для window aggregations:

-- Window 10:00-10:05, текущее время 10:06
-- Flink уже закрыл окно и выдал результат: COUNT=100

-- Но потом прибыл late event с event_time=10:03!
-- Что делать? Пересчитывать окно? Отбросить событие?

Что такое Watermark?

Watermark — это маркер, который говорит Flink: “Все события с event_time < W уже прибыли (скорее всего)”.

Watermark Progress

Watermark отслеживает прогресс event time

10:00
10:01
10:02← Watermark
10:03
10:04
Закрыть windows с end_time ≤ 10:02
Watermark Definition:

Watermark — это маркер, который говорит Flink: "Все события с event_time < W уже прибыли (скорее всего)". Конфигурация: WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS

Как работает:

  1. Flink отслеживает max event_time среди прибывших событий
  2. Watermark = max_event_time - allowed_lateness
  3. Когда watermark проходит window end time → окно закрывается и выдаёт результат

Конфигурация watermark в PyFlink:

CREATE TABLE orders (
    id INT,
    customer_id INT,
    total DECIMAL(10,2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS
) WITH (...)

Что это означает:

  • event_time - INTERVAL '5' SECONDS — допускаем задержку до 5 секунд
  • Если max event_time = 10:00:30, то watermark = 10:00:25
  • Все окна с end_time ≤ 10:00:25 будут закрыты

Trade-off: Lateness vs Completeness

Маленький allowed lateness (1 секунда):

  • ✅ Низкая latency (окна закрываются быстро)
  • ❌ Больше late events пропускается (data loss)

Большой allowed lateness (1 минута):

  • ✅ Меньше late events (больше полноты данных)
  • ❌ Высокая latency (ждём 1 минуту перед закрытием окна)

Production рекомендация: Для CDC pipeline используйте 5-10 секунд allowed lateness. Этого достаточно для network jitter, но не слишком много для latency.

Проверка знаний
Watermark = max_event_time - INTERVAL 5 SECONDS. Текущий max event_time = 10:00:30. Событие с event_time = 10:00:24 прибывает в этот момент. Будет ли оно обработано или отброшено?
Ответ
Watermark = 10:00:30 - 5 секунд = 10:00:25. Событие с event_time 10:00:24 прибыло позже watermark (10:00:24 < 10:00:25), поэтому оно будет отброшено (late event). Flink считает, что все события с timestamp до 10:00:25 уже прибыли. Чтобы это событие было обработано, нужно увеличить allowed lateness до 7+ секунд (30 - 24 = 6 секунд опоздания). Это иллюстрирует trade-off: больший watermark interval = меньше потерь, но выше latency.

Обработка Late Data

Даже с watermark некоторые события прибудут позже allowed lateness. Что с ними делать?

Опция 1: Drop late events (по умолчанию)

-- Late events молча отбрасываются
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECONDS

Опция 2: Allowed lateness (дополнительное время)

# В DataStream API (Table API не поддерживает)
stream.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .allowedLateness(Time.minutes(1))  # Ещё 1 минута после watermark

Опция 3: Side outputs (для audit)

# Late events отправляются в отдельный output
late_tag = OutputTag("late-data")
stream.window(...).sideOutputLateData(late_tag)

Примечание: Table API (SQL) поддерживает только drop late events. Для advanced lateness handling используйте DataStream API.

Tumbling Windows — Неперекрывающиеся окна

Tumbling window — это fixed-size, non-overlapping временные окна.

Концепция

Tumbling Windows (размер = 5 минут)

Неперекрывающиеся окна фиксированного размера

10:00-10:05
Window 1
10:05-10:10
Window 2
10:10-10:15
Window 3
10:15-10:20
Window 4
Характеристики:

Неперекрывающиеся окна фиксированного размера. Каждое событие попадает ровно в одно окно. Use case: hourly/daily aggregations, periodic metrics.

SQL: TUMBLE(event_time, INTERVAL '5' MINUTES)

Характеристики:

  • Размер: Фиксированный (например, 5 минут)
  • Перекрытие: Нет (каждое событие в одном окне)
  • Use case: Hourly/daily aggregations, periodic metrics

SQL Syntax

table_env.execute_sql("""
    CREATE VIEW hourly_revenue AS
    SELECT
        TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
        TUMBLE_END(event_time, INTERVAL '1' HOUR) AS window_end,
        customer_id,
        COUNT(*) AS order_count,
        SUM(total) AS total_revenue,
        AVG(total) AS avg_order_value
    FROM orders_current
    GROUP BY
        TUMBLE(event_time, INTERVAL '1' HOUR),
        customer_id
""")

Функции:

  • TUMBLE(event_time, INTERVAL ‘1’ HOUR): Определяет tumbling window размером 1 час
  • TUMBLE_START(…): Timestamp начала окна
  • TUMBLE_END(…): Timestamp конца окна

GROUP BY: Обязательно группировать по TUMBLE(...) + ваши dimensions (customer_id)

Полный пример

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Kafka source с watermark
table_env.execute_sql("""
    CREATE TABLE orders_cdc (
        payload ROW<
            after ROW<
                id INT,
                customer_id INT,
                total DECIMAL(10,2),
                created_at TIMESTAMP(3)
            >,
            op STRING,
            ts_ms BIGINT
        >
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver1.public.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'pyflink-windows',
        'format' = 'json'
    )
""")

# VIEW с watermark
table_env.execute_sql("""
    CREATE VIEW orders_current AS
    SELECT
        payload.after.id AS id,
        payload.after.customer_id AS customer_id,
        payload.after.total AS total,
        TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time,
        WATERMARK FOR TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS TO_TIMESTAMP_LTZ(payload.ts_ms, 3) - INTERVAL '5' SECONDS
    FROM orders_cdc
    WHERE payload.op IN ('c', 'u', 'r')
""")

# Tumbling window aggregation
result = table_env.sql_query("""
    SELECT
        TUMBLE_START(event_time, INTERVAL '5' MINUTES) AS window_start,
        TUMBLE_END(event_time, INTERVAL '5' MINUTES) AS window_end,
        customer_id,
        COUNT(*) AS order_count,
        SUM(total) AS total_revenue
    FROM orders_current
    GROUP BY
        TUMBLE(event_time, INTERVAL '5' MINUTES),
        customer_id
""")

result.execute().print()

Output:

+I[2024-02-01 10:00:00.000, 2024-02-01 10:05:00.000, 1, 3, 450.00]
+I[2024-02-01 10:00:00.000, 2024-02-01 10:05:00.000, 2, 2, 300.00]
+I[2024-02-01 10:05:00.000, 2024-02-01 10:10:00.000, 1, 5, 750.00]

Важно: Watermark обязателен для window aggregations. Без watermark окна никогда не закроются.

Sliding Windows — Скользящие окна

Sliding window (также называется Hopping window) — это overlapping окна с slide interval.

Концепция

Sliding Windows (размер = 10 минут, slide = 5 минут)

Перекрывающиеся окна. Событие может попасть в несколько окон.

10:00-10:10
Window 1
overlap →
10:05-10:15
Window 2
overlap →
10:10-10:20
Window 3
Характеристики:

Перекрывающиеся окна. Событие может попасть в несколько окон. Use case: moving averages, rolling metrics, trend analysis.

SQL: HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES)

Warning: Малый slide создаёт много окон → больше state → больше памяти

Характеристики:

  • Размер: Window size (например, 10 минут)
  • Slide: Как часто создаётся новое окно (например, 5 минут)
  • Перекрытие: Да (событие может попасть в несколько окон)
  • Use case: Moving averages, rolling metrics, trend analysis

SQL Syntax

SELECT
    HOP_START(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS window_start,
    HOP_END(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES) AS window_end,
    status,
    COUNT(*) AS status_count
FROM orders_current
GROUP BY
    HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '10' MINUTES),
    status

Параметры HOP:

  1. event_time: Time attribute (с watermark)
  2. INTERVAL ‘5’ MINUTES: Slide interval (как часто сдвигается окно)
  3. INTERVAL ‘10’ MINUTES: Window size (размер окна)

Пример: Moving Average

# 10-минутное скользящее окно для moving average order value
result = table_env.sql_query("""
    SELECT
        HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTES) AS window_start,
        AVG(total) AS avg_order_value_10min
    FROM orders_current
    GROUP BY HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTES)
""")

Что происходит:

  • Каждую минуту создаётся новое 10-минутное окно
  • Событие в 10:05:30 попадёт в 10 окон: [09:56-10:06], [09:57-10:07], …, [10:05-10:15]

Warning: Sliding windows с малым slide создают много окон → больше state → больше памяти. Мониторьте state size.

Session Windows — Сессионные окна

Session window — это динамические окна, которые закрываются после gap of inactivity.

Концепция

Session Windows (gap = 30 минут)

Динамические окна. Закрываются после периода неактивности (gap).

User 1 — Session 1:
10:00-10:15
Session 1 (3 события)
Gap: 30 min
(inactivity)
10:45-10:55
Session 2 (3 события)
Характеристики:

Динамические окна. Размер зависит от событий. Закрываются после gap of inactivity. Use case: user session analysis, clickstream analytics, activity tracking.

SQL: SESSION(event_time, INTERVAL '30' MINUTES)

Note: Session windows сложнее для state management, т.к. session границы неизвестны заранее

Характеристики:

  • Размер: Динамический (зависит от событий)
  • Gap: Inactivity timeout (например, 30 минут)
  • Перекрытие: Нет (сессии разделены по gap)
  • Use case: User session analysis, clickstream analytics, activity tracking

SQL Syntax

SELECT
    SESSION_START(event_time, INTERVAL '30' MINUTES) AS session_start,
    SESSION_END(event_time, INTERVAL '30' MINUTES) AS session_end,
    customer_id,
    COUNT(*) AS events_in_session,
    SUM(total) AS session_revenue
FROM orders_current
GROUP BY
    SESSION(event_time, INTERVAL '30' MINUTES),
    customer_id

Параметры SESSION:

  1. event_time: Time attribute
  2. INTERVAL ‘30’ MINUTES: Gap (если нет событий 30 минут → сессия закрывается)

Пример: Customer Session Analysis

# Анализ покупательских сессий
result = table_env.sql_query("""
    SELECT
        SESSION_START(event_time, INTERVAL '30' MINUTES) AS session_start,
        SESSION_END(event_time, INTERVAL '30' MINUTES) AS session_end,
        customer_id,
        COUNT(*) AS orders_per_session,
        SUM(total) AS session_total,
        MAX(total) AS max_order_in_session
    FROM orders_current
    GROUP BY
        SESSION(event_time, INTERVAL '30' MINUTES),
        customer_id
""")

Use case:

  • Пользователь делает заказ в 10:00, 10:10, 10:20 → одна сессия
  • Затем gap 30 минут (нет заказов)
  • Следующий заказ в 11:00 → новая сессия

Production примечание: Session windows сложнее для state management, т.к. session границы неизвестны заранее. Используйте только когда действительно нужна session semantics.

Temporal Joins — Временные соединения

Temporal join — это join между stream и versioned dimension table, где join выполняется “as of” определённого времени.

Проблема обычного join в streaming

Представьте, что вы хотите обогатить заказы информацией о продукте:

-- Обычный join (НЕ работает правильно в streaming)
SELECT
    o.order_id,
    o.customer_id,
    p.product_name,
    p.price
FROM orders o
JOIN products p ON o.product_id = p.product_id

Проблема:

  1. Продукт обновляется в базе (цена изменилась)
  2. Заказ был сделан вчера, когда цена была другой
  3. Обычный join вернёт текущую цену, а не цену на момент заказа

Решение: FOR SYSTEM_TIME AS OF

Temporal Join: Versioned Dimension Enrichment

FOR SYSTEM_TIME AS OF — join "as of" определённого времени

Orders Stream
Products Table (Versioned)
Temporal Join
Version 1: product_id=101, price=100.00 (09:00)Order at 09:30: product_id=101Lookup AS OF 09:30Returns price=100.00 (version 1)Version 2: price=120.00 (10:00)Order at 10:30: product_id=101Lookup AS OF 10:30Returns price=120.00 (version 2)
Ключевая идея:

Temporal join использует event time из orders для поиска правильной версии product. Если цена изменилась между заказами — каждый заказ получит цену, актуальную на момент его создания.

Требования: PRIMARY KEY на dimension table, WATERMARK на обеих таблицах

Temporal join использует event time из orders для поиска правильной версии product.

SQL Syntax

# Orders stream (левая сторона join)
table_env.execute_sql("""
    CREATE TABLE orders (
        order_id INT,
        customer_id INT,
        product_id INT,
        quantity INT,
        order_time TIMESTAMP(3),
        WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver1.public.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Products dimension table (правая сторона join)
table_env.execute_sql("""
    CREATE TABLE products (
        product_id INT,
        product_name STRING,
        price DECIMAL(10,2),
        category STRING,
        update_time TIMESTAMP(3),
        WATERMARK FOR update_time AS update_time - INTERVAL '5' SECONDS,
        PRIMARY KEY (product_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver1.public.products',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Temporal join
result = table_env.sql_query("""
    SELECT
        o.order_id,
        o.customer_id,
        o.quantity,
        p.product_name,
        p.price,
        p.category,
        o.quantity * p.price AS total_amount,
        o.order_time
    FROM orders AS o
    LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id
""")

Ключевые требования

  1. PRIMARY KEY на dimension table (products): Обязателен для temporal join
  2. WATERMARK на обеих таблицах: Для event time semantics
  3. FOR SYSTEM_TIME AS OF: Специальный синтаксис для temporal join
  4. LEFT JOIN: Обычно используется, чтобы не терять orders если product не найден

Внутренняя реализация

Flink хранит версионированную state для products table:

product_id=101:
  [09:00] → {name: "Widget", price: 100.00}
  [10:00] → {name: "Widget", price: 120.00}
  [11:00] → {name: "Widget Pro", price: 150.00}

Когда приходит order с order_time=09:30, Flink ищет версию product ≤ 09:30 → price=100.00.

Warning: Temporal joins создают большой state (вся история dimension table). Используйте state TTL для cleanup старых версий.

Управление размером состояния

Проблема: State Size Explosion

Stateful operations создают managed state в памяти:

  1. Window aggregations: State для каждого ключа в каждом окне
  2. Temporal joins: Версионированная история dimension tables
  3. Deduplication: Set всех обработанных IDs

Проблема: State растёт unbounded → OutOfMemoryError → job failure.

State Size Explosion — Проблема Unbounded State

State растёт unbounded без TTL конфигурации

Day 1: 1 GB state
Day 7: 10 GB state
Day 30: 50 GB state
Day 90: OOM crash! 💥
Решения:
  • State TTL: Автоматический cleanup старого state (например, TTL = 7 дней)
  • RocksDB State Backend: On-disk state вместо in-memory (терабайты state)
  • Session windows: Вместо global aggregations (state cleanup после session end)
  • Мониторинг: Alert на state size > 80% heap memory

Решение 1: State TTL (Time-To-Live)

Настройте automatic cleanup для старого state:

# В Table API TTL конфигурируется через table hints (Flink 1.13+)
table_env.execute_sql("""
    CREATE TABLE orders (
        customer_id INT,
        total DECIMAL(10,2),
        event_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'kafka:9092'
    )
""")

# Для DataStream API используйте StateTtlConfig
from pyflink.datastream.state import StateTtlConfig
from pyflink.common.time import Time

ttl_config = StateTtlConfig \
    .new_builder(Time.days(7)) \
    .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
    .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired) \
    .build()

Параметры:

  • TTL duration: Как долго хранить state (например, 7 дней)
  • Update type: Когда обновлять TTL (при записи или при чтении)
  • State visibility: Возвращать ли expired state до cleanup

Решение 2: RocksDB State Backend

По умолчанию Flink хранит state in-memory. Для большого state используйте RocksDB (on-disk):

# Конфигурация RocksDB state backend
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.state_backend import RocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
env.set_state_backend(RocksDBStateBackend("file:///tmp/flink-checkpoints", True))

Плюсы:

  • ✅ Поддерживает state размером в терабайты (ограничение = disk size)
  • ✅ Checkpoints быстрее (incremental checkpoints)

Минусы:

  • ❌ Медленнее доступ (disk I/O вместо memory)
  • ❌ Больше CPU для serialization/deserialization

Решение 3: Мониторинг State Size

Настройте metrics для отслеживания state size:

# Flink предоставляет метрики state size
# Доступны через Flink Web UI или Prometheus

Метрики для мониторинга:

  • taskmanager_job_task_operator_state_size_bytes
  • taskmanager_job_task_operator_rocksdb_estimate_num_keys
  • flink_taskmanager_Status_JVM_Memory_Heap_Used

Alert thresholds:

  • Warning: State size > 80% heap memory
  • Critical: State size > 95% heap memory

Решение 4: Design Patterns

Паттерн 1: Use session windows вместо global aggregations

-- Плохо: global aggregation (unbounded state)
SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id

-- Хорошо: session window (state cleanup after session end)
SELECT
    customer_id,
    COUNT(*) AS orders_in_session
FROM orders
GROUP BY SESSION(event_time, INTERVAL '30' MINUTES), customer_id

Паттерн 2: Filter before stateful operations

-- Сначала отфильтровать (уменьшить объём для state)
SELECT ... FROM (
    SELECT * FROM orders WHERE total > 100
)
GROUP BY ...

Production рекомендация: Всегда конфигурируйте state TTL для production jobs. Мониторьте state size metrics. Используйте RocksDB для state > 10 GB.

Проверка знаний
Почему global aggregation (SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id) в streaming mode приводит к unbounded state growth? Как session windows решают эту проблему?
Ответ
Global aggregation хранит state для каждого уникального customer_id навсегда: новые customer добавляются, но старые никогда не удаляются. Через месяцы работы state содержит миллионы ключей, что приводит к OutOfMemoryError. Session windows решают проблему: после gap (например, 30 минут без событий для customer) сессия закрывается, и Flink может очистить state для этого ключа. State становится bounded: хранятся только активные сессии. Альтернатива: state TTL автоматически удаляет записи старше заданного времени.

Лабораторная работа: Real-Time Dashboard

Давайте создадим real-time dashboard с tumbling window aggregations и sliding window moving averages.

Задание

  1. Откройте JupyterLab и создайте новый notebook module5/pyflink_stateful_processing.ipynb

  2. Создайте Kafka source для dbserver1.public.orders (как в предыдущем уроке)

  3. Создайте VIEW с watermark:

table_env.execute_sql("""
    CREATE VIEW orders_with_watermark AS
    SELECT
        payload.after.id AS id,
        payload.after.customer_id AS customer_id,
        payload.after.total AS total,
        payload.after.status AS status,
        TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time
    FROM orders_cdc
    WHERE payload.op IN ('c', 'u', 'r')
    WATERMARK FOR TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS TO_TIMESTAMP_LTZ(payload.ts_ms, 3) - INTERVAL '5' SECONDS
""")
  1. Tumbling window: Orders per customer (5 minutes)
tumbling_result = table_env.sql_query("""
    SELECT
        TUMBLE_START(event_time, INTERVAL '5' MINUTES) AS window_start,
        customer_id,
        COUNT(*) AS order_count,
        SUM(total) AS total_revenue
    FROM orders_with_watermark
    GROUP BY
        TUMBLE(event_time, INTERVAL '5' MINUTES),
        customer_id
""")

tumbling_result.execute().print()
  1. Sliding window: Moving average order value (10-minute window, 2-minute slide)
sliding_result = table_env.sql_query("""
    SELECT
        HOP_START(event_time, INTERVAL '2' MINUTES, INTERVAL '10' MINUTES) AS window_start,
        AVG(total) AS avg_order_value
    FROM orders_with_watermark
    GROUP BY HOP(event_time, INTERVAL '2' MINUTES, INTERVAL '10' MINUTES)
""")

sliding_result.execute().print()
  1. В отдельном терминале вставьте несколько заказов с разными timestamp:
INSERT INTO orders (customer_id, product_id, quantity, total, status, created_at)
VALUES
  (1, 101, 2, 150.00, 'pending', NOW()),
  (1, 102, 1, 200.00, 'pending', NOW() + INTERVAL '1 minute'),
  (2, 103, 3, 450.00, 'shipped', NOW() + INTERVAL '2 minutes'),
  (1, 104, 1, 100.00, 'pending', NOW() + INTERVAL '6 minutes');
  1. Наблюдайте результаты в PyFlink output

Ожидаемый результат

Tumbling window output:

+I[2024-02-01 10:00:00.000, 1, 3, 450.00]  -- Customer 1, окно 10:00-10:05
+I[2024-02-01 10:00:00.000, 2, 1, 450.00]  -- Customer 2, окно 10:00-10:05
+I[2024-02-01 10:05:00.000, 1, 1, 100.00]  -- Customer 1, окно 10:05-10:10

Sliding window output:

+I[2024-02-01 10:00:00.000, 266.67]  -- Avg for 10:00-10:10 window
+I[2024-02-01 10:02:00.000, 250.00]  -- Avg for 10:02-10:12 window (overlap)

Bonus: Temporal Join

Если успеете, добавьте temporal join с products table:

# Создайте products source и выполните temporal join
# (код аналогичен примеру выше)

Ключевые выводы

  1. Stateful processing — это ядро Flink: агрегации, joins, sessionization требуют managed state
  2. Watermarks определяют progress event time и когда закрывать окна
  3. WATERMARK FOR event_time AS event_time - INTERVAL ‘X’ SECONDS — конфигурация allowed lateness
  4. Tumbling windows — fixed-size, non-overlapping (hourly/daily metrics)
  5. Sliding windows (HOP) — overlapping windows для moving averages
  6. Session windows — динамические окна с inactivity gap для session analysis
  7. Temporal joins используют FOR SYSTEM_TIME AS OF для versioned dimension enrichment
  8. State size management критичен: конфигурируйте TTL, используйте RocksDB, мониторьте metrics
  9. PRIMARY KEY обязателен для temporal join на dimension table
  10. Late events по умолчанию отбрасываются после watermark; для advanced handling используйте DataStream API

Что дальше?

Мы освоили stateful stream processing в PyFlink: windows, watermarks, temporal joins. Но PyFlink — это один из многих движков для stream processing.

В следующем уроке мы переключимся на PySpark Structured Streaming — альтернативу PyFlink для batch + streaming unified processing. Мы сравним PySpark и PyFlink, изучим PySpark CDC patterns, и поймём, когда использовать каждый фреймворк.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какую роль выполняет watermark в PyFlink при обработке CDC событий?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс