Prerequisites:
- module-6/02-debezium-server-pubsub
- module-6/03-iam-workload-identity
Dataflow Template: CDC в BigQuery
После того как Debezium Server публикует CDC события в Pub/Sub, следующий шаг — репликация данных в аналитическое хранилище. В этом уроке мы развернем managed Dataflow template для автоматической репликации CDC событий в BigQuery.
Зачем BigQuery для CDC?
BigQuery как destination для CDC:
- Аналитика: SQL-запросы на исторических данных для анализа трендов
- Machine Learning: BigQuery ML для построения моделей на CDC данных
- Reporting: Интеграция с Data Studio, Looker для визуализации
- Долгосрочное хранение: Cost-effective хранилище для CDC истории (columnar storage)
Типичные use case:
- Построение data warehouse из operational databases
- Event sourcing архитектура с BigQuery как event store
- Real-time reporting dashboards на свежих данных
- Feature store для ML моделей
Архитектура CDC → BigQuery
Managed Dataflow template для репликации CDC событий
PostgreSQL
Server
Topics
Changelog
Replica
updateFrequencySecs=60 определяет частоту MERGE (каждую минуту)
At-least-once достаточно для CDC: MERGE по PK идемпотентен
Компоненты потока:
- Cloud SQL PostgreSQL — source database с logical replication
- Debezium Server — CDC engine, публикует события в Pub/Sub
- Pub/Sub Topics — буфер CDC событий (один топик на таблицу)
- Dataflow Job — stream processing, применяет MERGE операции
- BigQuery Changelog — staging таблица с полной CDC историей
- BigQuery Replica — финальная таблица с текущим состоянием (latest по PK)
Dataflow CDC Template Overview
Google предоставляет managed template “MySQL Change Data Capture to BigQuery”, который автоматически обрабатывает CDC события из Pub/Sub и реплицирует в BigQuery.
Что делает template
Автоматическая обработка:
- Чтение CDC событий из Pub/Sub subscriptions
- Парсинг Debezium event envelope (before, after, op, source)
- Создание staging таблиц (changelog) с полной историей
- Создание replica таблиц с текущим состоянием
- MERGE операции для upsert/delete (каждые N секунд)
- Schema evolution — автоматическое добавление новых колонок
Паттерн двух таблиц:
- Changelog table (staging) — raw CDC events, все операции (INSERT/UPDATE/DELETE)
- Replica table — current state, результат применения всех MERGE операций
Важная заметка о совместимости
Template называется “MySQL CDC to BigQuery”, но работает с PostgreSQL CDC!
Debezium использует одинаковый формат события для MySQL и PostgreSQL connectors (envelope structure:
before,after,op,source). Template парсит этот формат, поэтому совместим с PostgreSQL CDC из Pub/Sub.Официальной документации по PostgreSQL нет, но формат событий идентичен, что подтверждается production deployments.
Подготовка: Pre-requisites
Перед запуском Dataflow job необходимо подготовить инфраструктуру.
1. Pub/Sub Subscriptions
Для каждого топика создайте subscription (Dataflow читает из subscriptions, не из топиков напрямую).
# Пример для таблицы orders
gcloud pubsub subscriptions create cdc.public.orders-sub \
--topic=cdc.public.orders \
--ack-deadline=60 \
--project=PROJECT_ID
# Пример для таблицы customers
gcloud pubsub subscriptions create cdc.public.customers-sub \
--topic=cdc.public.customers \
--ack-deadline=60 \
--project=PROJECT_ID
Параметры:
--ack-deadline=60— 60 секунд на обработку сообщения (увеличьте для больших batch)- Subscription name совпадает с topic name +
-sub(для удобства)
Dead Letter Queue (рекомендуется для production):
# Создать DLQ topic
gcloud pubsub topics create cdc-dead-letter --project=PROJECT_ID
# Создать subscription с DLQ
gcloud pubsub subscriptions create cdc.public.orders-sub \
--topic=cdc.public.orders \
--dead-letter-topic=cdc-dead-letter \
--max-delivery-attempts=5 \
--ack-deadline=60 \
--project=PROJECT_ID
2. BigQuery Datasets
Создайте два dataset: один для changelog (staging), другой для replica.
# Staging dataset для raw CDC events
bq mk --dataset \
--location=us-central1 \
--description="CDC changelog tables" \
PROJECT_ID:cdc_staging
# Replica dataset для current state
bq mk --dataset \
--location=us-central1 \
--description="CDC replica tables" \
PROJECT_ID:cdc_replica
Важно: Location dataset должен совпадать с region Dataflow job (например, us-central1).
3. IAM Roles для Dataflow Worker
Dataflow job запускается под service account, которому нужны права:
# Создать service account для Dataflow
gcloud iam service-accounts create dataflow-worker-sa \
--display-name="Dataflow CDC to BigQuery" \
--project=PROJECT_ID
# Pub/Sub Subscriber (читать из subscriptions)
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/pubsub.subscriber"
# BigQuery Data Editor (писать в таблицы)
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/bigquery.dataEditor"
# Dataflow Worker (управление ресурсами Dataflow)
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/dataflow.worker"
# Storage Object Admin (для temp/staging buckets Dataflow)
gcloud projects add-iam-policy-binding PROJECT_ID \
--member="serviceAccount:dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/storage.objectAdmin"
Запуск Dataflow Job
Теперь запустим Dataflow job из managed template.
Команда запуска
gcloud dataflow flex-template run "cdc-to-bigquery-$(date +%Y%m%d-%H%M%S)" \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Mysql_Change_Data_Capture_to_BigQuery \
--region us-central1 \
--service-account-email dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com \
--parameters \
inputSubscriptions="projects/PROJECT_ID/subscriptions/cdc.public.orders-sub,projects/PROJECT_ID/subscriptions/cdc.public.customers-sub",\
changeLogDataset="cdc_staging",\
replicaDataset="cdc_replica",\
updateFrequencySecs=60,\
useStorageWriteApi=true,\
useStorageWriteApiAtLeastOnce=false
Разбор параметров:
| Параметр | Значение | Назначение |
|---|---|---|
--template-file-gcs-location | gs://dataflow-templates-REGION/latest/flex/Mysql_Change_Data_Capture_to_BigQuery | Путь к managed template (замените REGION на ваш) |
--region | us-central1 | GCP region для Dataflow workers |
--service-account-email | dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com | Service account для Dataflow (настроили выше) |
inputSubscriptions | projects/.../subscriptions/cdc.public.orders-sub,... | Список Pub/Sub subscriptions (через запятую, без пробелов) |
changeLogDataset | cdc_staging | BigQuery dataset для changelog таблиц |
replicaDataset | cdc_replica | BigQuery dataset для replica таблиц |
updateFrequencySecs | 60 | Частота MERGE операций (60 секунд = каждую минуту) |
useStorageWriteApi | true | Использовать BigQuery Storage Write API (faster, cheaper) |
useStorageWriteApiAtLeastOnce | false | Exactly-once семантика (см. раздел ниже) |
Job naming: "cdc-to-bigquery-$(date +%Y%m%d-%H%M%S)" создает уникальное имя с timestamp.
Проверка запуска
# Список активных Dataflow jobs
gcloud dataflow jobs list --region=us-central1 --status=active
# Детали конкретного job
gcloud dataflow jobs describe JOB_ID --region=us-central1
# Логи job (streaming)
gcloud dataflow jobs show JOB_ID --region=us-central1
Web Console: https://console.cloud.google.com/dataflow/jobs
Структура output таблиц
Template автоматически создает таблицы в BigQuery при получении первых событий.
Changelog Table (staging)
Название: cdc_staging.TABLE_NAME_changelog
Пример: cdc_staging.orders_changelog
Schema:
CREATE TABLE cdc_staging.orders_changelog (
-- CDC metadata columns
_metadata_timestamp TIMESTAMP, -- Время получения события Dataflow
_metadata_read_timestamp TIMESTAMP, -- Время чтения из Pub/Sub
_metadata_source STRING, -- Pub/Sub subscription name
_metadata_deleted BOOLEAN, -- True для DELETE операций
-- Debezium envelope fields
before STRUCT<...>, -- Before state (для UPDATE/DELETE)
after STRUCT<...>, -- After state (для INSERT/UPDATE)
source STRUCT<
version STRING,
connector STRING,
name STRING,
ts_ms INT64,
snapshot STRING,
db STRING,
schema STRING,
table STRING,
txId INT64,
lsn INT64,
xmin INT64
>,
op STRING, -- Operation: c/u/d/r
ts_ms INT64, -- Debezium event timestamp
transaction STRUCT<...> -- Transaction metadata (если есть)
);
Использование:
- Полная история всех CDC операций
- Аудит и compliance (все изменения сохранены)
- Debugging (почему данные в replica выглядят так?)
- Time-travel queries (состояние на конкретный момент времени)
Replica Table (current state)
Название: cdc_replica.TABLE_NAME
Пример: cdc_replica.orders
Schema: Соответствует schema source таблицы + metadata колонки
CREATE TABLE cdc_replica.orders (
-- Source table columns
id INT64,
customer_id INT64,
product_id INT64,
quantity INT64,
created_at TIMESTAMP,
-- CDC metadata columns
_metadata_timestamp TIMESTAMP, -- Последнее обновление в replica
_metadata_deleted BOOLEAN, -- True если удалено (soft delete)
_metadata_lsn INT64 -- PostgreSQL LSN последнего изменения
);
MERGE logic:
- INSERT операции (op=c) → INSERT в replica
- UPDATE операции (op=u) → UPDATE в replica (или INSERT если не существует)
- DELETE операции (op=d) → DELETE в replica (или UPDATE
_metadata_deleted=true)
Частота MERGE: Каждые updateFrequencySecs секунд (в примере: 60 секунд).
Проверка знанийЗачем Dataflow CDC template использует двухтабличный паттерн (changelog + replica), а не записывает данные напрямую в одну таблицу?
Пример запроса текущего состояния:
-- Все активные заказы (не удаленные)
SELECT id, customer_id, product_id, quantity, created_at
FROM cdc_replica.orders
WHERE _metadata_deleted = false;
-- Все заказы, включая удаленные
SELECT *,
CASE WHEN _metadata_deleted THEN 'DELETED' ELSE 'ACTIVE' END AS status
FROM cdc_replica.orders;
Exactly-Once vs At-Least-Once
Dataflow template поддерживает два режима доставки:
Exactly-Once (useStorageWriteApiAtLeastOnce=false)
Что это значит:
- Каждое CDC событие применяется к BigQuery ровно один раз
- Используется BigQuery Storage Write API в exactly-once режиме
- Dataflow гарантирует отсутствие дубликатов даже при retry
Когда использовать:
- Критичная корректность данных (финансы, compliance)
- Невозможна идемпотентная обработка
- Нужны точные счетчики, аггрегаты
Стоимость: ~2x выше (Dataflow exactly-once overhead + BigQuery Storage Write API)
At-Least-Once (useStorageWriteApiAtLeastOnce=true)
Что это значит:
- CDC событие может быть применено несколько раз при сбоях
- Дубликаты возможны, но MERGE операции идемпотентны (upsert по PK)
- Финальный результат корректный, но промежуточное состояние может иметь дубли
Когда использовать:
- MERGE по primary key гарантирует корректность (upsert идемпотентен)
- Cost optimization важна
- Небольшая задержка в консистентности приемлема
Стоимость: ~50% от exactly-once режима
Рекомендация
Для большинства CDC use cases: at-least-once достаточно.
Почему?
- MERGE операции в replica таблице идемпотентны (upsert по PK)
- Дубликаты в changelog table не критичны (это staging)
- Существенная экономия на Dataflow costs
Используйте exactly-once только если:
- Строгие compliance требования (аудит, регуляторика)
- Невозможность дубликатов в changelog table
- Бюджет позволяет
Проверка знанийПочему для большинства CDC use cases at-least-once режим Dataflow достаточен, несмотря на возможность дубликатов?
Связь с Module 5 концепциями:
В Module 5 - Advanced Python Consumer мы разбирали at-least-once vs exactly-once для Kafka consumers. Те же концепции применимы к Dataflow:
- At-least-once требует идемпотентной обработки (BigQuery MERGE по PK)
- Exactly-once использует транзакционный API (BigQuery Storage Write API)
Мониторинг Dataflow Job
После запуска job необходимо мониторить его состояние.
Google Cloud Console
Dataflow Console: https://console.cloud.google.com/dataflow/jobs
Метрики для мониторинга:
| Метрика | Описание | Threshold |
|---|---|---|
| System Lag | Задержка между event time и processing time | < 60 секунд |
| Elements Added | Количество обработанных CDC событий | Растет стабильно |
| Data Watermark Lag | Задержка watermark (для windowing operations) | < 30 секунд |
| Worker Utilization | CPU utilization Dataflow workers | 60-80% (optimal) |
| Throughput | Elements/sec обработанных событий | Зависит от volume |
Cloud Monitoring Alerts
Настройте алерты на критичные метрики:
# Alert: System Lag > 5 minutes
displayName: "Dataflow CDC Job System Lag High"
conditions:
- displayName: "System Lag > 300 seconds"
conditionThreshold:
filter: |
resource.type="dataflow_job"
metric.type="dataflow.googleapis.com/job/system_lag"
comparison: COMPARISON_GT
thresholdValue: 300000 # 5 minutes in milliseconds
duration: 300s
aggregations:
- alignmentPeriod: 60s
perSeriesAligner: ALIGN_MEAN
notificationChannels:
- projects/PROJECT_ID/notificationChannels/email-oncall
documentation:
content: |
Dataflow CDC job имеет высокую задержку обработки.
Действия:
1. Проверить throughput Pub/Sub topics (backlog растет?)
2. Проверить CPU/memory utilization workers
3. Рассмотреть увеличение количества workers
4. Проверить BigQuery quota limits
Ключевая метрика для CDC: system_lag
- Показывает, насколько Dataflow отстает от реального времени
- Для real-time CDC критично держать lag < 1 минуты
gcloud Commands для мониторинга
# Список метрик job
gcloud dataflow metrics list JOB_ID --region=us-central1
# Детали конкретной метрики
gcloud dataflow metrics list JOB_ID \
--region=us-central1 \
--filter="name:SystemLag"
# Автоматический refresh каждые 10 секунд
watch -n 10 "gcloud dataflow jobs describe JOB_ID --region=us-central1 | grep -A 5 'currentState'"
Schema Evolution
Один из больших преимуществ managed template — автоматическая обработка schema changes.
Что происходит при добавлении колонки в source
Пример:
-- В Cloud SQL PostgreSQL добавляем колонку
ALTER TABLE orders ADD COLUMN discount_percent DECIMAL(5,2);
Dataflow template автоматически:
- Детектирует новую колонку в Debezium event (в
afterstruct) - Добавляет колонку в BigQuery changelog table
- Добавляет колонку в BigQuery replica table
- Продолжает обработку без перерыва
SQL в BigQuery:
-- BigQuery автоматически выполнит (через template)
ALTER TABLE cdc_staging.orders_changelog
ADD COLUMN after.discount_percent FLOAT64;
ALTER TABLE cdc_replica.orders
ADD COLUMN discount_percent FLOAT64;
Важно: Старые события (до добавления колонки) будут иметь NULL в новой колонке.
Ограничения schema evolution
Template НЕ поддерживает:
- Изменение типа колонки (INT → STRING требует пересоздания таблицы)
- Удаление колонки (будет оставаться в BigQuery с NULL значениями)
- Переименование колонки (воспринимается как удаление + добавление)
Решение для breaking changes:
Следуйте паттерну из Module 4 - Schema Evolution:
- Добавить новую колонку с новым именем/типом
- Backfill данные из старой колонки в новую
- Обновить приложения на использование новой колонки
- Удалить старую колонку (оставить в BigQuery или игнорировать)
Альтернатива: Custom Dataflow Pipeline
Managed template упрощает deployment, но ограничивает кастомизацию. Для сложных трансформаций можно написать custom Dataflow pipeline.
Когда использовать Custom Pipeline
- Сложные трансформации: Enrichment из других источников, business logic
- Нестандартный schema: Template ожидает Debezium envelope, custom pipeline может парсить любой формат
- Multi-destination: Параллельная запись в BigQuery + Cloud Storage + другие системы
- Custom windowing: Aggregations, sessionization, late data handling
Пример Custom Pipeline (Apache Beam Python)
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import json
class ParseCDCEvent(beam.DoFn):
def process(self, message):
"""Parse Debezium CDC event from Pub/Sub"""
event = json.loads(message.decode('utf-8'))
payload = event.get('payload', {})
# Extract operation and data
op = payload.get('op')
after = payload.get('after')
before = payload.get('before')
source = payload.get('source', {})
# Transform for BigQuery
if op in ['c', 'r']: # Create or Read (snapshot)
yield {
'id': after['id'],
'customer_id': after['customer_id'],
'quantity': after['quantity'],
'operation': op,
'ts_ms': source.get('ts_ms'),
'_metadata_timestamp': beam.pvalue.TaggedOutput('current_timestamp')
}
elif op == 'u': # Update
yield {
'id': after['id'],
'customer_id': after['customer_id'],
'quantity': after['quantity'],
'operation': op,
'ts_ms': source.get('ts_ms'),
'before_quantity': before.get('quantity') # Custom field for analytics
}
elif op == 'd': # Delete
yield {
'id': before['id'],
'operation': op,
'ts_ms': source.get('ts_ms'),
'_deleted': True
}
def run():
pipeline_options = PipelineOptions(
project='PROJECT_ID',
region='us-central1',
runner='DataflowRunner',
temp_location='gs://BUCKET/temp',
staging_location='gs://BUCKET/staging'
)
with beam.Pipeline(options=pipeline_options) as p:
# Read from Pub/Sub
cdc_events = (
p
| 'Read from Pub/Sub' >> ReadFromPubSub(
subscription='projects/PROJECT_ID/subscriptions/cdc.public.orders-sub'
)
| 'Parse CDC Events' >> beam.ParDo(ParseCDCEvent())
)
# Write to BigQuery
cdc_events | 'Write to BigQuery' >> WriteToBigQuery(
table='PROJECT_ID:cdc_replica.orders',
schema='id:INTEGER,customer_id:INTEGER,quantity:INTEGER,operation:STRING,ts_ms:INTEGER',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
if __name__ == '__main__':
run()
Deployment:
python custom_cdc_pipeline.py \
--project=PROJECT_ID \
--region=us-central1 \
--runner=DataflowRunner \
--temp_location=gs://BUCKET/temp \
--staging_location=gs://BUCKET/staging \
--service_account_email=dataflow-worker-sa@PROJECT_ID.iam.gserviceaccount.com
Tradeoffs:
| Managed Template | Custom Pipeline |
|---|---|
| ✅ Быстрый setup (одна команда) | ❌ Требует код, тестирование |
| ✅ Автоматическое schema evolution | ✅ Полный контроль трансформаций |
| ✅ Google поддерживает и обновляет | ❌ Поддержка на вашей команде |
| ❌ Ограниченная кастомизация | ✅ Любые сложные transformations |
Рекомендация: Начните с managed template. Переходите на custom pipeline только если template не покрывает ваш use case.
Полный workflow: от PostgreSQL до BigQuery
Объединим все шаги в единый workflow.
Архитектура
Полный цикл CDC репликации с использованием Dataflow template
Key Points:
- • Latency: ~60-90 секунд от INSERT в Cloud SQL до Replica в BigQuery
- • Throughput: до 10,000 events/sec на 1 Dataflow worker
- • Auto-scaling: workers увеличиваются при росте Pub/Sub backlog
- • Idempotency: повторная обработка событий безопасна (MERGE по PK)
Checklist развертывания
1. Cloud SQL (Prerequisites)
- ✅ Logical replication enabled (
cloudsql.logical_decoding=on) - ✅ Replication user created with SELECT permissions
- ✅ Publication created for CDC tables
2. Debezium Server (Prerequisites)
- ✅ GKE cluster created with Workload Identity enabled
- ✅ Kubernetes Service Account with annotation
- ✅ GCP Service Account with
roles/pubsub.publisher - ✅ Workload Identity binding configured
- ✅ ConfigMap with
application.properties - ✅ PersistentVolumeClaim for offset storage
- ✅ Deployment running and publishing events
3. Pub/Sub
- ✅ Topics created (one per table:
cdc.schema.table) - ✅ Subscriptions created (
topic-name-sub) - ✅ Dead Letter Queue configured
4. BigQuery
- ✅ Datasets created (
cdc_staging,cdc_replica) - ✅ Location matches Dataflow region
5. Dataflow
- ✅ Service Account with required roles
- ✅ Template launched with correct parameters
- ✅ Job status: Running
- ✅ System lag < 60 seconds
6. Monitoring
- ✅ Cloud Monitoring alerts configured (system lag, errors)
- ✅ Changelog and replica tables populated
Проверка end-to-end:
# 1. Вставить данные в Cloud SQL
psql -h CLOUD_SQL_IP -U postgres -d production -c "
INSERT INTO orders (customer_id, product_id, quantity)
VALUES (101, 201, 5);
"
# 2. Проверить Pub/Sub topic (должны увидеть сообщение)
gcloud pubsub subscriptions pull cdc.public.orders-sub \
--limit=1 --auto-ack
# 3. Подождать 60 секунд (updateFrequencySecs)
# 4. Проверить BigQuery replica table
bq query --use_legacy_sql=false "
SELECT * FROM cdc_replica.orders
WHERE customer_id = 101
ORDER BY _metadata_timestamp DESC
LIMIT 1;
"
Что мы узнали
- Dataflow CDC Template: Managed solution для репликации Pub/Sub CDC events в BigQuery
- Двухтабличный паттерн: Changelog (staging) для full history + Replica для current state
- MERGE операции: Автоматические upsert/delete каждые
updateFrequencySecsсекунд - Exactly-once vs At-least-once: At-least-once достаточно для большинства CDC use cases (MERGE идемпотентен)
- Schema evolution: Template автоматически добавляет новые колонки в BigQuery
- Мониторинг: System lag — ключевая метрика для real-time CDC
- Custom pipeline: Альтернатива для сложных трансформаций (Apache Beam)
Что дальше?
В следующем уроке мы настроим Cloud Run event-driven процессинг для выполнения business logic на CDC событиях (уведомления, enrichment, маршрутизация).
Check Your Understanding
Finished the lesson?
Mark it as complete to track your progress