Prerequisites:
- module-7/01-capstone-overview
Architecture and Deliverables
In the previous lesson you learned the goal of the capstone project: building a production-ready CDC pipeline. This lesson details the architecture and the exact deliverables you need to produce.
Recommended Project Structure
Your capstone project needs a clear structure to be production-ready.
Directory Layout
```
capstone-project/
├── infrastructure/              # Docker Compose, Kubernetes manifests
│   ├── docker-compose.yml       # Local dev environment
│   ├── debezium/                # Connector configs
│   │   └── connector.json       # Debezium connector configuration
│   └── monitoring/              # Prometheus, Grafana configs
│       ├── prometheus.yml       # Prometheus scrape configs
│       └── grafana/             # Grafana dashboards (JSON exports)
│
├── database/                    # Aurora/PostgreSQL schema and migrations
│   ├── schema.sql               # Tables including outbox
│   ├── migrations/              # Schema evolution scripts
│   └── seed-data/               # Test data generation scripts
│       └── generate_orders.sql
│
├── pyflink-jobs/                # Stream processing applications
│   ├── cdc_processor.py         # Main PyFlink Table API job
│   ├── requirements.txt         # Python dependencies
│   └── tests/                   # Unit and integration tests
│       └── test_transformations.py
│
├── bigquery/                    # Warehouse schema and config
│   ├── schema.sql               # Table definitions with primary keys
│   └── ddl/                     # BigQuery-specific DDL scripts
│
├── monitoring/                  # Dashboards and alerts
│   ├── dashboards/              # Grafana JSON exports
│   │   └── debezium-overview.json
│   └── alerts/                  # Alert rules (Prometheus)
│       └── debezium-alerts.yml
│
├── docs/                        # Project documentation
│   ├── architecture.md          # C4 diagrams, system context
│   ├── runbook.md               # Operational procedures
│   └── testing-strategy.md      # Validation approach
│
└── README.md                    # Project overview and setup instructions
```
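The `seed-data/` directory can also hold a generator script instead of hand-written SQL. Below is a minimal Python sketch that emits INSERT statements for the outbox table; the column list matches the outbox schema used in this project, while the file name, value ranges, and row shape are illustrative assumptions:

```python
import json
import random
import uuid

def generate_order_events(n, seed=42):
    """Emit INSERT statements for the outbox table (test seed data).

    The column list matches the project's outbox schema; the order
    values themselves are random, illustrative data.
    """
    rng = random.Random(seed)
    statements = []
    for i in range(n):
        payload = json.dumps({
            "order_id": 1000 + i,
            "customer_id": rng.randint(1, 50),
            "status": rng.choice(["pending", "completed", "cancelled"]),
            "total_amount": round(rng.uniform(5, 500), 2),
        })
        statements.append(
            "INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload) "
            f"VALUES ('{uuid.uuid4()}', 'orders', '{1000 + i}', "
            f"'OrderCreated', '{payload}'::jsonb);"
        )
    return statements

if __name__ == "__main__":
    for stmt in generate_order_events(5):
        print(stmt)
```

Redirect the output into `database/seed-data/generate_orders.sql`, or pipe it straight into `psql`.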
Why structure matters:
- Separation of concerns: infrastructure vs. application code vs. documentation
- Reproducibility: a colleague can bring up the whole pipeline from the README
- Production pattern: real projects use a similar layout
Component Architecture Diagrams
Use the C4 Model to document the architecture.
C4 Level 1: System Context
Shows: the external users and systems your CDC pipeline interacts with.
C4 Model Level 1: external actors and system boundaries
A C4 System Context diagram shows:
- Person (rounded node): an external actor interacting with the system
- System (solid border): the system being documented
- System_Ext (dashed border): external systems outside our control
Goal: show who uses the system and which external systems it integrates with.
C4 Level 2: Container Diagram
Shows: the main components (containers) inside the CDC pipeline and their interactions.
C4 Model Level 2: components and technologies inside the CDC pipeline
A C4 Container Diagram shows:
- Container_Boundary: subsystem boundaries (DiagramContainer)
- ContainerDb: database components (variant="database")
- Container: application components (variant="connector")
- ContainerQueue: messaging components (variant="cluster")
- The technology is listed under each component
Goal: show each component's technology and the data flow between them.
Knowledge check: In the C4 model, which diagram level shows external users and systems, and which shows the pipeline's internal components?
Technical Requirements per Component
Detailed requirements for each layer of your architecture.
1. Aurora PostgreSQL / PostgreSQL
Your task: create the source database with logical replication and an outbox table.
Configuration Requirements
```
-- If you use a local PostgreSQL (for testing),
-- enable logical replication in postgresql.conf:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

-- If you use Aurora PostgreSQL, set it through the parameter group:
-- rds.logical_replication = 1
```
Outbox Table Schema
```sql
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregatetype VARCHAR(255) NOT NULL,  -- Routing key (e.g., "orders")
    aggregateid VARCHAR(255) NOT NULL,    -- Entity ID (e.g., order_id)
    type VARCHAR(255) NOT NULL,           -- Event type (e.g., "OrderCreated")
    payload JSONB NOT NULL,               -- Event data
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- CRITICAL: REPLICA IDENTITY FULL so DELETE events carry the full row
ALTER TABLE outbox REPLICA IDENTITY FULL;

-- Index for cleanup queries (deleting old events)
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
```
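The point of the outbox table is that the business row and its event are written in the same transaction, so an event is published if and only if the order commits. A sketch of the application-side helper (the function name is illustrative; the psycopg2 usage in the comment is an assumption about your driver, not a requirement):

```python
import json
import uuid
from datetime import datetime, timezone

def build_outbox_event(aggregate_type, aggregate_id, event_type, payload):
    """Build a row for the outbox table.

    In production this row is INSERTed in the *same transaction* as the
    business-table write, so the event is published iff the order commits.
    """
    return {
        "id": str(uuid.uuid4()),
        "aggregatetype": aggregate_type,
        "aggregateid": str(aggregate_id),
        "type": event_type,
        "payload": json.dumps(payload),
        "created_at": datetime.now(timezone.utc).isoformat(),
    }

# Usage sketch: one transaction inserts both the order and its event,
# e.g. with psycopg2 (connection/cursor setup omitted):
#   with conn:  # commits or rolls back both statements atomically
#       cur.execute("INSERT INTO orders (...) VALUES (...)")
#       event = build_outbox_event("orders", 1001, "OrderCreated",
#                                  {"order_id": 1001, "status": "pending"})
#       cur.execute(
#           "INSERT INTO outbox (id, aggregatetype, aggregateid, type, payload) "
#           "VALUES (%(id)s, %(aggregatetype)s, %(aggregateid)s, %(type)s, %(payload)s)",
#           event)
```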
Replication User
```sql
-- Create the user Debezium connects as
CREATE USER debezium_user WITH REPLICATION LOGIN PASSWORD 'secure_password';

-- Grant read access to the outbox table
GRANT SELECT ON outbox TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;
```
Validation checklist:
- `SHOW wal_level;` returns `logical`
- Outbox table created with the correct schema
- `REPLICA IDENTITY FULL` is set
- Replication user has `REPLICATION` and `SELECT` privileges
2. Debezium Connector Configuration
Your task: configure Debezium with the Outbox Event Router SMT.
Connector JSON Config
```json
{
  "name": "aurora-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password",
    "database.dbname": "ecommerce",
    "database.server.name": "aurora_prod",
    "table.include.list": "public.outbox",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "slot.name": "debezium_outbox_slot",
    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "when_needed",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
    "transforms.outbox.table.fields.additional.placement": "type:header:eventType,created_at:envelope:timestamp",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}
```
Key parameters:

| Parameter | Value | Purpose |
|---|---|---|
| `table.include.list` | `public.outbox` | Capture only the outbox table |
| `plugin.name` | `pgoutput` | Built-in PostgreSQL plugin (Aurora compatible) |
| `heartbeat.interval.ms` | `10000` | Keep the replication slot alive (prevent WAL bloat) |
| `snapshot.mode` | `when_needed` | Auto-detect whether a snapshot is required |
| `transforms.outbox.route.by.field` | `aggregatetype` | Route events to topics by aggregate type |
| `transforms.outbox.route.topic.replacement` | `outbox.event.${routedByValue}` | Topic naming pattern |
Validation checklist:
- Connector status = `RUNNING` (`GET /connectors/aurora-outbox-connector/status`)
- Replication slot created (`SELECT * FROM pg_replication_slots`)
- Kafka topic `outbox.event.orders` contains events
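The first checklist item can be automated. A small helper that interprets the Kafka Connect status response (the JSON shape is the standard `GET /connectors/<name>/status` body; the function name and the 8083 port in the comment are assumptions based on Kafka Connect defaults):

```python
def connector_is_healthy(status):
    """Interpret a Kafka Connect status response.

    `status` is the parsed JSON body of GET /connectors/<name>/status, e.g.
    {"connector": {"state": "RUNNING"}, "tasks": [{"state": "RUNNING"}]}.
    Healthy means the connector AND every task report RUNNING.
    """
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)

# Fetching the status is one HTTP call against the Connect REST API
# (8083 is the default port):
#   curl http://kafka-connect:8083/connectors/aurora-outbox-connector/status
```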
3. PyFlink Stream Processing
Your task: create a PyFlink job that applies transformations before data is sent to BigQuery.
PyFlink Table API Job Structure
```python
from pyflink.table import EnvironmentSettings, TableEnvironment

# Initialize the Flink Table Environment in streaming mode
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Configure checkpointing (fault tolerance)
table_env.get_config().set("execution.checkpointing.interval", "60000")  # 60 s
table_env.get_config().set("parallelism.default", "2")
table_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")

# Define the Kafka source with Debezium format. The PRIMARY KEY is required
# here because cdc-events-duplicate is enabled (Flink dedupes on the key).
source_ddl = """
CREATE TABLE orders_cdc (
    order_id BIGINT,
    customer_id BIGINT,
    product_id BIGINT,
    status STRING,
    total_amount DECIMAL(10, 2),
    created_at TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'outbox.event.orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'pyflink-cdc-processor',
    'format' = 'debezium-json',
    'scan.startup.mode' = 'earliest-offset',
    'debezium-json.schema-include' = 'false'
)
"""
table_env.execute_sql(source_ddl)

# Define the transformation view
transform_sql = """
CREATE VIEW enriched_orders AS
SELECT
    order_id,
    customer_id,
    product_id,
    status,
    total_amount,
    created_at,
    CASE
        WHEN status = 'completed' THEN 'FULFILLED'
        WHEN status = 'cancelled' THEN 'CANCELLED'
        ELSE 'PENDING'
    END AS fulfillment_status,
    CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)) AS processed_at
FROM orders_cdc
"""
table_env.execute_sql(transform_sql)

# Define the sink feeding BigQuery (via a Kafka topic). The source emits a
# changelog (updates/deletes), so the sink must be 'upsert-kafka': the plain
# 'kafka' connector cannot consume update/delete rows.
sink_ddl = """
CREATE TABLE orders_bigquery_sink (
    order_id BIGINT,
    customer_id BIGINT,
    product_id BIGINT,
    status STRING,
    total_amount DECIMAL(10, 2),
    created_at TIMESTAMP(3),
    fulfillment_status STRING,
    processed_at TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'bigquery.orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
)
"""
table_env.execute_sql(sink_ddl)

# Execute the streaming query
table_env.execute_sql("""
INSERT INTO orders_bigquery_sink
SELECT * FROM enriched_orders
""").wait()
```
Key patterns:
- Debezium format: `'format' = 'debezium-json'` parses the CDC envelope automatically
- Checkpointing: `execution.checkpointing.interval` provides fault tolerance
- Duplicate handling: `table.exec.source.cdc-events-duplicate=true` for at-least-once semantics
- Transformations: a SQL VIEW holds the enrichment logic
Validation checklist:
- PyFlink job runs without errors
- Checkpoints complete successfully (check the Flink logs)
- Kafka topic `bigquery.orders` contains transformed events
- Transformations are applied correctly (check `fulfillment_status`)
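The last checklist item is easy to unit-test outside Flink by mirroring the CASE expression from `enriched_orders` in plain Python, a natural candidate for `tests/test_transformations.py` (the function name is illustrative):

```python
def fulfillment_status(status):
    """Pure-Python mirror of the CASE expression in the enriched_orders view."""
    if status == "completed":
        return "FULFILLED"
    if status == "cancelled":
        return "CANCELLED"
    return "PENDING"
```

Keeping business rules mirrored like this lets the fast unit-test suite catch logic regressions before you redeploy the Flink job.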
4. BigQuery CDC Table
Your task: create a BigQuery table with a primary key for CDC ingestion.
BigQuery Table DDL
```sql
CREATE TABLE `your-project.your_dataset.orders` (
    order_id INT64 NOT NULL,
    customer_id INT64 NOT NULL,
    product_id INT64 NOT NULL,
    status STRING,
    total_amount NUMERIC(10, 2),
    created_at TIMESTAMP,
    fulfillment_status STRING,
    processed_at TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) OPTIONS (
    max_staleness = INTERVAL 15 MINUTE,
    description = "Orders table with CDC ingestion from Aurora via PyFlink"
);
```
Critical details:

| Requirement | Explanation |
|---|---|
| `PRIMARY KEY (order_id) NOT ENFORCED` | BigQuery CDC requires an explicit primary key declaration |
| `NOT ENFORCED` | BigQuery does not enforce uniqueness (the source is responsible for it) |
| `max_staleness = INTERVAL 15 MINUTE` | Balances freshness vs. cost (avoid < 5 min to control cost) |
BigQuery Connector Configuration (via Kafka Connect):
```json
{
  "name": "bigquery-sink-connector",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "bigquery.orders",
    "project": "your-gcp-project",
    "defaultDataset": "your_dataset",
    "keyfile": "/path/to/service-account-key.json",
    "autoCreateTables": "false",
    "upsertEnabled": "true",
    "deleteEnabled": "true",
    "intermediateTableSuffix": "_temp",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
```
Validation checklist:
- BigQuery table created with a PRIMARY KEY
- Connector status = `RUNNING`
- A sample INSERT event appears in BigQuery
- An UPDATE event is applied (UPSERT works)
- A DELETE event removes the row
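The UPSERT/DELETE semantics you are validating can be modelled in a few lines: a keyed table where the last write per primary key wins and deletes remove the key. This is a toy in-memory model for building test expectations, not BigQuery's actual implementation:

```python
def apply_cdc_events(events):
    """Simulate keyed UPSERT/DELETE semantics, as applied downstream using
    the declared primary key (order_id here).

    Each event is a tuple: ("UPSERT" | "DELETE", row_dict).
    """
    table = {}
    for op, row in events:
        key = row["order_id"]
        if op == "DELETE":
            table.pop(key, None)   # deleting a missing key is a no-op
        else:
            table[key] = row       # last write wins per key
    return table
```

Use it to compute the expected final table state for a scripted event sequence, then compare against an actual `SELECT` from BigQuery.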
5. Monitoring Infrastructure
Your task: set up Prometheus + Grafana for observability.
Prometheus Configuration
```yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kafka-connect'
    static_configs:
      - targets: ['kafka-connect:9999']  # JMX exporter port
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
```
Key Metrics to Monitor
| Metric | Description | Alert Threshold |
|---|---|---|
| `debezium_metrics_MilliSecondsBehindSource` | Replication lag (ms) | > 10000 (10 s) |
| `debezium_metrics_NumberOfEventsFiltered` | Filtered events count | Monitor for drops |
| `kafka_connect_connector_status` | Connector state (1 = running, 0 = failed) | < 1 (failed) |
| `kafka_consumer_lag` | PyFlink consumer lag | > 1000 messages |
| `flink_taskmanager_job_task_numRecordsInPerSecond` | PyFlink throughput | Monitor for zero |
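The thresholds in the table translate directly into alert rules. A sketch that encodes them as predicates (the shortened metric names and helper function are made up for readability; your Prometheus rules would use the full metric names above):

```python
# Alert thresholds from the table above (metric names abbreviated).
ALERT_RULES = {
    "replication_lag_ms": lambda v: v > 10_000,  # > 10 s behind source
    "connector_status":   lambda v: v < 1,       # 1 = running, 0 = failed
    "consumer_lag_msgs":  lambda v: v > 1_000,   # PyFlink falling behind
}

def firing_alerts(samples):
    """Return the names of metrics whose current value breaches its rule."""
    return sorted(name for name, value in samples.items()
                  if name in ALERT_RULES and ALERT_RULES[name](value))
```

This kind of table-driven check is also handy in integration tests: feed it scraped values and assert that no alert fires on a healthy pipeline.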
Grafana Dashboard Structure
Panels:
- Connector Status: gauge showing RUNNING/FAILED
- Replication Lag: time-series graph (ms behind source)
- Events/Second: throughput graph
- Kafka Consumer Lag: PyFlink consumption lag
- Checkpoint Duration: PyFlink checkpoint completion time
Validation checklist:
- Prometheus scrapes JMX metrics (check targets at `localhost:9090/targets`)
- Grafana dashboard shows live metrics
- Alerts configured for: connector failure, lag > 10 s, error rate > 0
Deliverables Checklist
1. Working Code
- `infrastructure/docker-compose.yml`: full stack (PostgreSQL, Kafka, Connect, Prometheus, Grafana)
- `database/schema.sql`: outbox table DDL
- `infrastructure/debezium/connector.json`: Debezium connector config
- `pyflink-jobs/cdc_processor.py`: PyFlink transformation job
- `pyflink-jobs/requirements.txt`: Python dependencies
- `bigquery/schema.sql`: BigQuery table DDL
2. Documentation
- `README.md`: project overview, setup instructions, quick start
- `docs/architecture.md`: C4 diagrams (System Context + Container)
- `docs/runbook.md`: operational procedures for common failures
- `docs/testing-strategy.md`: how to validate end-to-end functionality
3. Monitoring
- `infrastructure/monitoring/prometheus.yml`: Prometheus scrape config
- `monitoring/dashboards/debezium-overview.json`: Grafana dashboard export
- `monitoring/alerts/debezium-alerts.yml`: alert rules
4. Testing Evidence
- Screenshots or logs showing:
- Outbox table INSERT
- Debezium captured event in Kafka
- PyFlink processed event
- Event appeared in BigQuery
- Chaos testing results:
- Connector restart without data loss
- PyFlink restart from checkpoint
Knowledge check: Why is snapshot.mode=always an anti-pattern for a production capstone project?
Anti-Patterns to Avoid
❌ Anti-Pattern 1: Missing REPLICA IDENTITY FULL
Problem: Debezium events for UPDATE/DELETE do not carry the full row data.
Solution:
```sql
ALTER TABLE outbox REPLICA IDENTITY FULL;
```
❌ Anti-Pattern 2: No Primary Key in BigQuery
Problem: BigQuery CDC ingestion fails without an explicit primary key.
Solution:
```sql
CREATE TABLE orders (
    order_id INT64 NOT NULL,
    ...,
    PRIMARY KEY (order_id) NOT ENFORCED
);
```
❌ Anti-Pattern 3: snapshot.mode=always
Problem: every connector restart performs a full re-snapshot, duplicating data.
Solution:
```json
"snapshot.mode": "when_needed"
```
❌ Anti-Pattern 4: No replication slot monitoring
Problem: WAL bloat fills the disk, and growing lag goes unnoticed.
Solution:
```sql
-- Check regularly:
SELECT
    slot_name,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'debezium_outbox_slot';
```
❌ Anti-Pattern 5: Hardcoded credentials
Problem: passwords end up in the git repository, a security risk.
Solution:
```json
"database.password": "${file:/secrets/db-password.txt:password}"
```
❌ Anti-Pattern 6: No checkpointing in PyFlink
Problem: a restart loses state and duplicates processing.
Solution:
```python
table_env.get_config().set("execution.checkpointing.interval", "60000")
```
Production Patterns to Follow
✅ Pattern 1: Heartbeat for the replication slot
Why: prevents WAL bloat when there are no change events.
```json
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO heartbeat (ts) VALUES (NOW()) ON CONFLICT (id) DO UPDATE SET ts=EXCLUDED.ts"
```
✅ Pattern 2: At-least-once delivery with idempotency
Why: CDC gives at-least-once delivery; downstream consumers must be idempotent.
```python
# PyFlink config
table_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")
# BigQuery uses the PRIMARY KEY for automatic UPSERT
```
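For consumers without built-in keyed upserts, idempotency usually means deduplicating on a stable event id before applying side effects. A minimal in-memory sketch (the class name is illustrative; a production version would persist seen ids, or simply rely on key-based UPSERT as BigQuery does):

```python
class IdempotentSink:
    """Deduplicate at-least-once deliveries by event id before applying them."""

    def __init__(self):
        self._seen = set()
        self.applied = []

    def process(self, event_id, payload):
        """Apply the event once; return False for a duplicate delivery."""
        if event_id in self._seen:
            return False  # duplicate - skip the side effect
        self._seen.add(event_id)
        self.applied.append(payload)
        return True
```

The outbox table's UUID `id` column is a natural choice of deduplication key, since it is stable across redeliveries.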
✅ Pattern 3: Monitoring the Four Golden Signals
Why: an SRE best practice for observability.
| Signal | Metric | Meaning |
|---|---|---|
| Latency | Replication lag (ms behind source) | How fresh is data? |
| Traffic | Events/second throughput | System load |
| Errors | Connector status, error rate | Failure detection |
| Saturation | CPU/memory utilization | Capacity planning |
✅ Pattern 4: Runbook for common failures
Why: fast response during incidents.
Example runbook entries:
- Connector fails with "too many replication slots"
  - Symptom: connector status = FAILED
  - Diagnosis: `SELECT * FROM pg_replication_slots`
  - Remediation: drop unused slots or increase `max_replication_slots`
- WAL segments no longer exist
  - Symptom: connector fails with "requested WAL segment ... no longer exists"
  - Diagnosis: check the `wal_keep_size` parameter
  - Remediation: re-snapshot with `snapshot.mode=initial` or restore from backup
Key Takeaways
- Project structure is critical: separating infrastructure, code, and documentation eases maintenance
- C4 diagrams are mandatory: System Context shows external integrations, Container shows internal components
- Technical requirements are detailed: every component has an explicit validation checklist
- The outbox table requires REPLICA IDENTITY FULL for DELETE events
- The Debezium Outbox Event Router SMT routes events by the `aggregatetype` field
- The PyFlink Table API simplifies CDC processing with Debezium format support
- BigQuery CDC requires a PRIMARY KEY (NOT ENFORCED) for UPSERT/DELETE
- Monitoring = Four Golden Signals: latency, traffic, errors, saturation
- Anti-patterns to avoid: no REPLICA IDENTITY, no primary key, snapshot.mode=always, hardcoded credentials
- Production patterns: heartbeat, at-least-once + idempotency, monitoring, runbook
What's Next?
You are ready to start the capstone project. Use:
- The project structure as a template for organizing your code
- The C4 diagrams for documenting the architecture
- The technical requirements as a checklist during implementation
- The anti-patterns as warning signs during code review
- The production patterns as best practices
Time to apply everything from the course in a single project. Good luck!