Перейти к содержанию
Learning Platform
Продвинутый
40 минут
PyFlink Flink Stream Processing Kafka Connector Table API

Требуемые знания:

  • module-5/02-pandas-integration

PyFlink CDC Connector — настройка и использование

Pandas отлично подходит для batch-анализа CDC событий — вы загрузили данные, построили DataFrame, выполнили агрегации. Но что если вам нужна непрерывная обработка потока событий? Агрегации в реальном времени? Сложные join’ы между таблицами? Apache Flink — это распределённый движок для stateful stream processing, а PyFlink — его Python API.

В этом уроке мы настроим PyFlink Table API для чтения CDC событий из Kafka, научимся определять схему Debezium envelope, и разберём, как извлекать текущее состояние записей для downstream обработки.

Apache Flink — это framework для распределённой потоковой обработки данных. PyFlink — официальный Python API, позволяющий писать Flink jobs на Python вместо Java/Scala.

Pandas (Batch)

Batch-обработка: накопить → обработать → записать

1. Накопить события
2. Загрузить в DataFrame
3. Обработать batch
4. Записать результат
Цикл повторяется
Latency: минуты/часы
PyFlink (Streaming)Recommended

Непрерывная обработка потока событий

Непрерывный поток
Event-by-event обработка
Stateful агрегации
Непрерывный вывод
Поток продолжается
Latency: миллисекунды/секунды

Ключевые отличия:

ХарактеристикаPandasPyFlink
Модель обработкиBatch (загрузили → обработали → записали)Streaming (непрерывная обработка)
МасштабированиеSingle-machine (ограничено памятью)Distributed (кластер из N машин)
LatencyМинуты/часы (batch interval)Миллисекунды/секунды
СостояниеНет встроенного (DataFrame ephemeral)Stateful (управляемое состояние)
Когда использоватьExploratory analysis, периодические отчётыReal-time dashboards, continuous ETL

Пример сценариев:

  • Pandas: Каждый час загружаем CDC события за последний час, считаем агрегаты, пишем в DWH
  • PyFlink: Считаем 5-минутные tumbling window агрегации в реальном времени, пишем в Kafka topic для dashboard’а

Production истина: Pandas для batch ETL, PyFlink для real-time processing. Если нужна latency менее 1 минуты — PyFlink. Если достаточно hourly batches — Pandas проще.

Table API vs DataStream API

PyFlink предлагает два API для работы с потоками:

1. DataStream API — низкоуровневый API для произвольной обработки потоков:

# DataStream API (более гибкий, но сложнее)
stream = env.add_source(FlinkKafkaConsumer(...))
stream.map(lambda x: transform(x)).add_sink(...)

2. Table API — SQL-подобный API для структурированных данных:

# Table API (SQL DDL для источников)
table_env.execute_sql("CREATE TABLE orders (...) WITH ('connector' = 'kafka', ...)")
result = table_env.sql_query("SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id")

Для CDC мы используем Table API, потому что:

  1. Структурированные данные: CDC события имеют чёткую схему (envelope + payload)
  2. SQL знаком: Table API использует SQL DDL и DML — проще для data engineers
  3. Оптимизация: Flink лучше оптимизирует SQL-запросы, чем произвольный DataStream код
  4. Поддержка CDC: Kafka connector в Table API понимает Debezium envelope

Рекомендация: Для CDC pipeline используйте Table API. DataStream API только если нужна низкоуровневая обработка (custom state management, side outputs).

Проверка знаний
Почему для обработки CDC событий в PyFlink рекомендуется Table API, а не DataStream API? Какое ключевое преимущество даёт SQL DDL?
Ответ
Table API использует SQL DDL для описания структуры CDC событий, и Kafka connector автоматически парсит Debezium envelope (before, after, op, ts_ms, source). С DataStream API вам пришлось бы вручную десериализовать JSON и маппить вложенные структуры. Кроме того, Flink оптимизирует SQL-запросы (предикатный pushdown, projection pruning), чего нет для произвольного DataStream кода. Table API -- стандарт для CDC processing в Flink.

Настройка окружения

PyFlink требует следующие зависимости:

Python версия:

  • Python 3.9-3.12 (PyFlink 2.2.0 не поддерживает 3.13+)
  • Для этого курса используйте Python 3.11 или 3.12

Java Runtime:

  • PyFlink работает через JVM — требуется Java 11+
  • Проверьте: java -version (должен быть 11 или выше)

Установка PyFlink:

# В JupyterLab терминале
pip install apache-flink==2.2.0

Проверка установки:

from pyflink.table import EnvironmentSettings
print("PyFlink установлен успешно!")

Важно: В нашем lab окружении JupyterLab уже имеет все зависимости. Если настраиваете локально — убедитесь, что Java установлена.

Создание TableEnvironment

TableEnvironment — это entry point для PyFlink Table API. Он управляет контекстом выполнения, регистрацией таблиц, и выполнением запросов.

Streaming Mode

Для CDC обработки нужен streaming mode (не batch):

from pyflink.table import EnvironmentSettings, TableEnvironment

# Создаём settings для streaming
env_settings = EnvironmentSettings.in_streaming_mode()

# Создаём TableEnvironment
table_env = TableEnvironment.create(env_settings)

print("TableEnvironment создан в streaming mode")

Что происходит:

  1. in_streaming_mode() — конфигурирует unbounded stream processing (не batch)
  2. create() — инициализирует JVM, создаёт Flink execution environment

Примечание: Можно также использовать in_batch_mode() для batch processing, но для CDC мы всегда используем streaming.

Определение Kafka Source Table

Теперь определим таблицу-источник для чтения CDC событий из Kafka.

SQL DDL для Kafka Connector

PyFlink Table API использует SQL DDL для регистрации источников данных:

# Определяем таблицу для CDC событий из Kafka
table_env.execute_sql("""
    CREATE TABLE orders_cdc (
        payload ROW<
            before ROW<
                id INT,
                customer_id INT,
                product_id INT,
                quantity INT,
                total DECIMAL(10,2),
                status STRING,
                created_at TIMESTAMP(3)
            >,
            after ROW<
                id INT,
                customer_id INT,
                product_id INT,
                quantity INT,
                total DECIMAL(10,2),
                status STRING,
                created_at TIMESTAMP(3)
            >,
            op STRING,
            ts_ms BIGINT,
            source ROW<
                db STRING,
                `table` STRING,
                connector STRING
            >
        >
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbserver1.public.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'pyflink-cdc-consumer',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
        'json.fail-on-missing-field' = 'false',
        'json.ignore-parse-errors' = 'true'
    )
""")

Разбор конфигурации

Схема (payload ROW):

  • ROW<...> — композитный тип для nested JSON structures
  • before/after: Вложенные ROW с полями таблицы
  • op: Тип операции (STRING: ‘c’, ‘u’, ‘d’, ‘r’)
  • ts_ms: Timestamp Debezium события (BIGINT: milliseconds)
  • source: Метаданные источника (db, table, connector)

Connector опции:

ОпцияЗначениеНазначение
connectorkafkaИспользовать Kafka connector
topicdbserver1.public.ordersИмя топика с CDC событиями
properties.bootstrap.serverskafka:9092Kafka brokers (Docker network hostname)
properties.group.idpyflink-cdc-consumerConsumer group ID
scan.startup.modeearliest-offsetЧитать с начала топика
formatjsonФормат сообщений (JSON)
json.fail-on-missing-fieldfalseНе падать на отсутствующие поля
json.ignore-parse-errorstrueПропускать malformed JSON

Важно: kafka:9092 — это hostname внутри Docker network. Если PyFlink запущен на host machine — используйте localhost:9092.

Backticks для зарезервированных слов

Обратите внимание на backticks вокруг table:

source ROW<
    db STRING,
    `table` STRING,  -- backticks, т.к. 'table' зарезервированное слово
    connector STRING
>

SQL резервирует table как keyword. Без backticks получите syntax error.

Извлечение текущего состояния

Debezium envelope содержит before и after поля. Для большинства downstream обработок нужно текущее состояние записи (т.е. after для create/update, before для delete).

VIEW для извлечения after state

Создадим VIEW, которая извлекает только after поля для inserts/updates:

table_env.execute_sql("""
    CREATE VIEW orders_current AS
    SELECT
        payload.after.id AS id,
        payload.after.customer_id AS customer_id,
        payload.after.product_id AS product_id,
        payload.after.quantity AS quantity,
        payload.after.total AS total,
        payload.after.status AS status,
        payload.after.created_at AS created_at,
        payload.op AS operation,
        TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time
    FROM orders_cdc
    WHERE payload.op IN ('c', 'u', 'r')
""")

Что происходит:

  1. SELECT payload.after.*: Извлекаем поля из after (текущее состояние после операции)
  2. WHERE payload.op IN (‘c’, ‘u’, ‘r’): Фильтруем только CREATE, UPDATE, READ (snapshot) операции
  3. TO_TIMESTAMP_LTZ(payload.ts_ms, 3): Конвертируем milliseconds в TIMESTAMP (3 — millisecond precision)

Примечание: DELETE события (op='d') не включены, т.к. у них after=null. Если нужны deletes — используйте payload.before.

Обработка DELETE событий

Если нужно обрабатывать DELETE, создайте отдельный VIEW:

table_env.execute_sql("""
    CREATE VIEW orders_deleted AS
    SELECT
        payload.before.id AS id,
        payload.before.customer_id AS customer_id,
        'd' AS operation,
        TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time
    FROM orders_cdc
    WHERE payload.op = 'd'
""")

Обработка ошибок десериализации

Production Kafka topics могут содержать malformed messages из-за:

  1. Schema changes в source database
  2. Manual data modifications напрямую в Kafka
  3. Bugs в Debezium connector

Проблема: PyFlink по умолчанию падает на первом JSON parse error → job останавливается.

Решение: Конфигурируем error tolerance в connector options:

'json.fail-on-missing-field' = 'false',  -- Не падать на отсутствующие поля
'json.ignore-parse-errors' = 'true'      -- Пропускать malformed JSON

Trade-off:

  • Плюс: Job продолжает работать при ошибках десериализации
  • ⚠️ Минус: Malformed события молча пропускаются (data loss)

Production рекомендация: Используйте json.ignore-parse-errors=true для resilience, но мониторьте метрики Flink на dropped events. Настройте alerting на аномалии.

Мониторинг parse errors

PyFlink не предоставляет built-in метрики для parse errors, но можно логировать:

# В production настройте Flink logging level
table_env.get_config().get_configuration().set_string(
    "taskmanager.log.level", "WARN"
)

Flink будет писать в logs события, которые не удалось распарсить.

Проверка знаний
Какой trade-off создаёт настройка json.ignore-parse-errors = true в PyFlink Kafka connector? Почему её всё равно рекомендуют для production?
Ответ
json.ignore-parse-errors = true заставляет PyFlink молча пропускать malformed JSON события вместо остановки job. Trade-off: resilience (job не падает при ошибке) vs data completeness (потерянные события). В production рекомендуется, потому что остановка всего streaming job из-за одного malformed сообщения гораздо дороже, чем потеря единичных событий. Критически важно: необходимо настроить мониторинг и alerting на dropped events, чтобы обнаруживать систематические проблемы (schema changes, bugs в коннекторе).

Выполнение запроса

Теперь можем выполнить SQL-запрос и вывести результат:

# Запрос к VIEW
result_table = table_env.sql_query("""
    SELECT
        customer_id,
        COUNT(*) AS order_count,
        SUM(total) AS total_revenue
    FROM orders_current
    GROUP BY customer_id
""")

# Выполнить и вывести результаты
result_table.execute().print()

Вывод:

+I[1, 5, 1250.00]
+I[2, 3, 780.50]
-U[1, 5, 1250.00]
+U[1, 6, 1500.00]
...

Что означают префиксы:

  • +I: Insert (новая строка)
  • -U: Update (удаление старого значения)
  • +U: Update (вставка нового значения)
  • -D: Delete

Примечание: Table API выводит changelog stream, а не просто результаты. Для production используйте sink (Kafka, database, file).

PyFlink CDC Pipeline Architecture

От PostgreSQL до sink через Debezium, Kafka и PyFlink

Source Layer
PostgreSQLorders table
CDC events
CDC Layer
Debezium Connector
Publish
Kafka Topicdbserver1.public.orders
Read stream
Processing Layer
Kafka Connector
VIEW orders_current
SQL Query
Results
Output Layer
Kafka topic / Database / File
Ключевое преимущество PyFlink:

Distributed stream processing с stateful operations. Масштабируется горизонтально (кластер из N TaskManagers). Fault tolerance через checkpoints и state recovery.

Давайте настроим PyFlink для чтения CDC событий из вашего lab окружения.

Задание

  1. Откройте JupyterLab и создайте новый notebook module5/pyflink_cdc_source.ipynb

  2. Создайте TableEnvironment:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
  1. Определите Kafka source table для топика dbserver1.public.orders (используйте схему из примера выше)

  2. Создайте VIEW orders_current для извлечения after state

  3. Выполните запрос:

result = table_env.sql_query("""
    SELECT
        status,
        COUNT(*) AS order_count
    FROM orders_current
    GROUP BY status
""")

result.execute().print()
  1. В отдельном терминале выполните несколько INSERT/UPDATE в PostgreSQL:
INSERT INTO orders (customer_id, product_id, quantity, total, status)
VALUES (1, 101, 2, 150.00, 'pending');

UPDATE orders SET status = 'shipped' WHERE id = 1;
  1. Наблюдайте обновления в PyFlink output (changelog stream)

Ожидаемый результат

Вы должны увидеть в output:

+I[pending, 1]        -- После INSERT
-U[pending, 1]        -- Удаление старого count
+U[pending, 0]
+I[shipped, 1]        -- После UPDATE

Troubleshooting

Проблема: Connection refused к Kafka

Решение: Убедитесь, что используете kafka:9092 (Docker network hostname), не localhost:9092

Проблема: No data appears in output

Решение: Проверьте, что топик содержит события: kafka-console-consumer --topic dbserver1.public.orders --bootstrap-server kafka:9092 --from-beginning

Ключевые выводы

  1. PyFlink — это Python API for Apache Flink, позволяющий выполнять distributed stream processing
  2. Table API рекомендован для CDC, т.к. поддерживает SQL DDL и структурированные данные
  3. TableEnvironment создаётся в streaming mode для CDC обработки
  4. Kafka connector конфигурируется через SQL DDL с WITH ('connector' = 'kafka', ...)
  5. Debezium envelope маппится на ROW<before, after, op, ts_ms, source> схему
  6. VIEW orders_current извлекает after state для inserts/updates
  7. DELETE события требуют использования before field вместо after
  8. json.ignore-parse-errors=true обеспечивает resilience при malformed messages
  9. TO_TIMESTAMP_LTZ(ts_ms, 3) конвертирует Debezium timestamp в TIMESTAMP тип
  10. PyFlink выводит changelog stream (+I, -U, +U, -D) вместо snapshot результатов

Что дальше?

Мы настроили PyFlink для чтения CDC событий из Kafka. Но пока мы только читаем поток — никаких агрегаций, окон, или joins.

В следующем уроке мы изучим Stateful Stream Processing в PyFlink: tumbling/sliding/session windows, temporal joins, и управление состоянием для производительных real-time analytics.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Почему для обработки CDC событий в PyFlink рекомендуется Table API, а не DataStream API?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс