Capstone implementation: full Python code
This lesson is the complete implementation of the capstone DAG from the previous lesson: production-grade Python code that combines all of the course's key topics. The code can be copied into a production codebase almost as-is, after adjusting it to the specifics of your infrastructure.
File structure:
dags/
├── capstone/
│   ├── orders_etl_realtime.py          # Main DAG
│   ├── datasets.py                     # Dataset definitions
│   └── helpers.py                      # Shared utilities
├── plugins/
│   ├── xcom_backends/
│   │   └── s3_xcom_backend.py          # Custom XCom (S3 storage)
│   └── operators/
│       └── clickhouse_iceberg_load.py  # Custom operator
└── config/
    └── capstone_config.yaml            # Configuration
Step 1: Dataset definitions
# dags/capstone/datasets.py
"""Centralized dataset definitions for capstone pipeline."""
from airflow import Dataset
# Source datasets: updated by Debezium/Kafka consumers
kafka_orders = Dataset("kafka://confluent.cloud/orders.public.orders")
kafka_line_items = Dataset("kafka://confluent.cloud/orders.public.line_items")
kafka_customers = Dataset("kafka://confluent.cloud/orders.public.customers")
# Intermediate datasets: updated by our DAG
s3_staging_orders = Dataset("s3://datalake/staging/orders/")
s3_staging_line_items = Dataset("s3://datalake/staging/line_items/")
s3_staging_customers = Dataset("s3://datalake/staging/customers/")
# Output datasets: consumed by downstream DAGs
iceberg_bronze_orders = Dataset("iceberg://lakehouse/bronze.orders_raw")
iceberg_silver_orders = Dataset("iceberg://lakehouse/silver.orders_clean")
iceberg_gold_daily_revenue = Dataset("iceberg://lakehouse/gold.daily_revenue")
clickhouse_dashboards = Dataset("clickhouse://clickhouse.internal/dashboards.daily_revenue")
Step 2: Custom XCom backend for large dataframes
# plugins/xcom_backends/s3_xcom_backend.py
"""Custom XCom backend that stores large objects on S3.

The XCom column in the metadata DB is size-limited (~48 KB is the working figure here;
the actual limit depends on the database), so dataframes and large dicts go to S3.
"""
import pickle
import uuid
from typing import Any

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3_BUCKET = "airflow-xcom"
SIZE_THRESHOLD = 4 * 1024  # 4 KB: small XComs stay in the DB, large ones go to S3


class S3XComBackend(BaseXCom):
    """
    Custom XCom backend:
    - Small values (<4 KB) are stored in the DB as usual
    - Large values are pickled and uploaded to S3; only a reference is stored in the DB
    """

    PREFIX = "xcom:s3:"

    @staticmethod
    def serialize_value(value: Any, **kwargs) -> bytes:
        """Serialize; if the value is large, store it on S3 and return a reference."""
        # Pickle first to measure the size
        pickled = pickle.dumps(value)
        if len(pickled) < SIZE_THRESHOLD:
            # Small: regular serialization into the DB
            return BaseXCom.serialize_value(value, **kwargs)
        # Large: upload to S3
        key = f"xcom/{uuid.uuid4()}.pickle"
        hook = S3Hook(aws_conn_id="aws_default")
        hook.load_bytes(
            bytes_data=pickled,
            bucket_name=S3_BUCKET,
            key=key,
            replace=True,
        )
        reference = {
            "_s3_xcom": True,
            "bucket": S3_BUCKET,
            "key": key,
            "size_bytes": len(pickled),
        }
        return BaseXCom.serialize_value(reference, **kwargs)

    @staticmethod
    def deserialize_value(result) -> Any:
        """Deserialize; if the value is an S3 reference, fetch and unpickle it."""
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, dict) and value.get("_s3_xcom"):
            hook = S3Hook(aws_conn_id="aws_default")
            # read_key() decodes the object as text, which breaks on pickled bytes;
            # read the raw body instead
            s3_object = hook.get_key(key=value["key"], bucket_name=value["bucket"])
            return pickle.loads(s3_object.get()["Body"].read())
        return value
Configuration in airflow.cfg:
[core]
xcom_backend = plugins.xcom_backends.s3_xcom_backend.S3XComBackend
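The size-based routing of the backend can be tried out without Airflow or S3. The following is a minimal sketch under stated assumptions: the in-memory dict `FAKE_BUCKET` stands in for the S3 bucket, JSON stands in for the default DB serialization, and `serialize` / `deserialize` are hypothetical names, not Airflow APIs.

```python
import json
import pickle
import uuid
from typing import Any

SIZE_THRESHOLD = 4 * 1024  # same 4 KB cut-off as in the backend
FAKE_BUCKET: dict[str, bytes] = {}  # hypothetical stand-in for the S3 bucket


def serialize(value: Any) -> bytes:
    """Return the bytes that would end up in the metadata DB row."""
    pickled = pickle.dumps(value)
    if len(pickled) < SIZE_THRESHOLD:
        return json.dumps(value).encode()  # small: store the value itself
    key = f"xcom/{uuid.uuid4()}.pickle"
    FAKE_BUCKET[key] = pickled  # large: store the payload out of band
    reference = {"_s3_xcom": True, "key": key, "size_bytes": len(pickled)}
    return json.dumps(reference).encode()


def deserialize(stored: bytes) -> Any:
    """Resolve a stored row back to the original value."""
    value = json.loads(stored)
    if isinstance(value, dict) and value.get("_s3_xcom"):
        return pickle.loads(FAKE_BUCKET[value["key"]])
    return value


small = {"record_count": 42}
large = {"rows": list(range(10_000))}
small_row, large_row = serialize(small), serialize(large)
print(len(small_row), len(large_row), len(FAKE_BUCKET))
```

Only the large value leaves the "DB": its row holds a short reference, while the payload sits in the bucket stand-in.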
Step 3: Main DAG — Full implementation
# dags/capstone/orders_etl_realtime.py
"""
Capstone: real-time orders ETL pipeline.

Sources: 3 Kafka topics (CDC from Postgres OLTP)
Sinks: Iceberg lakehouse + ClickHouse dashboards
Demonstrates: Datasets, Dynamic Mapping, Deferrable, Multiple Executors,
custom XCom backend, OpenLineage emission, HA scheduler+triggerer.
Airflow 2.10/2.11 LTS.
"""
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any

from airflow.decorators import dag, task
from airflow.lineage.entities import Table
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier

# Assumes the project root is on PYTHONPATH; if only the dags folder is on sys.path,
# import from capstone.datasets instead
from dags.capstone.datasets import (
    kafka_orders, kafka_line_items, kafka_customers,
    s3_staging_orders, s3_staging_line_items, s3_staging_customers,
    iceberg_bronze_orders, iceberg_silver_orders, iceberg_gold_daily_revenue,
    clickhouse_dashboards,
)

# Configuration lives in committed config (Git) and is not fetched at runtime
KAFKA_TOPICS = ["orders.public.orders", "orders.public.line_items", "orders.public.customers"]
SPARK_K8S_NAMESPACE = "spark-jobs"
ICEBERG_CATALOG = "glue"


@dag(
    dag_id="capstone_orders_etl_realtime",
    # IMPORTANT: dataset-triggered, NOT cron (module 08)
    schedule=[kafka_orders, kafka_line_items, kafka_customers],
    start_date=datetime(2024, 1, 1),  # Hardcoded (module 02 quiz)
    catchup=False,  # ALWAYS False for new DAGs (module 17.05)
    max_active_runs=3,  # Resource control
    tags=["capstone", "production", "critical", "etl"],
    description="Real-time orders ETL: Kafka CDC → S3 → Spark → Iceberg → ClickHouse",
    default_args={
        "owner": "[email protected]",
        "retries": 3,
        "retry_delay": timedelta(minutes=2),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(minutes=30),
        # Alerting is handled by the Listener API (module 12), not by a callback in every DAG
    },
    doc_md=__doc__,
)
def capstone_orders_etl():
    """Capstone DAG: full pipeline."""
    # =========================================================================
    # Phase 1: Consume from Kafka topics
    # Uses Dynamic Task Mapping (module 07)
    # =========================================================================
    @task(
        # Multiple Executors, AIP-61 (2.10+): light tasks run on Celery
        executor="CeleryExecutor",
        # Lineage inlets/outlets (module 17.07)
        # Table takes database/cluster/name; it has no `schema` argument
        inlets=[Table(database="kafka", cluster="confluent.cloud", name="orders.public.orders")],
        outlets=[s3_staging_orders],
    )
    def consume_kafka(topic: str, **context) -> dict:
        """Consume Kafka topic to S3 staging, one task per topic via mapping."""
        import json
        import logging
        import time
        from datetime import timezone

        import boto3
        from airflow.models import Variable
        from confluent_kafka import Consumer

        # IMPORTANT: Variable.get inside the task, not at top level (module 02 quiz)
        # data_interval for idempotent processing (module 17.05)
        interval_start = context["data_interval_start"]
        interval_end = context["data_interval_end"]
        ds = context["ds"]
        kafka_conf = {
            "bootstrap.servers": Variable.get("kafka_bootstrap"),
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": Variable.get("kafka_username"),
            "sasl.password": Variable.get("kafka_password"),
            "group.id": f"airflow-{topic}-{ds}",  # Deterministic group id
            "auto.offset.reset": "earliest",
        }
        consumer = Consumer(kafka_conf)
        consumer.subscribe([topic])
        records = []
        timeout_seconds = 60
        # Monotonic clock: unaffected by wall-clock adjustments
        deadline = time.monotonic() + timeout_seconds
        try:
            while time.monotonic() < deadline:
                msg = consumer.poll(timeout=5.0)
                if msg is None:
                    continue
                if msg.error():
                    # Log consumer errors instead of dropping them silently
                    logging.getLogger(__name__).warning("Kafka error on %s: %s", topic, msg.error())
                    continue
                if msg.value() is None:
                    continue  # Tombstone record, nothing to parse
                event = json.loads(msg.value())
                # Filter to the data_interval window: backfill-safe
                # Debezium's ts_ms is epoch milliseconds, not an ISO string
                event_ts = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
                if interval_start <= event_ts < interval_end:
                    records.append(event)
        finally:
            consumer.close()
        # Deterministic S3 path (module 17.02)
        topic_short = topic.split(".")[-1]
        s3_key = f"staging/{topic_short}/dt={ds}/part-00000.json"
        s3 = boto3.client("s3")
        # Write to a tmp key, then copy into place (module 17.02); S3 has no rename
        tmp_key = f"{s3_key}.tmp"
        s3.put_object(
            Bucket="datalake",
            Key=tmp_key,
            Body=json.dumps(records).encode(),
            ContentType="application/json",
        )
        s3.copy_object(
            Bucket="datalake",
            CopySource={"Bucket": "datalake", "Key": tmp_key},
            Key=s3_key,
        )
        s3.delete_object(Bucket="datalake", Key=tmp_key)
        return {
            "topic": topic,
            "s3_key": s3_key,
            "record_count": len(records),
            "data_interval_start": str(interval_start),
            "data_interval_end": str(interval_end),
        }

    # Dynamic Mapping: one task definition × 3 topics
    consume_results = consume_kafka.expand(topic=KAFKA_TOPICS)
    # =========================================================================
    # Phase 2: Verify that all staging files exist
    # A deferrable sensor is the long-wait alternative (module 09, Deferrable Operators)
    # =========================================================================
    @task(executor="CeleryExecutor")
    def verify_staging(consume_results: list[dict]) -> list[dict]:
        """Verify that all staging files exist on S3."""
        from airflow.exceptions import AirflowFailException
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id="aws_default")
        for result in consume_results:
            exists = hook.check_for_key(bucket_name="datalake", key=result["s3_key"])
            if not exists:
                # AirflowFailException fails the task without further retries
                raise AirflowFailException(f"Staging file missing: {result['s3_key']}")
        # Materialize the lazy mapped-XCom sequence before returning it
        return list(consume_results)

    staging_verified = verify_staging(consume_results)
    # =========================================================================
    # Phase 3: Spark transformation on Kubernetes
    # Multiple Executors (module 05 / AIP-61)
    # =========================================================================
    @task(
        executor="KubernetesExecutor",  # Heavy task: own pod
        pool="spark_submission_pool",  # Resource pool (module 11)
        retries=2,
        retry_delay=timedelta(minutes=5),
        inlets=[s3_staging_orders, s3_staging_line_items, s3_staging_customers],
        outlets=[iceberg_bronze_orders, iceberg_silver_orders, iceberg_gold_daily_revenue],
        executor_config={
            "pod_override": {
                "spec": {
                    "containers": [{
                        "name": "base",
                        "resources": {
                            "requests": {"cpu": "1", "memory": "2Gi"},
                            "limits": {"cpu": "2", "memory": "4Gi"},
                        },
                    }],
                    "serviceAccountName": "airflow-spark-runner",
                },
            },
        },
    )
    def spark_transform(staging_results: list[dict], **context) -> dict:
        """Submit a Spark job that transforms Kafka events into Iceberg tables."""
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        import boto3

        ds = context["ds"]
        data_interval_start = context["data_interval_start"]
        # Spark job config: baked into S3 (committed)
        spark_args = {
            "ds": ds,
            "data_interval_start": str(data_interval_start),
            "staging_paths": {r["topic"]: f"s3://datalake/{r['s3_key']}" for r in staging_results},
            "iceberg_catalog": ICEBERG_CATALOG,
            "iceberg_tables": {
                "bronze": "lakehouse.bronze.orders_raw",
                "silver": "lakehouse.silver.orders_clean",
                "gold": "lakehouse.gold.daily_revenue",
            },
        }
        # Submit the Spark job through the K8s API (kubectl apply)
        k8s_apps_v1 = boto3.client("eks")  # simplified
        # ... full SparkApplication CRD submission (omitted for brevity) ...
        # Wait for completion (synchronous); in a real setup use an async sensor
        # Here we return placeholder metrics
        return {
            "rows_bronze": 1_500_000,
            "rows_silver": 1_450_000,  # After dedup/quality filter
            "rows_gold": 1_250,  # Daily aggregations
            "duration_seconds": 320,
            "spark_application_id": f"orders-etl-{ds}",
        }

    transform_result = spark_transform(staging_verified)
    # =========================================================================
    # Phase 4: Load gold table to ClickHouse
    # =========================================================================
    @task(
        executor="CeleryExecutor",
        inlets=[iceberg_gold_daily_revenue],
        outlets=[clickhouse_dashboards],
    )
    def clickhouse_load(spark_result: dict, **context) -> dict:
        """Read Iceberg gold table, write to ClickHouse."""
        import clickhouse_connect
        from airflow.models import Variable
        from pyiceberg.catalog import load_catalog

        ds = context["ds"]
        # Read Iceberg
        catalog = load_catalog("default")
        table = catalog.load_table("lakehouse.gold.daily_revenue")
        # Filter to current ds
        df = table.scan(row_filter=f"date = '{ds}'").to_pandas()
        # Write to ClickHouse
        client = clickhouse_connect.get_client(
            host="clickhouse.internal",
            username="airflow",
            # Jinja is not rendered inside a Python callable, so read the Variable directly
            password=Variable.get("clickhouse_password"),
        )
        # Idempotent: DROP PARTITION + INSERT per partition (module 17.02)
        client.command(f"ALTER TABLE dashboards.daily_revenue DROP PARTITION '{ds}'")
        client.insert_df("dashboards.daily_revenue", df)
        return {"rows_loaded": len(df), "partition": ds}

    ch_result = clickhouse_load(transform_result)
    # =========================================================================
    # Phase 5: Notify team (final notification)
    # =========================================================================
    @task(executor="CeleryExecutor")
    def notify_team(consume_results: list[dict], spark_result: dict, ch_result: dict, **context):
        """Send Slack notification with metrics."""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

        total_consumed = sum(r["record_count"] for r in consume_results)
        # run_id is read from the task context; Jinja placeholders are not rendered in an f-string
        msg = f"""
✓ *Orders ETL Complete* `{context['ds']}`
*Kafka Consume*: {total_consumed:,} records (3 topics)
*Spark Transform*: {spark_result['rows_silver']:,} rows in silver table
*ClickHouse Load*: {ch_result['rows_loaded']:,} rows for {ch_result['partition']}
*Duration*: {spark_result['duration_seconds']}s
Marquez lineage: https://marquez.internal/runs/{context['run_id']}
"""
        hook = SlackWebhookHook(slack_webhook_conn_id="slack_alerts")
        hook.send(text=msg, channel="#data-eng-ops")

    notify_team(consume_results, transform_result, ch_result)


# Instantiate
dag = capstone_orders_etl()
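Re-runs of consume_kafka overwrite the same object because the staging key depends only on the topic and ds. A minimal sketch of that derivation as a standalone function; `staging_key` is a hypothetical helper name, not something defined in the DAG.

```python
def staging_key(topic: str, ds: str) -> str:
    """Build the deterministic staging key for one topic and one logical date."""
    topic_short = topic.split(".")[-1]  # "orders.public.line_items" -> "line_items"
    return f"staging/{topic_short}/dt={ds}/part-00000.json"


first_run = staging_key("orders.public.line_items", "2024-01-01")
re_run = staging_key("orders.public.line_items", "2024-01-01")
print(first_run)
print(first_run == re_run)
```

Because nothing time-dependent or random enters the key, a retry or backfill for the same ds replaces the earlier file instead of adding a second one.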
Step 4: Custom ClickHouse-Iceberg loader (alternative pattern)
If you prefer a custom operator over @task:
# plugins/operators/clickhouse_iceberg_load.py
"""Custom operator: load an Iceberg table partition into ClickHouse."""
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator


class ClickHouseFromIcebergOperator(BaseOperator):
    template_fields = ("iceberg_table", "target_clickhouse_table", "partition_value")

    def __init__(
        self,
        iceberg_table: str,
        target_clickhouse_table: str,
        partition_value: str,
        partition_column: str = "date",
        clickhouse_conn_id: str = "clickhouse_default",
        iceberg_catalog: str = "glue",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.iceberg_table = iceberg_table
        self.target_clickhouse_table = target_clickhouse_table
        self.partition_value = partition_value
        self.partition_column = partition_column
        self.clickhouse_conn_id = clickhouse_conn_id
        self.iceberg_catalog = iceberg_catalog

    def execute(self, context):
        # Heavy imports stay inside execute() so DAG parsing remains fast
        import clickhouse_connect
        from pyiceberg.catalog import load_catalog

        # Read Iceberg
        catalog = load_catalog(self.iceberg_catalog)
        table = catalog.load_table(self.iceberg_table)
        df = table.scan(
            row_filter=f"{self.partition_column} = '{self.partition_value}'"
        ).to_pandas()
        rows_count = len(df)
        self.log.info("Read %s rows from Iceberg", rows_count)
        # Write to ClickHouse (idempotent)
        conn = BaseHook.get_connection(self.clickhouse_conn_id)
        client = clickhouse_connect.get_client(
            host=conn.host, username=conn.login, password=conn.password,
        )
        # Idempotent rather than atomic: DROP PARTITION, then INSERT
        client.command(
            f"ALTER TABLE {self.target_clickhouse_table} DROP PARTITION '{self.partition_value}'"
        )
        client.insert_df(self.target_clickhouse_table, df)
        # Push metrics to XCom
        context["task_instance"].xcom_push(key="rows_loaded", value=rows_count)
        return rows_count
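The operator interpolates the table name and partition value straight into the ALTER TABLE statement, and both are templated fields. One possible hardening step is to validate them before building the SQL. The sketch below is an assumption, not part of the operator: `build_drop_partition_sql` is a hypothetical helper, and it assumes partitions are keyed by a YYYY-MM-DD date.

```python
import re
from datetime import date

# Hypothetical whitelist patterns: "db.table" or "table", and a YYYY-MM-DD date
_TABLE_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?")
_DATE_RE = re.compile(r"\d{4}-\d{2}-\d{2}")


def build_drop_partition_sql(table: str, partition_value: str) -> str:
    """Return the DROP PARTITION statement, rejecting unexpected input."""
    if not _TABLE_RE.fullmatch(table):
        raise ValueError(f"Unexpected table name: {table!r}")
    if not _DATE_RE.fullmatch(partition_value):
        raise ValueError(f"Unexpected partition value: {partition_value!r}")
    date.fromisoformat(partition_value)  # also rejects impossible dates like 2024-13-40
    return f"ALTER TABLE {table} DROP PARTITION '{partition_value}'"


sql = build_drop_partition_sql("dashboards.daily_revenue", "2024-01-01")
print(sql)
```

Inside execute(), the result of such a helper could be passed to client.command() in place of the inline f-string.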
Step 5: Configuration (committed YAML)
# dags/config/capstone_config.yaml
# Static config: committed to Git, NOT fetched at runtime (module 17.03)
kafka:
  topics:
    - orders.public.orders
    - orders.public.line_items
    - orders.public.customers
spark:
  namespace: spark-jobs
  service_account: airflow-spark-runner
  resources:
    executor:
      cpu: 2
      memory: 4Gi
      count: 4
iceberg:
  catalog: glue
  tables:
    bronze: lakehouse.bronze.orders_raw
    silver: lakehouse.silver.orders_clean
    gold: lakehouse.gold.daily_revenue
clickhouse:
  database: dashboards
  table: daily_revenue
  partition_column: date
alerting:
  slack_channel: "#data-eng-ops"
  # Jinja placeholder: rendered only if this value is passed through a templated field
  pagerduty_routing_key: "{{ var.value.pagerduty_key }}"
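Once the YAML has been parsed into a dict by a YAML loader, it helps to fail fast on missing keys instead of discovering them inside a running task. A minimal sketch, assuming the parsed structure mirrors the file above; `IcebergConfig` and `parse_iceberg_config` are hypothetical names, and the `raw` dict is a hand-written stand-in for the loader's output.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class IcebergConfig:
    catalog: str
    tables: dict[str, str]


def parse_iceberg_config(raw: dict) -> IcebergConfig:
    """Validate the `iceberg` section of the parsed config and return a typed view."""
    section = raw["iceberg"]
    missing = {"bronze", "silver", "gold"} - section["tables"].keys()
    if missing:
        raise KeyError(f"Missing Iceberg tables in config: {sorted(missing)}")
    return IcebergConfig(catalog=section["catalog"], tables=dict(section["tables"]))


# Stand-in for the dict a YAML loader would return for capstone_config.yaml
raw = {
    "iceberg": {
        "catalog": "glue",
        "tables": {
            "bronze": "lakehouse.bronze.orders_raw",
            "silver": "lakehouse.silver.orders_clean",
            "gold": "lakehouse.gold.daily_revenue",
        },
    }
}
cfg = parse_iceberg_config(raw)
print(cfg.catalog, cfg.tables["gold"])
```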
Step 6: airflow.cfg additions
# Custom XCom backend and Multiple Executors (AIP-61, 2.10+, module 05)
# Both settings belong to the same [core] section
[core]
xcom_backend = plugins.xcom_backends.s3_xcom_backend.S3XComBackend
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor,airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor
# OpenLineage emission (module 14)
[openlineage]
transport = {"type":"http","url":"http://marquez:5000"}
namespace = production-capstone
# Vault secrets backend (module 10)
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"url":"https://vault.internal:8200","mount_point":"airflow","auth_type":"kubernetes","kubernetes_role":"airflow"}
# Variable caching is an Airflow [secrets] option, not a VaultBackend kwarg
use_cache = True
cache_ttl_seconds = 60
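In containerized deployments the same settings are often supplied as environment variables rather than in airflow.cfg; Airflow reads overrides named AIRFLOW__{SECTION}__{KEY}. The sketch below only builds those names; `airflow_env_var` is a hypothetical helper for illustration.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


settings = [
    ("core", "xcom_backend"),
    ("core", "executor"),
    ("openlineage", "namespace"),
    ("secrets", "backend"),
]
for section, key in settings:
    print(airflow_env_var(section, key))
```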
What this implementation demonstrates
| Module | Concrete usage |
|---|---|
| 02: DAG fundamentals | hardcoded start_date, catchup=False, default_args |
| 04: Scheduler internals | HA via row-locks (deployment config) |
| 05: Executors | Multiple Executors (CeleryExecutor + KubernetesExecutor mixed) |
| 06: XCom | Custom S3XComBackend for large dataframes |
| 07: Dynamic Task Mapping | consume_kafka.expand(topic=KAFKA_TOPICS) |
| 08: Datasets | schedule=[kafka_orders, kafka_line_items, kafka_customers] |
| 09: Deferrable | (extension: replace verify_staging with S3KeySensor(deferrable=True) for long waits) |
| 10: Secrets | Vault via airflow.cfg, Variable.get inside tasks |
| 11: Pools | spark_submission_pool limits Spark submissions |
| 14: OpenLineage | inlets/outlets, auto-emission via the provider, Marquez backend |
| 17.02: Idempotency | Tmp-then-copy S3 writes, DROP PARTITION + INSERT for ClickHouse partitions |
| 17.05: Backfill-safety | data_interval_start/end (not datetime.now), deterministic S3 paths |
| 17.07: OL-aware | inlets/outlets explicit, dataset-driven |
| 17.08: Error handling | AirflowFailException, exponential backoff, Listener-based alerting |
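To get a feel for what retry_delay=2 min, retry_exponential_backoff=True and max_retry_delay=30 min mean in practice, the schedule can be approximated as a doubling delay with a cap. This is a simplified sketch, not Airflow's exact formula: it ignores any jitter Airflow applies, so treat the numbers as approximate. `approx_retry_delays` is a hypothetical helper.

```python
from datetime import timedelta


def approx_retry_delays(base: timedelta, max_delay: timedelta, retries: int) -> list[timedelta]:
    """Doubling schedule capped at max_delay; jitter is deliberately ignored."""
    delays = []
    for attempt in range(retries):
        delay = base * (2 ** attempt)  # base, 2*base, 4*base, ...
        delays.append(min(delay, max_delay))
    return delays


schedule = approx_retry_delays(timedelta(minutes=2), timedelta(minutes=30), retries=3)
minutes = [int(d.total_seconds() // 60) for d in schedule]
print(minutes)
```

With the DAG's retries=3 the cap is never reached in this approximation; it starts to matter from the fifth retry onward.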
Production gotchas in this implementation
Spark job submission via boto3.client('eks') is simplified. In production, use KubernetesPodOperator with a custom SparkApplication CRD template, or SparkKubernetesOperator from apache-airflow-providers-cncf-kubernetes.
The Kafka consumer logic is embedded in the task. In production, move it to a shared library (dags/lib/kafka_consumer.py) and unit test it thoroughly.
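The part of the consumer that benefits most from extraction is the interval filter, because it is pure logic and needs no broker to test. A minimal sketch of such a helper, assuming events carry a top-level `ts_ms` field in epoch milliseconds (the Debezium convention); `filter_events_to_interval` is a hypothetical name for a function that could live in dags/lib/kafka_consumer.py.

```python
from datetime import datetime, timedelta, timezone


def filter_events_to_interval(events: list[dict], start: datetime, end: datetime) -> list[dict]:
    """Keep events whose ts_ms falls in the half-open window [start, end)."""
    kept = []
    for event in events:
        event_ts = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
        if start <= event_ts < end:
            kept.append(event)
    return kept


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = start + timedelta(days=1)
start_ms = int(start.timestamp() * 1000)
end_ms = int(end.timestamp() * 1000)
events = [
    {"id": "before", "ts_ms": start_ms - 1},
    {"id": "first", "ts_ms": start_ms},
    {"id": "last", "ts_ms": end_ms - 1},
    {"id": "after", "ts_ms": end_ms},
]
kept_ids = [e["id"] for e in filter_events_to_interval(events, start, end)]
print(kept_ids)
```

The half-open window matters: an event stamped exactly at the end of one interval belongs to the next run, so it is never counted twice.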
S3_BUCKET = "airflow-xcom" is hardcoded. Make it configurable through an environment variable: os.environ.get("AIRFLOW_XCOM_S3_BUCKET").
The Slack notification fires only on success. In production, add the Listener API (module 12) for on_failure and on_retry events with tiered alerting.
There is no SLA and no SLA miss callback. Add @task(sla=timedelta(minutes=30)) to monitor data freshness.
The Kafka consumer group_id includes ds but does not handle re-runs. On a re-run for the same date the consumer group has already committed offsets, so it does not consume from earliest. Solution: a unique group_id per try_number, or an explicit offset reset.
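The first option can be sketched as a small naming helper. This is an illustration under assumptions: `consumer_group_id` is a hypothetical function, and the try number would be read from the task context at runtime.

```python
def consumer_group_id(topic: str, ds: str, try_number: int) -> str:
    """Build a group id that is unique per attempt, so no committed offsets exist for it."""
    return f"airflow-{topic}-{ds}-try{try_number}"


first_attempt = consumer_group_id("orders.public.orders", "2024-01-01", 1)
second_attempt = consumer_group_id("orders.public.orders", "2024-01-01", 2)
print(first_attempt)
print(second_attempt)
```

A group id that has never committed offsets falls back to auto.offset.reset, which is "earliest" in this pipeline, so every attempt re-reads the topic from the start of its retention and relies on the interval filter to discard events outside the window. The trade-off is a growing number of abandoned consumer groups on the broker.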