Skip to content
Learning Platform
Advanced
40 minutes
Architecture Production Documentation Monitoring

Prerequisites:

  • module-7/01-capstone-overview

Архитектура и Deliverables

В предыдущем уроке вы узнали о цели capstone проекта — построить production-ready CDC pipeline. Теперь детализируем архитектуру и точные deliverables, которые вам нужно создать.


Ваш capstone project должен иметь четкую структуру для production readiness.

Directory Layout

capstone-project/
├── infrastructure/          # Docker Compose, Kubernetes manifests
│   ├── docker-compose.yml  # Local dev environment
│   ├── debezium/           # Connector configs
│   │   └── connector.json  # Debezium connector configuration
│   └── monitoring/         # Prometheus, Grafana configs
│       ├── prometheus.yml  # Prometheus scrape configs
│       └── grafana/        # Grafana dashboards (JSON exports)

├── database/               # Aurora/PostgreSQL schema and migrations
│   ├── schema.sql          # Tables including outbox
│   ├── migrations/         # Schema evolution scripts
│   └── seed-data/          # Test data generation scripts
│       └── generate_orders.sql

├── pyflink-jobs/           # Stream processing applications
│   ├── cdc_processor.py    # Main PyFlink Table API job
│   ├── requirements.txt    # Python dependencies
│   └── tests/              # Unit and integration tests
│       └── test_transformations.py

├── bigquery/               # Warehouse schema and config
│   ├── schema.sql          # Table definitions with primary keys
│   └── ddl/                # BigQuery-specific DDL scripts

├── monitoring/             # Dashboards and alerts
│   ├── dashboards/         # Grafana JSON exports
│   │   └── debezium-overview.json
│   └── alerts/             # Alert rules (Prometheus)
│       └── debezium-alerts.yml

├── docs/                   # Project documentation
│   ├── architecture.md     # C4 diagrams, system context
│   ├── runbook.md          # Operational procedures
│   └── testing-strategy.md # Validation approach

└── README.md               # Project overview and setup instructions

Почему важна структура:

  • Разделение concerns: Infrastructure vs application code vs documentation
  • Reproducibility: Коллега может поднять весь pipeline по README
  • Production pattern: Реальные проекты используют аналогичную структуру

Component Architecture Diagrams

Используйте C4 Model для документирования архитектуры.

C4 Level 1: System Context

Показывает: Внешние пользователи и системы, с которыми взаимодействует ваш CDC pipeline.

System Context: E-commerce CDC Pipeline

C4 Model Level 1: Внешние актеры и границы системы

Data Analyst
Queries real-time data
CDC Pipeline
Captures and streams changes from Aurora to BigQuery
Writes to outbox
E-commerce Application
[System_Ext]
Streams CDC events
BigQuery
[System_Ext]

C4 System Context показывает:

  • Person (rounded node) - внешний актер, взаимодействующий с системой
  • System (solid border) - наша система, которую документируем
  • System_Ext (dashed border) - внешние системы вне нашего контроля

Цель: Показать, кто использует систему и с какими external systems она интегрируется.

C4 Level 2: Container Diagram

Показывает: Основные компоненты (containers) внутри CDC pipeline и их взаимодействия.

Container Diagram: CDC Pipeline Components

C4 Model Level 2: Компоненты и технологии внутри CDC Pipeline

Source
Aurora PostgreSQL
Database
Outbox Table
PostgreSQL Table
CDC Layer
Debezium Connector
Kafka Connect
Kafka
Event Streaming
Stream Processing
PyFlink Job
Python/Flink
Analytics Warehouse
BigQuery
Data Warehouse
Observability
Prometheus
Metrics DB
Grafana
Dashboards

C4 Container Diagram показывает:

  • Container_Boundary - границы подсистем (DiagramContainer)
  • ContainerDb - database компоненты (variant="database")
  • Container - application компоненты (variant="connector")
  • ContainerQueue - messaging компоненты (variant="cluster")
  • Технология указана под каждым компонентом

Цель: Показать технологии каждого компонента и data flow между ними.

Проверка знаний
В C4 модели какой уровень диаграммы показывает внешних пользователей и системы, а какой — внутренние компоненты pipeline?
Ответ
System Context (Level 1) показывает внешних пользователей и системы, с которыми взаимодействует CDC pipeline — бизнес-аналитиков, e-commerce приложение, BigQuery. Container Diagram (Level 2) показывает внутренние компоненты: Aurora PostgreSQL, Kafka, Debezium Connect, PyFlink, BigQuery Sink и их взаимодействия.

Technical Requirements по компонентам

Детальные требования для каждого слоя вашей архитектуры.

1. Aurora PostgreSQL / PostgreSQL

Ваша задача: Создать source database с logical replication и outbox table.

Configuration Requirements

-- Если используете локальный PostgreSQL (для тестирования)
-- Включить logical replication в postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

-- Если используете Aurora PostgreSQL
-- Установить через parameter group:
-- rds.logical_replication = 1

Outbox Table Schema

CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregatetype VARCHAR(255) NOT NULL,  -- Routing key (e.g., "orders")
    aggregateid VARCHAR(255) NOT NULL,    -- Entity ID (e.g., order_id)
    type VARCHAR(255) NOT NULL,           -- Event type (e.g., "OrderCreated")
    payload JSONB NOT NULL,               -- Event data
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- КРИТИЧНО: REPLICA IDENTITY FULL для DELETE событий
ALTER TABLE outbox REPLICA IDENTITY FULL;

-- Index для cleanup queries (удаление старых событий)
CREATE INDEX idx_outbox_created_at ON outbox(created_at);

Replication User

-- Создать пользователя для Debezium
CREATE USER debezium_user WITH REPLICATION LOGIN PASSWORD 'secure_password';

-- Предоставить права на чтение outbox table
GRANT SELECT ON outbox TO debezium_user;
GRANT USAGE ON SCHEMA public TO debezium_user;

Validation checklist:

  • SHOW wal_level; возвращает logical
  • Outbox table создана с правильной schema
  • REPLICA IDENTITY FULL установлен
  • Replication user имеет права REPLICATION и SELECT

2. Debezium Connector Configuration

Ваша задача: Настроить Debezium с Outbox Event Router SMT.

Connector JSON Config

{
  "name": "aurora-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium_user",
    "database.password": "secure_password",
    "database.dbname": "ecommerce",
    "database.server.name": "aurora_prod",

    "table.include.list": "public.outbox",
    "plugin.name": "pgoutput",
    "publication.autocreate.mode": "filtered",
    "slot.name": "debezium_outbox_slot",

    "heartbeat.interval.ms": "10000",
    "snapshot.mode": "when_needed",

    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.route.topic.replacement": "outbox.event.${routedByValue}",
    "transforms.outbox.table.fields.additional.placement": "type:header:eventType,created_at:envelope:timestamp",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"
  }
}

Ключевые параметры:

ParameterValuePurpose
table.include.listpublic.outboxЗахватывать только outbox table
plugin.namepgoutputBuilt-in PostgreSQL plugin (Aurora compatible)
heartbeat.interval.ms10000Keep replication slot alive (prevent WAL bloat)
snapshot.modewhen_neededAuto-detect if snapshot required
transforms.outbox.route.by.fieldaggregatetypeRoute events to topics by aggregate type
transforms.outbox.route.topic.replacementoutbox.event.${routedByValue}Topic naming pattern

Validation checklist:

  • Connector status = RUNNING (GET /connectors/aurora-outbox-connector/status)
  • Replication slot создан (SELECT * FROM pg_replication_slots)
  • Kafka topic outbox.event.orders содержит события

Ваша задача: Создать PyFlink job для transformations перед отправкой в BigQuery.

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col, lit

# Initialize Flink Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Configure checkpointing (fault tolerance)
table_env.get_config().set("execution.checkpointing.interval", "60000")  # 60s
table_env.get_config().set("parallelism.default", "2")
table_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")

# Define Kafka source with Debezium format
source_ddl = """
    CREATE TABLE orders_cdc (
        order_id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        status STRING,
        total_amount DECIMAL(10, 2),
        created_at TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'outbox.event.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'pyflink-cdc-processor',
        'format' = 'debezium-json',
        'scan.startup.mode' = 'earliest-offset',
        'debezium-json.schema-include' = 'false'
    )
"""
table_env.execute_sql(source_ddl)

# Define transformation view
transform_sql = """
    CREATE VIEW enriched_orders AS
    SELECT
        order_id,
        customer_id,
        product_id,
        status,
        total_amount,
        created_at,
        CASE
            WHEN status = 'completed' THEN 'FULFILLED'
            WHEN status = 'cancelled' THEN 'CANCELLED'
            ELSE 'PENDING'
        END AS fulfillment_status,
        CURRENT_TIMESTAMP AS processed_at
    FROM orders_cdc
"""
table_env.execute_sql(transform_sql)

# Define BigQuery sink (via Kafka topic)
sink_ddl = """
    CREATE TABLE orders_bigquery_sink (
        order_id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        status STRING,
        total_amount DECIMAL(10, 2),
        created_at TIMESTAMP(3),
        fulfillment_status STRING,
        processed_at TIMESTAMP(3),
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'bigquery.orders',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
"""
table_env.execute_sql(sink_ddl)

# Execute streaming query
table_env.execute_sql("""
    INSERT INTO orders_bigquery_sink
    SELECT * FROM enriched_orders
""").wait()

Key Patterns:

  • Debezium format: 'format' = 'debezium-json' автоматически парсит CDC envelope
  • Checkpointing: execution.checkpointing.interval обеспечивает fault tolerance
  • Duplicate handling: table.exec.source.cdc-events-duplicate=true для at-least-once semantics
  • Transformations: SQL VIEW для enrichment логики

Validation checklist:

  • PyFlink job запущен без ошибок
  • Checkpoints успешно завершаются (логи Flink)
  • Kafka topic bigquery.orders содержит transformed events
  • Transformations применяются корректно (проверить fulfillment_status)

4. BigQuery CDC Table

Ваша задача: Создать BigQuery table с primary key для CDC ingestion.

BigQuery Table DDL

CREATE TABLE `your-project.your_dataset.orders` (
    order_id INT64 NOT NULL,
    customer_id INT64 NOT NULL,
    product_id INT64 NOT NULL,
    status STRING,
    total_amount NUMERIC(10, 2),
    created_at TIMESTAMP,
    fulfillment_status STRING,
    processed_at TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) OPTIONS (
    max_staleness = INTERVAL 15 MINUTE,
    description = "Orders table with CDC ingestion from Aurora via PyFlink"
);

Критические детали:

RequirementExplanation
PRIMARY KEY (order_id) NOT ENFORCEDBigQuery CDC требует explicit primary key declaration
NOT ENFORCEDBigQuery не enforce uniqueness (responsibility на источнике)
max_staleness = INTERVAL 15 MINUTEBalances freshness vs cost (avoid < 5 min for cost)

BigQuery Connector Configuration (via Kafka Connect):

{
  "name": "bigquery-sink-connector",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "topics": "bigquery.orders",
    "project": "your-gcp-project",
    "defaultDataset": "your_dataset",
    "keyfile": "/path/to/service-account-key.json",
    "autoCreateTables": "false",
    "upsertEnabled": "true",
    "deleteEnabled": "true",
    "intermediateTableSuffix": "_temp",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}

Validation checklist:

  • BigQuery table создана с PRIMARY KEY
  • Connector status = RUNNING
  • Sample INSERT event появился в BigQuery
  • UPDATE event применился (UPSERT работает)
  • DELETE event удалил строку

5. Monitoring Infrastructure

Ваша задача: Настроить Prometheus + Grafana для observability.

Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kafka-connect'
    static_configs:
      - targets: ['kafka-connect:9999']  # JMX exporter port

  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']

Key Metrics to Monitor

MetricDescriptionAlert Threshold
debezium_metrics_MilliSecondsBehindSourceReplication lag (ms)> 10000 (10s)
debezium_metrics_NumberOfEventsFilteredFiltered events countMonitor for drops
kafka_connect_connector_statusConnector state (1=running, 0=failed)< 1 (failed)
kafka_consumer_lagPyFlink consumer lag> 1000 messages
flink_taskmanager_job_task_numRecordsInPerSecondPyFlink throughputMonitor for zero

Grafana Dashboard Structure

Panels:

  1. Connector Status - Gauge showing RUNNING/FAILED
  2. Replication Lag - Time series graph (ms behind source)
  3. Events/Second - Throughput graph
  4. Kafka Consumer Lag - PyFlink consumption lag
  5. Checkpoint Duration - PyFlink checkpoint completion time

Validation checklist:

  • Prometheus scrapes JMX metrics (check targets at localhost:9090/targets)
  • Grafana dashboard shows live metrics
  • Alert configured for: connector failure, lag > 10s, error rate > 0

Deliverables Checklist

1. Working Code

  • infrastructure/docker-compose.yml - Full stack (PostgreSQL, Kafka, Connect, Prometheus, Grafana)
  • database/schema.sql - Outbox table DDL
  • infrastructure/debezium/connector.json - Debezium connector config
  • pyflink-jobs/cdc_processor.py - PyFlink transformation job
  • pyflink-jobs/requirements.txt - Python dependencies
  • bigquery/schema.sql - BigQuery table DDL

2. Documentation

  • README.md - Project overview, setup instructions, quick start
  • docs/architecture.md - C4 diagrams (System Context + Container)
  • docs/runbook.md - Operational procedures for common failures
  • docs/testing-strategy.md - How to validate end-to-end functionality

3. Monitoring

  • infrastructure/monitoring/prometheus.yml - Prometheus scrape config
  • monitoring/dashboards/debezium-overview.json - Grafana dashboard export
  • monitoring/alerts/debezium-alerts.yml - Alert rules

4. Testing Evidence

  • Screenshots or logs showing:
    • Outbox table INSERT
    • Debezium captured event in Kafka
    • PyFlink processed event
    • Event appeared in BigQuery
  • Chaos testing results:
    • Connector restart without data loss
    • PyFlink restart from checkpoint
Проверка знаний
Почему snapshot.mode=always является anti-pattern для production capstone проекта?
Ответ
snapshot.mode=always выполняет полный re-snapshot при каждом перезапуске коннектора, игнорируя существующий offset. Для outbox table это означает повторную отправку ВСЕХ записей в Kafka, создавая массовые дубликаты. В production следует использовать snapshot.mode=when_needed — snapshot выполняется только при отсутствии offset или невозможности продолжить с сохраненной позиции.

Anti-Patterns to Avoid

❌ Anti-Pattern 1: Missing REPLICA IDENTITY FULL

Проблема: Debezium events для UPDATE/DELETE не содержат полных данных.

Решение:

ALTER TABLE outbox REPLICA IDENTITY FULL;

❌ Anti-Pattern 2: No Primary Key in BigQuery

Проблема: BigQuery CDC ingestion fails без explicit primary key.

Решение:

CREATE TABLE orders (
    order_id INT64 NOT NULL,
    ...,
    PRIMARY KEY (order_id) NOT ENFORCED
);

❌ Anti-Pattern 3: snapshot.mode=always

Проблема: Каждый restart connector делает full re-snapshot, дублируя данные.

Решение:

"snapshot.mode": "when_needed"

❌ Anti-Pattern 4: Нет мониторинга replication slot

Проблема: WAL bloat заполняет диск, непонятно когда lag растет.

Решение:

-- Регулярно проверять
SELECT
  slot_name,
  active,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag
FROM pg_replication_slots
WHERE slot_name = 'debezium_outbox_slot';

❌ Anti-Pattern 5: Hardcoded credentials

Проблема: Passwords в git repository, security risk.

Решение:

"database.password": "${file:/secrets/db-password.txt:password}"

Проблема: Restart теряет state, дублирует обработку.

Решение:

table_env.get_config().set("execution.checkpointing.interval", "60000")

Production Patterns to Follow

✅ Pattern 1: Heartbeat для replication slot

Зачем: Предотвращает WAL bloat при отсутствии событий.

"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "INSERT INTO heartbeat (ts) VALUES (NOW()) ON CONFLICT (id) DO UPDATE SET ts=EXCLUDED.ts"

✅ Pattern 2: At-least-once delivery с idempotency

Зачем: CDC дает at-least-once; downstream должен быть idempotent.

# PyFlink config
table_env.get_config().set("table.exec.source.cdc-events-duplicate", "true")

# BigQuery uses PRIMARY KEY for automatic UPSERT

✅ Pattern 3: Monitoring Four Golden Signals

Зачем: SRE best practice для observability.

SignalMetricMeaning
LatencyReplication lag (ms behind source)How fresh is data?
TrafficEvents/second throughputSystem load
ErrorsConnector status, error rateFailure detection
SaturationCPU/memory utilizationCapacity planning

✅ Pattern 4: Runbook для common failures

Зачем: Быстрый response при incidents.

Пример runbook entries:

  1. Connector fails with “too many replication slots”

    • Symptom: Connector status = FAILED
    • Diagnosis: SELECT * FROM pg_replication_slots
    • Remediation: Drop unused slots or increase max_replication_slots
  2. WAL segments no longer exist

    • Symptom: Connector fails with “requested WAL segment … no longer exists”
    • Diagnosis: Check wal_keep_size parameter
    • Remediation: Re-snapshot with snapshot.mode=initial or restore from backup

Ключевые выводы

  1. Project structure критичен: Разделение infrastructure, code, documentation облегчает maintenance
  2. C4 diagrams обязательны: System Context показывает external integrations, Container показывает internal components
  3. Technical requirements детальны: Каждый компонент имеет explicit checklist для validation
  4. Outbox table требует REPLICA IDENTITY FULL для DELETE событий
  5. Debezium Outbox Event Router SMT роутирует события по aggregatetype field
  6. PyFlink Table API упрощает CDC processing с Debezium format support
  7. BigQuery CDC требует PRIMARY KEY (NOT ENFORCED) для UPSERT/DELETE
  8. Monitoring = Four Golden Signals: Latency, Traffic, Errors, Saturation
  9. Anti-patterns избегать: No REPLICA IDENTITY, no primary key, snapshot.mode=always, hardcoded credentials
  10. Production patterns: Heartbeat, at-least-once + idempotency, monitoring, runbook

Что дальше?

Вы готовы начать работу над capstone проектом. Используйте:

  • Project structure как шаблон для организации кода
  • C4 diagrams для документирования архитектуры
  • Technical requirements как checklist при реализации
  • Anti-patterns как warning signs при code review
  • Production patterns как best practices

Время применить все знания курса в едином проекте. Удачи!

Check Your Understanding

Score: 0 of 0
Applied
Question 1 of 4. В capstone проекте для outbox table установлен REPLICA IDENTITY FULL. Если пропустить эту настройку, какие CDC операции будут затронуты и почему?

Finished the lesson?

Mark it as complete to track your progress