Требуемые знания:
- module-5/02-pandas-integration
PyFlink CDC Connector — настройка и использование
Pandas отлично подходит для batch-анализа CDC событий — вы загрузили данные, построили DataFrame, выполнили агрегации. Но что если вам нужна непрерывная обработка потока событий? Агрегации в реальном времени? Сложные join’ы между таблицами? Apache Flink — это распределённый движок для stateful stream processing, а PyFlink — его Python API.
В этом уроке мы настроим PyFlink Table API для чтения CDC событий из Kafka, научимся определять схему Debezium envelope, и разберём, как извлекать текущее состояние записей для downstream обработки.
Что такое PyFlink?
Apache Flink — это framework для распределённой потоковой обработки данных. PyFlink — официальный Python API, позволяющий писать Flink jobs на Python вместо Java/Scala.
Pandas vs PyFlink: В чём разница?
Batch-обработка: накопить → обработать → записать
Непрерывная обработка потока событий
Ключевые отличия:
| Характеристика | Pandas | PyFlink |
|---|---|---|
| Модель обработки | Batch (загрузили → обработали → записали) | Streaming (непрерывная обработка) |
| Масштабирование | Single-machine (ограничено памятью) | Distributed (кластер из N машин) |
| Latency | Минуты/часы (batch interval) | Миллисекунды/секунды |
| Состояние | Нет встроенного (DataFrame ephemeral) | Stateful (управляемое состояние) |
| Когда использовать | Exploratory analysis, периодические отчёты | Real-time dashboards, continuous ETL |
Пример сценариев:
- Pandas: Каждый час загружаем CDC события за последний час, считаем агрегаты, пишем в DWH
- PyFlink: Считаем 5-минутные tumbling window агрегации в реальном времени, пишем в Kafka topic для dashboard’а
Production истина: Pandas для batch ETL, PyFlink для real-time processing. Если нужна latency менее 1 минуты — PyFlink. Если достаточно hourly batches — Pandas проще.
Table API vs DataStream API
PyFlink предлагает два API для работы с потоками:
1. DataStream API — низкоуровневый API для произвольной обработки потоков:
# DataStream API (более гибкий, но сложнее)
stream = env.add_source(FlinkKafkaConsumer(...))
stream.map(lambda x: transform(x)).add_sink(...)
2. Table API — SQL-подобный API для структурированных данных:
# Table API (SQL DDL для источников)
table_env.execute_sql("CREATE TABLE orders (...) WITH ('connector' = 'kafka', ...)")
result = table_env.sql_query("SELECT customer_id, COUNT(*) FROM orders GROUP BY customer_id")
Для CDC мы используем Table API, потому что:
- Структурированные данные: CDC события имеют чёткую схему (envelope + payload)
- SQL знаком: Table API использует SQL DDL и DML — проще для data engineers
- Оптимизация: Flink лучше оптимизирует SQL-запросы, чем произвольный DataStream код
- Поддержка CDC: Kafka connector в Table API понимает Debezium envelope
Рекомендация: Для CDC pipeline используйте Table API. DataStream API только если нужна низкоуровневая обработка (custom state management, side outputs).
Проверка знанийПочему для обработки CDC событий в PyFlink рекомендуется Table API, а не DataStream API? Какое ключевое преимущество даёт SQL DDL?
Настройка окружения
PyFlink требует следующие зависимости:
Python версия:
- Python 3.9-3.12 (PyFlink 2.2.0 не поддерживает 3.13+)
- Для этого курса используйте Python 3.11 или 3.12
Java Runtime:
- PyFlink работает через JVM — требуется Java 11+
- Проверьте:
java -version(должен быть 11 или выше)
Установка PyFlink:
# В JupyterLab терминале
pip install apache-flink==2.2.0
Проверка установки:
from pyflink.table import EnvironmentSettings
print("PyFlink установлен успешно!")
Важно: В нашем lab окружении JupyterLab уже имеет все зависимости. Если настраиваете локально — убедитесь, что Java установлена.
Создание TableEnvironment
TableEnvironment — это entry point для PyFlink Table API. Он управляет контекстом выполнения, регистрацией таблиц, и выполнением запросов.
Streaming Mode
Для CDC обработки нужен streaming mode (не batch):
from pyflink.table import EnvironmentSettings, TableEnvironment
# Создаём settings для streaming
env_settings = EnvironmentSettings.in_streaming_mode()
# Создаём TableEnvironment
table_env = TableEnvironment.create(env_settings)
print("TableEnvironment создан в streaming mode")
Что происходит:
in_streaming_mode()— конфигурирует unbounded stream processing (не batch)create()— инициализирует JVM, создаёт Flink execution environment
Примечание: Можно также использовать
in_batch_mode()для batch processing, но для CDC мы всегда используем streaming.
Определение Kafka Source Table
Теперь определим таблицу-источник для чтения CDC событий из Kafka.
SQL DDL для Kafka Connector
PyFlink Table API использует SQL DDL для регистрации источников данных:
# Определяем таблицу для CDC событий из Kafka
table_env.execute_sql("""
CREATE TABLE orders_cdc (
payload ROW<
before ROW<
id INT,
customer_id INT,
product_id INT,
quantity INT,
total DECIMAL(10,2),
status STRING,
created_at TIMESTAMP(3)
>,
after ROW<
id INT,
customer_id INT,
product_id INT,
quantity INT,
total DECIMAL(10,2),
status STRING,
created_at TIMESTAMP(3)
>,
op STRING,
ts_ms BIGINT,
source ROW<
db STRING,
`table` STRING,
connector STRING
>
>
) WITH (
'connector' = 'kafka',
'topic' = 'dbserver1.public.orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'pyflink-cdc-consumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
""")
Разбор конфигурации
Схема (payload ROW):
ROW<...>— композитный тип для nested JSON structures- before/after: Вложенные ROW с полями таблицы
- op: Тип операции (STRING: ‘c’, ‘u’, ‘d’, ‘r’)
- ts_ms: Timestamp Debezium события (BIGINT: milliseconds)
- source: Метаданные источника (db, table, connector)
Connector опции:
| Опция | Значение | Назначение |
|---|---|---|
connector | kafka | Использовать Kafka connector |
topic | dbserver1.public.orders | Имя топика с CDC событиями |
properties.bootstrap.servers | kafka:9092 | Kafka brokers (Docker network hostname) |
properties.group.id | pyflink-cdc-consumer | Consumer group ID |
scan.startup.mode | earliest-offset | Читать с начала топика |
format | json | Формат сообщений (JSON) |
json.fail-on-missing-field | false | Не падать на отсутствующие поля |
json.ignore-parse-errors | true | Пропускать malformed JSON |
Важно:
kafka:9092— это hostname внутри Docker network. Если PyFlink запущен на host machine — используйтеlocalhost:9092.
Backticks для зарезервированных слов
Обратите внимание на backticks вокруг table:
source ROW<
db STRING,
`table` STRING, -- backticks, т.к. 'table' зарезервированное слово
connector STRING
>
SQL резервирует table как keyword. Без backticks получите syntax error.
Извлечение текущего состояния
Debezium envelope содержит before и after поля. Для большинства downstream обработок нужно текущее состояние записи (т.е. after для create/update, before для delete).
VIEW для извлечения after state
Создадим VIEW, которая извлекает только after поля для inserts/updates:
table_env.execute_sql("""
CREATE VIEW orders_current AS
SELECT
payload.after.id AS id,
payload.after.customer_id AS customer_id,
payload.after.product_id AS product_id,
payload.after.quantity AS quantity,
payload.after.total AS total,
payload.after.status AS status,
payload.after.created_at AS created_at,
payload.op AS operation,
TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time
FROM orders_cdc
WHERE payload.op IN ('c', 'u', 'r')
""")
Что происходит:
- SELECT payload.after.*: Извлекаем поля из
after(текущее состояние после операции) - WHERE payload.op IN (‘c’, ‘u’, ‘r’): Фильтруем только CREATE, UPDATE, READ (snapshot) операции
- TO_TIMESTAMP_LTZ(payload.ts_ms, 3): Конвертируем milliseconds в TIMESTAMP (3 — millisecond precision)
Примечание: DELETE события (
op='d') не включены, т.к. у нихafter=null. Если нужны deletes — используйтеpayload.before.
Обработка DELETE событий
Если нужно обрабатывать DELETE, создайте отдельный VIEW:
table_env.execute_sql("""
CREATE VIEW orders_deleted AS
SELECT
payload.before.id AS id,
payload.before.customer_id AS customer_id,
'd' AS operation,
TO_TIMESTAMP_LTZ(payload.ts_ms, 3) AS event_time
FROM orders_cdc
WHERE payload.op = 'd'
""")
Обработка ошибок десериализации
Production Kafka topics могут содержать malformed messages из-за:
- Schema changes в source database
- Manual data modifications напрямую в Kafka
- Bugs в Debezium connector
Проблема: PyFlink по умолчанию падает на первом JSON parse error → job останавливается.
Решение: Конфигурируем error tolerance в connector options:
'json.fail-on-missing-field' = 'false', -- Не падать на отсутствующие поля
'json.ignore-parse-errors' = 'true' -- Пропускать malformed JSON
Trade-off:
- ✅ Плюс: Job продолжает работать при ошибках десериализации
- ⚠️ Минус: Malformed события молча пропускаются (data loss)
Production рекомендация: Используйте
json.ignore-parse-errors=trueдля resilience, но мониторьте метрики Flink на dropped events. Настройте alerting на аномалии.
Мониторинг parse errors
PyFlink не предоставляет built-in метрики для parse errors, но можно логировать:
# В production настройте Flink logging level
table_env.get_config().get_configuration().set_string(
"taskmanager.log.level", "WARN"
)
Flink будет писать в logs события, которые не удалось распарсить.
Проверка знанийКакой trade-off создаёт настройка json.ignore-parse-errors = true в PyFlink Kafka connector? Почему её всё равно рекомендуют для production?
Выполнение запроса
Теперь можем выполнить SQL-запрос и вывести результат:
# Запрос к VIEW
result_table = table_env.sql_query("""
SELECT
customer_id,
COUNT(*) AS order_count,
SUM(total) AS total_revenue
FROM orders_current
GROUP BY customer_id
""")
# Выполнить и вывести результаты
result_table.execute().print()
Вывод:
+I[1, 5, 1250.00]
+I[2, 3, 780.50]
-U[1, 5, 1250.00]
+U[1, 6, 1500.00]
...
Что означают префиксы:
- +I: Insert (новая строка)
- -U: Update (удаление старого значения)
- +U: Update (вставка нового значения)
- -D: Delete
Примечание: Table API выводит changelog stream, а не просто результаты. Для production используйте sink (Kafka, database, file).
Архитектура PyFlink CDC Pipeline
От PostgreSQL до sink через Debezium, Kafka и PyFlink
Distributed stream processing с stateful operations. Масштабируется горизонтально (кластер из N TaskManagers). Fault tolerance через checkpoints и state recovery.
Лабораторная работа: PyFlink CDC Source
Давайте настроим PyFlink для чтения CDC событий из вашего lab окружения.
Задание
-
Откройте JupyterLab и создайте новый notebook
module5/pyflink_cdc_source.ipynb -
Создайте TableEnvironment:
from pyflink.table import EnvironmentSettings, TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
-
Определите Kafka source table для топика
dbserver1.public.orders(используйте схему из примера выше) -
Создайте VIEW
orders_currentдля извлеченияafterstate -
Выполните запрос:
result = table_env.sql_query("""
SELECT
status,
COUNT(*) AS order_count
FROM orders_current
GROUP BY status
""")
result.execute().print()
- В отдельном терминале выполните несколько INSERT/UPDATE в PostgreSQL:
INSERT INTO orders (customer_id, product_id, quantity, total, status)
VALUES (1, 101, 2, 150.00, 'pending');
UPDATE orders SET status = 'shipped' WHERE id = 1;
- Наблюдайте обновления в PyFlink output (changelog stream)
Ожидаемый результат
Вы должны увидеть в output:
+I[pending, 1] -- После INSERT
-U[pending, 1] -- Удаление старого count
+U[pending, 0]
+I[shipped, 1] -- После UPDATE
Troubleshooting
Проблема: Connection refused к Kafka
Решение: Убедитесь, что используете kafka:9092 (Docker network hostname), не localhost:9092
Проблема: No data appears in output
Решение: Проверьте, что топик содержит события: kafka-console-consumer --topic dbserver1.public.orders --bootstrap-server kafka:9092 --from-beginning
Ключевые выводы
- PyFlink — это Python API for Apache Flink, позволяющий выполнять distributed stream processing
- Table API рекомендован для CDC, т.к. поддерживает SQL DDL и структурированные данные
- TableEnvironment создаётся в streaming mode для CDC обработки
- Kafka connector конфигурируется через SQL DDL с
WITH ('connector' = 'kafka', ...) - Debezium envelope маппится на
ROW<before, after, op, ts_ms, source>схему - VIEW orders_current извлекает
afterstate для inserts/updates - DELETE события требуют использования
beforefield вместоafter - json.ignore-parse-errors=true обеспечивает resilience при malformed messages
- TO_TIMESTAMP_LTZ(ts_ms, 3) конвертирует Debezium timestamp в TIMESTAMP тип
- PyFlink выводит changelog stream (+I, -U, +U, -D) вместо snapshot результатов
Что дальше?
Мы настроили PyFlink для чтения CDC событий из Kafka. Но пока мы только читаем поток — никаких агрегаций, окон, или joins.
В следующем уроке мы изучим Stateful Stream Processing в PyFlink: tumbling/sliding/session windows, temporal joins, и управление состоянием для производительных real-time analytics.
Проверьте понимание
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс