Перейти к содержанию
Learning Platform
Продвинутый
40 минут
ETL ELT Data Lake Parquet Delta Lake CDC Patterns

Требуемые знания:

  • module-5/05-pyspark-structured-streaming

ETL/ELT паттерны с CDC данными

Вы настроили PySpark для чтения CDC событий из Kafka. Но куда писать результаты? Как организовать data flow от transactional database через CDC stream в analytical data warehouse? ETL и ELT — это два фундаментальных подхода к построению data pipelines. В этом уроке мы изучим, как применить эти паттерны к CDC данным, какие challenges возникают, и как их решать в production.

ETL vs ELT: современный подход

ETL (Extract, Transform, Load) и ELT (Extract, Load, Transform) — это архитектурные паттерны для data integration.

Traditional ETL

Traditional ETL Pattern

Extract → Transform → Load с трансформациями перед загрузкой

Source DBPostgreSQL
Extract
Staging Area
Transform
ETL EngineFilter, aggregate, join, clean
Load
Data WarehouseClean, aggregated
Традиционный подход:

ETL трансформирует данные перед загрузкой в warehouse. Проблема: нет raw data для re-processing, сложность изменений (новая метрика = переделка ETL).

ETL паттерн:

  1. Extract — читаем данные из source
  2. Transform — применяем бизнес-логику, очистка, агрегации
  3. Load — загружаем готовые данные в warehouse

Плюсы:

  • Warehouse хранит только clean data
  • Меньше storage requirements
  • Controlled data quality

Минусы:

  • ETL bottleneck — трансформации могут быть медленными
  • Потеря raw data — нет возможности re-process
  • Сложность изменений — новая метрика → переделка ETL

Modern ELT

Modern ELT Pattern с CDC

Extract → Load → Transform с трансформациями в warehouse

Source DBPostgreSQL
Extract
CDC StreamDebezium
Load Raw
Data LakeParquet, Delta
Transform
Data Warehousedbt, Spark SQL
Современный подход:

ELT загружает raw data в lake, трансформации в warehouse. Преимущества: raw data для re-processing, гибкость (новая метрика = новый SQL query), warehouse выполняет трансформации.

ELT паттерн:

  1. Extract — CDC capture changes
  2. Load — записываем raw events в data lake
  3. Transform — трансформации выполняются в warehouse (SQL, dbt)

Плюсы:

  • Raw data сохранен — можно re-process
  • Warehouse выполняет трансформации — использует свою compute power
  • Гибкость — новая метрика = новый SQL query

Минусы:

  • Больше storage (храним raw + transformed)
  • Data quality проверки downstream

Modern trend: ELT становится стандартом с появлением cloud data warehouses (Snowflake, BigQuery, Redshift) и cheap storage (S3, Azure Blob).

CDC в контексте ETL/ELT

CDC меняет игру: Вместо periodic full dumps мы получаем continuous incremental updates.

CDC enables both patterns:

  • ETL with CDC: Transform в Kafka Streams/Flink → load clean data
  • ELT with CDC: Load raw CDC events → transform в warehouse

В этом уроке: Фокус на ELT with CDC — как спроектировать data lake для CDC events.

Проверка знаний
Почему ELT подход лучше ETL для CDC данных, если бизнес-требования могут измениться? Какое ключевое преимущество даёт сохранение raw CDC событий?
Ответ
В ETL трансформация происходит до загрузки: если бизнес захочет новую метрику, придётся переделать ETL pipeline и перезапустить обработку. В ELT raw CDC события сохраняются в data lake (Parquet/Delta), а трансформации выполняются в warehouse через SQL/dbt. Новая метрика = новый SQL query, без изменения pipeline. Кроме того, raw CDC данные содержат полную историю изменений (before/after, timestamps, operation types), что позволяет восстановить состояние на любой момент времени (time travel) и провести audit trail.

CDC как источник для Data Lake

Data Lake — это centralized repository для хранения raw data в native format (Parquet, ORC, Avro).

Advantages of CDC → Data Lake

CDC → Data Lake: Multi-Layer Architecture

Один CDC stream → несколько output layers

Transactional DatabaseCustomer changes (INSERT, UPDATE, DELETE)
CDC capture
Data Lake (Parquet)
Raw CDC events
Append-only log

All change events

+ metadata columns

Latest snapshot
Current state

One row per entity

Latest values only

Change history
Audit trail

All historical values

With timestamps

Три слоя data lake:

CDC stream записывается как raw events (append-only). Downstream processing создает snapshot (current state) и history (audit trail). Один source → три output layers для разных use cases.

Преимущества:

  1. Incremental updates — только изменения, не full table dumps
  2. Change history — полная история изменений для audit
  3. Audit trail — кто, когда, что изменил (via CDC metadata)
  4. Time travel — можно восстановить состояние на любую дату
  5. Low latency — real-time updates вместо batch loads

Challenges:

  1. Operation types — INSERT, UPDATE, DELETE требуют разной обработки
  2. Ordering — CDC события могут arrive out-of-order
  3. Schema evolution — schema changes в source database
  4. Compaction — history растет unbounded без cleanup

Target Formats

FormatКогда использоватьОсобенности
ParquetAppend-only history, simple queriesColumnar, good compression, immutable
Delta LakeACID updates, time travel, upsertsACID transactions, MERGE support, versioning
Apache IcebergSchema evolution, partition evolutionNetflix/Apple developed, advanced features

Для CDC: Delta Lake или Iceberg рекомендуются для production (ACID upserts). Parquet подходит для append-only scenarios.

Паттерн: Append-Only History

Идея: Сохранять все CDC события как append-only log. Never update or delete.

Architecture

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, current_timestamp, to_date, lit

spark = SparkSession.builder \
    .appName("CDC-Append-Only-History") \
    .config("spark.sql.streaming.checkpointLocation", "/data/checkpoints/history") \
    .getOrCreate()

# Read CDC stream
cdc_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.public.orders") \
    .load()

# Parse and add metadata
parsed_df = cdc_df.select(
    from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")

# Add metadata columns
history_df = parsed_df.select(
    col("after.*"),  # or before.* for deletes
    col("op").alias("_operation"),  # c, u, d, r
    col("ts_ms").alias("_cdc_timestamp"),
    current_timestamp().alias("_processed_at"),
    col("source.db").alias("_source_db"),
    col("source.table").alias("_source_table")
).withColumn(
    "_processed_date",
    to_date(col("_processed_at"))
)

# Write to Parquet with date partitioning
query = history_df \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/lake/orders_history") \
    .option("checkpointLocation", "/data/checkpoints/orders_history") \
    .partitionBy("_processed_date") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()

Metadata columns:

  • _operation — тип операции (c=create, u=update, d=delete, r=read/snapshot)
  • _cdc_timestamp — timestamp от database transaction commit
  • _processed_at — timestamp когда Spark обработал событие
  • _source_db / _source_table — source metadata для multi-table pipelines

Partitioning strategy:

/data/lake/orders_history/
├── _processed_date=2026-01-01/
│   ├── part-00000.parquet
│   └── part-00001.parquet
├── _processed_date=2026-01-02/
│   └── part-00000.parquet

Зачем партиционирование по дате:

  • Efficient pruning для time-range queries
  • Easy cleanup старых partitions
  • Simplified backfill logic

Querying history:

-- Get all changes for specific order
SELECT * FROM orders_history
WHERE id = 12345
ORDER BY _cdc_timestamp;

-- Get state at specific point in time
SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY _cdc_timestamp DESC
    ) as rn
    FROM orders_history
    WHERE _cdc_timestamp <= '2026-01-15 10:00:00'
)
WHERE rn = 1 AND _operation != 'd';

Production tip: Append-only history идеально для compliance (GDPR audit trail), но растет unbounded. Нужна retention policy (например, delete partitions > 2 years).

Паттерн: Separate Streams by Operation

Идея: Разделить CDC stream на отдельные потоки по типу операции. INSERT → append table, UPDATE → upsert logic, DELETE → soft delete.

Architecture

Append-Only History с Metadata

Добавление metadata columns для downstream processing

Source Database
Debezium
Kafka Topic
PySpark + Metadata
_operation
c, u, d, r
_cdc_timestamp
from ts_ms
_processed_at
current_timestamp()
_source_db / _source_table
from source metadata
Parquet SinkWith metadata columns
Metadata columns essential:

Metadata columns критически важны для troubleshooting, lag monitoring, правильной обработки operation types, и multi-table pipelines. Всегда добавляйте их при записи CDC data в data lake.

Implementation:

from pyspark.sql.functions import when

# Read and parse CDC stream
parsed_df = kafka_df.select(
    from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")

# Separate by operation type
inserts = parsed_df \
    .filter(col("op") == "c") \
    .select("after.*") \
    .withColumn("created_at", current_timestamp())

updates = parsed_df \
    .filter(col("op") == "u") \
    .select("after.*") \
    .withColumn("updated_at", current_timestamp())

deletes = parsed_df \
    .filter(col("op") == "d") \
    .select(
        col("before.id"),
        current_timestamp().alias("deleted_at")
    )

# Write inserts to append-only table
inserts_query = inserts \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/lake/orders_inserts") \
    .option("checkpointLocation", "/data/checkpoints/inserts") \
    .start()

# Write updates to separate location for MERGE processing
updates_query = updates \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/lake/orders_updates") \
    .option("checkpointLocation", "/data/checkpoints/updates") \
    .start()

# Write deletes for soft-delete processing
deletes_query = deletes \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/lake/orders_deletes") \
    .option("checkpointLocation", "/data/checkpoints/deletes") \
    .start()

Зачем разделять потоки:

  1. Different processing logic — INSERT просто append, UPDATE требует upsert, DELETE требует soft-delete
  2. Different schemas — DELETE имеет только before fields
  3. Optimization — можно настроить разные partitioning/compression strategies
  4. Monitoring — отдельные metrics для каждого operation type

Downstream processing:

  • Inserts: Просто append в финальную таблицу
  • Updates: Batch MERGE job (hourly/daily) для применения updates
  • Deletes: Batch job для soft-delete (set deleted_at timestamp)

Паттерн: Merge/Upsert с Delta Lake

Delta Lake предоставляет ACID transactions и MERGE command для atomic upserts.

Delta Lake MERGE

from delta.tables import DeltaTable

# Initialize Delta Lake
spark.sparkContext.setSystemProperty("spark.databricks.delta.retentionDurationCheck.enabled", "false")

# Read CDC stream
cdc_stream = parsed_df.select(
    col("after.*"),
    col("op")
).filter(col("op").isin("c", "u", "d"))

# Function to merge batch into Delta table
def merge_to_delta(batch_df, batch_id):
    """Merge CDC batch into Delta Lake table."""

    # Target Delta table path
    delta_path = "/data/lake/orders_delta"

    # Create table if doesn't exist
    if not DeltaTable.isDeltaTable(spark, delta_path):
        batch_df.limit(0).write.format("delta").save(delta_path)

    delta_table = DeltaTable.forPath(spark, delta_path)

    # MERGE logic
    delta_table.alias("target").merge(
        batch_df.alias("source"),
        "target.id = source.id"
    ).whenMatchedDelete(
        condition="source.op = 'd'"
    ).whenMatchedUpdateAll(
        condition="source.op = 'u'"
    ).whenNotMatchedInsertAll(
        condition="source.op = 'c'"
    ).execute()

# Apply merge using foreachBatch
query = cdc_stream \
    .writeStream \
    .foreachBatch(merge_to_delta) \
    .option("checkpointLocation", "/data/checkpoints/delta_merge") \
    .trigger(processingTime="1 minute") \
    .start()

MERGE logic explained:

  1. whenMatchedDelete — если id существует и op = 'd' → DELETE row
  2. whenMatchedUpdateAll — если id существует и op = 'u' → UPDATE all columns
  3. whenNotMatchedInsertAll — если id не существует и op = 'c' → INSERT new row

Advantages:

  • ACID guarantees — atomic merge operations
  • Time travel — можно query historical versions
  • Schema evolution — automatic schema merging
  • Optimizations — Z-ordering, data skipping

Performance tip: MERGE операции дорогие. Используйте batching (trigger каждую минуту, не каждое событие).

Проверка знаний
Почему Delta Lake MERGE предпочтительнее append-only Parquet для CDC данных, если нужен актуальный snapshot таблицы? Какое ограничение Parquet это создаёт?
Ответ
Parquet -- immutable формат: файлы не изменяются после записи. Для CDC с UPDATE и DELETE операциями append-only Parquet сохраняет все версии строки, но для получения актуального snapshot нужен дорогой запрос с ROW_NUMBER() OVER (PARTITION BY id ORDER BY timestamp DESC). Delta Lake MERGE выполняет atomic upsert: при UPDATE перезаписывает строку, при DELETE удаляет. Результат -- актуальная таблица без дополнительных запросов. Компромисс: MERGE дороже append, но downstream queries быстрее и проще.

Streaming to Parquet

Parquet — это columnar storage format, оптимизированный для analytical queries.

Configuration

from pyspark.sql.functions import date_format

# Read CDC stream
cdc_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "dbserver1.public.orders") \
    .load()

parsed_df = cdc_df.select(
    from_json(col("value").cast("string"), cdc_schema).alias("data")
).select("data.payload.*")

# Extract current state (after field for c/u, before for d)
current_state = parsed_df.select(
    when(col("op").isin("c", "u", "r"), col("after.id")).otherwise(col("before.id")).alias("id"),
    when(col("op").isin("c", "u", "r"), col("after.customer_id")).otherwise(col("before.customer_id")).alias("customer_id"),
    when(col("op").isin("c", "u", "r"), col("after.total")).otherwise(None).alias("total"),
    when(col("op").isin("c", "u", "r"), col("after.status")).otherwise(None).alias("status"),
    col("op").alias("_operation"),
    col("ts_ms").alias("_cdc_timestamp"),
    current_timestamp().alias("_processed_at")
).withColumn(
    "_year_month",
    date_format(col("_processed_at"), "yyyy-MM")
)

# Write to Parquet with partitioning
query = current_state \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "/data/lake/orders") \
    .option("checkpointLocation", "/data/checkpoints/orders_parquet") \
    .option("compression", "snappy") \
    .partitionBy("_year_month") \
    .trigger(processingTime="30 seconds") \
    .start()

Partitioning strategies:

StrategyPatternWhen to use
By date_year_month=2026-01Time-range queries, retention policies
By table_table=ordersMulti-table CDC pipeline
By operation_operation=cSeparate analytics for inserts/updates
By dimensionregion=EUGeographic partitioning

Trigger options:

# Process as fast as possible (default)
.trigger(processingTime="0 seconds")

# Process every 30 seconds
.trigger(processingTime="30 seconds")

# Process once and stop (batch-like)
.trigger(once=True)

# Process all available data (Spark 3.3+)
.trigger(availableNow=True)

Compression codecs:

  • snappy — fast compression/decompression, moderate compression ratio (default)
  • gzip — high compression ratio, slower
  • zstd — balanced, good for modern workloads

Production recommendation: snappy для hot data (recent), zstd для cold data (archival).

Exactly-Once в PySpark Streaming

Critical limitation: PySpark Structured Streaming предоставляет at-least-once semantics для Kafka sink.

PySpark Guarantees

Operation Separation Pattern

Разделение CDC stream по типу операции

CDC StreamMixed operations
Operation Type Filter
op='c'INSERT
inserts_parquet/
Append-only
op='u'UPDATE
updates_parquet/
Requires MERGE
op='d'DELETE
deletes_parquet/
Soft/hard delete
Зачем разделять по operation type:

Разные операции требуют разной обработки downstream. INSERT — простой append, UPDATE — MERGE/upsert, DELETE — soft-delete. Разделение упрощает downstream logic и позволяет настроить separate partitioning, compression, monitoring для каждого типа.

What PySpark provides:

  1. Kafka source → Spark: Exactly-once (via checkpointed offsets)
  2. Spark → Parquet/Delta: Idempotent writes (same batch writes same files)
  3. Spark → Kafka: At-least-once only (no transactional writes)

Why at-least-once to Kafka:

  • PySpark не поддерживает Kafka transactional producer API
  • При failure и retry → duplicate messages в output Kafka topic
  • PyFlink поддерживает exactly-once to Kafka через transactional writes

Solutions for Exactly-Once

Option 1: Idempotent consumers

# Downstream consumer должен быть idempotent
# Example: Database upsert вместо insert
def process_message(msg):
    # INSERT ... ON CONFLICT UPDATE
    db.execute("""
        INSERT INTO orders (id, customer_id, total)
        VALUES (?, ?, ?)
        ON CONFLICT (id) DO UPDATE SET
            customer_id = EXCLUDED.customer_id,
            total = EXCLUDED.total
    """, (msg.id, msg.customer_id, msg.total))

Option 2: Deduplication with state store

# Store processed message IDs in Redis/database
def is_duplicate(message_id):
    return redis.sismember("processed_messages", message_id)

def mark_processed(message_id):
    redis.sadd("processed_messages", message_id)
    redis.expire("processed_messages", 86400)  # 24h TTL

Option 3: Use PyFlink instead

If exactly-once to Kafka is critical → use PyFlink with transactional sink.

Production insight: Для CDC → data lake пайплайнов at-least-once обычно acceptable (idempotent writes). Для CDC → Kafka → другой consumer рекомендуется PyFlink.

Мониторинг streaming pipeline

Streaming query metrics доступны через StreamingQueryListener API.

Key Metrics

from pyspark.sql.streaming import StreamingQueryListener

class MetricsListener(StreamingQueryListener):
    def onQueryProgress(self, event):
        progress = event.progress

        print(f"Batch: {progress.batchId}")
        print(f"Input rows: {progress.numInputRows}")
        print(f"Processed rows/sec: {progress.processedRowsPerSecond}")
        print(f"Batch duration: {progress.batchDuration}ms")

        # Check for lag
        if progress.processedRowsPerSecond < 100:
            print("WARNING: Processing lag detected!")

    def onQueryStarted(self, event):
        print(f"Query started: {event.id}")

    def onQueryTerminated(self, event):
        print(f"Query terminated: {event.id}")

# Register listener
spark.streams.addListener(MetricsListener())

Metrics to monitor:

  • inputRowsPerSecond — incoming data rate
  • processedRowsPerSecond — processing throughput
  • batchDuration — time to process each micro-batch
  • Lag = input rate - processed rate — if lag growing → scaling needed

Alerting thresholds:

alerts:
  - name: Processing Lag
    condition: processedRowsPerSecond < inputRowsPerSecond
    for: 5m
    severity: warning

  - name: Batch Duration High
    condition: batchDuration > 60000  # 60 seconds
    for: 3m
    severity: critical

Лабораторная работа: CDC to Data Lake

Цель: Создать production-like ETL pipeline от CDC stream до Parquet data lake.

Задание:

  1. Прочитать CDC события из dbserver1.public.orders
  2. Добавить metadata columns: _operation, _cdc_timestamp, _processed_at
  3. Партиционировать по _processed_date (date from _processed_at)
  4. Записать в Parquet format с compression snappy
  5. Trigger: каждые 30 секунд
  6. Query результаты через Spark SQL для проверки

Starter code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, current_timestamp, to_date, date_format
)

# TODO: Create SparkSession

# TODO: Define CDC schema

# TODO: Read from Kafka

# TODO: Parse JSON and add metadata columns

# TODO: Write to Parquet with partitioning

# TODO: Query Parquet files to verify
# spark.read.parquet("/data/lake/orders").show()

Expected directory structure:

/data/lake/orders/
├── _processed_date=2026-02-01/
│   ├── part-00000-xxx.snappy.parquet
│   └── part-00001-xxx.snappy.parquet
└── _processed_date=2026-02-02/
    └── part-00000-xxx.snappy.parquet

Verification query:

df = spark.read.parquet("/data/lake/orders")

# Check schema
df.printSchema()

# Count by operation type
df.groupBy("_operation").count().show()

# Check partitions
df.groupBy("_processed_date").count().show()

# Query specific order history
df.filter(col("id") == 12345).orderBy("_cdc_timestamp").show()

Что дальше?

Вы изучили ETL vs ELT подходы, паттерны для CDC → data lake integration, и практические аспекты работы с Parquet и Delta Lake.

Но остается вопрос: Как использовать CDC данные для real-time machine learning? Как извлекать features из streaming CDC events? Как обновлять feature store для online predictions?

В следующем уроке мы изучим Real-time Feature Engineering — как построить ML pipeline поверх CDC stream для fraud detection, recommendation systems, и real-time personalization.

Ключевые выводы

  1. ETL трансформирует данные перед загрузкой, ELT загружает raw и трансформирует в warehouse
  2. Modern trend: ELT с cheap storage и powerful cloud warehouses
  3. CDC enables incremental ELT — continuous updates вместо batch dumps
  4. Append-only history паттерн сохраняет все CDC события для audit trail
  5. Separate streams by operation позволяет применять разную логику для INSERT/UPDATE/DELETE
  6. Delta Lake MERGE предоставляет ACID upserts для CDC data
  7. Parquet partitioning критически важно для query performance и retention policies
  8. Metadata columns (_operation, _cdc_timestamp, _processed_at) essential для troubleshooting
  9. PySpark provides at-least-once для Kafka sink (не exactly-once)
  10. Idempotent consumers или PyFlink needed для exactly-once end-to-end
  11. Monitor processedRowsPerSecond для detection processing lag
  12. Trigger configuration влияет на latency vs efficiency tradeoff

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Чем ELT подход отличается от ETL при использовании CDC в качестве источника данных?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс