Real-world patterns: production applications of Dynamic Mapping
The theory of Dynamic Task Mapping (lessons 02-05) is the foundation. This lesson is a concentrated collection of production patterns, with ready-to-use code for typical data engineering tasks:
- Process S3 prefix files
- Parallel DB shard processing
- Multi-region deployment
- Per-mapped-TI error handling and partial success
- Backfill-conscious mapping
- Hybrid with XComObjectStorageBackend
Each pattern covers more than the syntax. It also discusses when to apply it, what its limits are, and how to monitor it.
Pattern 1: Process S3 prefix files
Use case: N files land in an S3 prefix every day. Each one has to be processed and loaded into the warehouse.
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime, timedelta, timezone
import pandas as pd
@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["s3", "etl"],
    max_active_tasks=16,  # global limit for the whole DAG
)
def s3_daily_processor():
    @task
    def list_files(data_interval_start) -> list[dict]:
        """List files in S3 for the current data interval."""
        hook = S3Hook(aws_conn_id="aws_default")
        prefix = f"raw/{data_interval_start.strftime('%Y/%m/%d')}/"
        keys = hook.list_keys(bucket_name="raw-data", prefix=prefix)
        # Keep only .parquet files
        keys = [k for k in keys if k.endswith(".parquet")]
        # Validate: fail fast if nothing arrived
        if not keys:
            raise ValueError(f"No files found in {prefix}")
        # Wrap each key in a metadata dict
        return [
            {
                "key": k,
                "expected_size": hook.head_object(key=k, bucket_name="raw-data")["ContentLength"],
            }
            for k in sorted(keys)  # sort for a deterministic map_index order
        ]
    @task(
        max_active_tis_per_dag=8,  # at most 8 in parallel (worker capacity)
        retries=3,
        retry_delay=timedelta(minutes=5),  # for transient S3 errors
    )
    def process_file(file_info: dict) -> dict:
        """Download the file, transform it, upload it to warehouse staging."""
        hook = S3Hook(aws_conn_id="aws_default")
        key = file_info["key"]
        # Download to /tmp; returns the path of the downloaded file
        local_path = hook.download_file(
            key=key,
            bucket_name="raw-data",
            local_path="/tmp",
        )
        # Read + transform
        df = pd.read_parquet(local_path)
        rows_read = len(df)
        df["processed_at"] = datetime.now(timezone.utc)
        df = df.dropna(subset=["customer_id"])
        # Upload to staging. A per-file temp path avoids clashes between
        # mapped TIs running on the same worker.
        staging_key = key.replace("raw/", "staging/").replace(".parquet", "_v2.parquet")
        staging_path = f"{local_path}.staging.parquet"
        df.to_parquet(staging_path)
        hook.load_file(
            filename=staging_path,
            key=staging_key,
            bucket_name="processed-data",
            replace=True,
        )
        return {
            "source_key": key,
            "staging_key": staging_key,
            "source_bytes": file_info["expected_size"],
            "rows_processed": len(df),
            "rows_dropped": rows_read - len(df),  # rows without customer_id
        }
    @task(trigger_rule="all_done")  # runs even if some mapped TIs failed
    def summarize_run(results: list[dict | None], files: list[dict]):
        """Aggregate results and report partial failures."""
        successful = [r for r in results if r is not None]
        # A failed mapped TI pushes no return value, so count failures
        # against the number of input files rather than len(results)
        failed_count = len(files) - len(successful)
        total_rows = sum(r["rows_processed"] for r in successful)
        print(f"Success: {len(successful)} files, {total_rows} rows")
        print(f"Failed: {failed_count} files")
        if failed_count > len(files) * 0.1:  # >10% failure
            raise ValueError(f"Too many failures: {failed_count}/{len(files)}")
    files = list_files()
    results = process_file.expand(file_info=files)
    summarize_run(results, files)
s3_daily_processor()
Key points:
- list_files: sorted, and validates that the list is not empty
- process_file: max_active_tis_per_dag=8 (rate limit), 3 retries with a delay
- summarize_run: trigger_rule="all_done" aggregates even when some files failed, and fails the run if more than 10% of the files failed
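The selection logic inside list_files can be pulled out into a plain function and tested without S3. Below is a minimal sketch that assumes the keys have already been listed. The helper names build_prefix and select_parquet_keys are hypothetical and are not part of any Airflow provider.

```python
from __future__ import annotations

from datetime import date


def build_prefix(day: date) -> str:
    """Daily prefix in the raw/YYYY/MM/DD/ layout used by list_files."""
    return f"raw/{day.strftime('%Y/%m/%d')}/"


def select_parquet_keys(keys: list[str] | None, prefix: str) -> list[str]:
    """Keep .parquet keys only, fail fast when none arrived, and sort them."""
    parquet = [k for k in (keys or []) if k.endswith(".parquet")]
    if not parquet:
        raise ValueError(f"No files found in {prefix}")
    # Sorting keeps the key -> map_index assignment stable across reruns
    return sorted(parquet)


prefix = build_prefix(date(2026, 1, 15))
print(prefix)  # raw/2026/01/15/
print(select_parquet_keys([prefix + "b.parquet", prefix + "_SUCCESS", prefix + "a.parquet"], prefix))
# ['raw/2026/01/15/a.parquet', 'raw/2026/01/15/b.parquet']
```

Keeping this logic pure makes the fail-fast and ordering rules unit-testable. The task itself then only wraps the S3 calls.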
Pattern 2: Parallel DB shard processing
Use case: data is spread across N DB shards, and every day each shard is processed in parallel.
from airflow.decorators import dag, task
from datetime import datetime
@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def parallel_shard_processor():
    @task
    def get_active_shards() -> list[dict]:
        """List active shards with their connection info."""
        # Could also be loaded from an external config or a DB
        return [
            {"shard_id": 0, "conn_id": "shard_0_db", "schema": "users_0_to_1M"},
            {"shard_id": 1, "conn_id": "shard_1_db", "schema": "users_1M_to_2M"},
            {"shard_id": 2, "conn_id": "shard_2_db", "schema": "users_2M_to_3M"},
        ]
    @task(
        max_active_tis_per_dag=3,  # max 3 in parallel: connection pool limit
        pool="db_etl",  # cross-DAG resource limit
        pool_slots=1,
    )
    def process_shard(
        shard_id: int,  # from the shard dict (expand_kwargs)
        conn_id: str,  # from the shard dict
        schema: str,  # from the shard dict
        target_schema: str,  # constant, set via partial()
        batch_size: int,  # constant, set via partial()
    ) -> dict:
        import hashlib
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id=conn_id)
        # Extract from the shard
        sql = f"""
            SELECT * FROM {schema}.user_events
            WHERE event_date = CURRENT_DATE - 1
        """
        df = hook.get_pandas_df(sql)
        # Transform: anonymize PII, denormalize
        df["email_hash"] = df["email"].apply(lambda e: hashlib.sha256(e.encode()).hexdigest())
        df.drop(columns=["email", "phone"], inplace=True)
        # Load into the warehouse
        target_hook = PostgresHook(postgres_conn_id="warehouse")
        target_hook.insert_rows(
            table=f"{target_schema}.events",
            rows=df.itertuples(index=False),
            target_fields=list(df.columns),
            commit_every=batch_size,
        )
        return {
            "shard_id": shard_id,
            "rows_loaded": len(df),
        }
    @task
    def quality_check(results: list[dict]):
        total = sum(r["rows_loaded"] for r in results)
        # Cross-check against the expected daily volume
        if total < 100000:  # known baseline
            raise ValueError(f"Suspiciously low volume: {total}")
        print(f"OK: {total} rows loaded across {len(results)} shards")
    shards = get_active_shards()
    results = process_shard.partial(
        target_schema="warehouse_events",
        batch_size=10000,
    ).expand_kwargs(shards)
    quality_check(results)
parallel_shard_processor()
Pattern features:
- expand_kwargs(shards): each shard dict is unpacked into keyword arguments (shard_id, conn_id, schema), so its keys must match the task's parameter names
- partial(target_schema, batch_size): constants shared by all mapped TIs and stored once
- pool="db_etl" + max_active_tis_per_dag=3: limit the number of concurrent shards (connection pool)
- quality_check aggregates the results and validates the total
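expand_kwargs unpacks each dict into keyword arguments, so every key has to match a parameter name of the task, and partial() supplies the rest. Below is a small pure-Python model of that merge. It uses inspect.signature().bind() to catch mismatches. The simulate_expand_kwargs helper and the stand-in process_shard are hypothetical, and this is not how Airflow implements mapping internally.

```python
from __future__ import annotations

import inspect


def process_shard(shard_id: int, conn_id: str, schema: str, target_schema: str, batch_size: int) -> dict:
    # Hypothetical stand-in with the same parameters as the task; no database access
    return {
        "shard_id": shard_id,
        "source": f"{conn_id}:{schema}",
        "target": f"{target_schema}.events",
        "batch_size": batch_size,
    }


def simulate_expand_kwargs(func, constants: dict, mapped: list[dict]) -> list:
    """Call func once per mapped dict, merged with the shared partial() constants.

    inspect.signature(...).bind() raises TypeError when a key does not match a
    parameter (or a required parameter is missing), surfacing the mismatch early.
    """
    sig = inspect.signature(func)
    results = []
    for kwargs in mapped:
        bound = sig.bind(**constants, **kwargs)  # duplicate keys also raise TypeError
        results.append(func(*bound.args, **bound.kwargs))
    return results


shards = [
    {"shard_id": 0, "conn_id": "shard_0_db", "schema": "users_0_to_1M"},
    {"shard_id": 1, "conn_id": "shard_1_db", "schema": "users_1M_to_2M"},
]
constants = {"target_schema": "warehouse_events", "batch_size": 10000}
for call in simulate_expand_kwargs(process_shard, constants, shards):
    print(call)
```

A dict shaped like {"shard_info": {...}} does not bind to this signature. That is why, in this model, the shard fields are taken as separate parameters.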
Pattern 3: Multi-region deployment
Use case: deploy an artifact to N regions, where each deployment is independent.
from airflow.decorators import dag, task
from datetime import datetime, timedelta
@dag(
    schedule=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    params={"env": "staging"},  # trigger with conf {"env": "prod"} for production
)
def multi_region_deploy():
    @task
    def get_deploy_targets(params: dict) -> list[dict]:
        """Build deploy targets with per-region config."""
        env = params.get("env", "staging")
        if env == "prod":
            return [
                {"region": "us-east-1", "cluster": "prod-use1", "replicas": 5},
                {"region": "us-west-2", "cluster": "prod-usw2", "replicas": 3},
                {"region": "eu-west-1", "cluster": "prod-euw1", "replicas": 3},
                {"region": "ap-southeast-1", "cluster": "prod-apse1", "replicas": 2},
            ]
        else:
            return [
                {"region": "us-east-1", "cluster": "staging-use1", "replicas": 1},
            ]
    @task(
        max_active_tis_per_dag=2,  # at most 2 regions at a time; a failure does not stop the others
        retries=2,
        retry_delay=timedelta(minutes=1),
    )
    def deploy_region(
        region: str,
        cluster: str,
        replicas: int,
        artifact_uri: str,  # constant, set via partial()
        version: str,  # constant, set via partial()
    ) -> dict:
        from kubernetes import client
        # Deploy via the K8s API
        # ... apply manifest ...
        return {
            "region": region,
            "version": version,
            "status": "deployed",
        }
    @task(trigger_rule="all_done")
    def post_deploy_check(results: list[dict | None], targets: list[dict]):
        """Final validation."""
        deployed = [r for r in results if r is not None]
        # Failed mapped TIs push no result: compare against the target count
        failed = len(targets) - len(deployed)
        if failed > 0:
            # Trigger alerts, roll back?
            raise ValueError(f"{failed} regions failed to deploy")
        print(f"All {len(deployed)} regions deployed")
    targets = get_deploy_targets()
    results = deploy_region.partial(
        artifact_uri="s3://artifacts/v1.2.3",
        version="1.2.3",
    ).expand_kwargs(targets)
    post_deploy_check(results, targets)
multi_region_deploy()
Pattern features:
- get_deploy_targets returns a dynamic list based on params (env=staging vs prod)
- max_active_tis_per_dag=2: throttled rollout, at most two regions at a time
- partial(artifact_uri, version): common constants
- post_deploy_check validates all regions and fails if any region failed
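max_active_tis_per_dag=2 acts as a concurrency cap. As soon as one region finishes, the next one starts, and a failed region does not halt the rest. The sketch below models this with a thread pool of two workers. The names rollout and fake_deploy are hypothetical, and the thread pool only stands in for the scheduler's limit.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def rollout(targets: list, deploy, max_parallel: int = 2) -> dict:
    """Run deploy(target) with at most max_parallel in flight; a failure does not stop the rest."""
    lock = threading.Lock()
    in_flight = 0
    peak = 0

    def run(target: dict):
        nonlocal in_flight, peak
        with lock:
            in_flight += 1
            peak = max(peak, in_flight)
        try:
            return target["region"], deploy(target), None
        except Exception as exc:  # record the failure, keep rolling out other regions
            return target["region"], None, str(exc)
        finally:
            with lock:
                in_flight -= 1

    with ThreadPoolExecutor(max_workers=max_parallel) as pool:
        outcomes = list(pool.map(run, targets))  # results come back in input order
    return {
        "deployed": [region for region, _, err in outcomes if err is None],
        "failed": [region for region, _, err in outcomes if err is not None],
        "peak_parallel": peak,
    }


def fake_deploy(target: dict) -> str:
    # Hypothetical deploy step: sleeps briefly and fails for one region to show isolation
    time.sleep(0.01)
    if target["region"] == "eu-west-1":
        raise RuntimeError("cluster unreachable")
    return "deployed"


targets = [{"region": r} for r in ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1"]]
print(rollout(targets, fake_deploy))
```

A true canary that stops after the first failed region would need an explicit gate in the DAG. A concurrency limit alone does not provide one.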
Pattern 4: Per-mapped-TI error handling
Use case: some items may fail without blocking the rest. Collect the errors and report them.
import logging
from datetime import timedelta
from airflow.decorators import dag, task
log = logging.getLogger(__name__)
# expensive_processing, RecoverableError, FatalError and send_metric are
# placeholders for your own processing code, exception types and metrics client
@dag(...)
def resilient_processing():
    @task
    def get_items() -> list[dict]:
        return [{"id": i, "data": f"item_{i}"} for i in range(100)]
    @task(retries=2, retry_delay=timedelta(seconds=30))
    def process_item(item: dict) -> dict:
        """
        Try to process the item. Return a result dict or raise.
        If the final retry fails, the mapped TI fails and downstream tasks see it.
        """
        try:
            result = expensive_processing(item)
            return {
                "id": item["id"],
                "status": "success",
                "result": result,
            }
        except RecoverableError:
            # Worth retrying: re-raise so Airflow retries this mapped TI
            raise
        except FatalError as e:
            # Not worth retrying: log it and swallow it
            log.error(f"Fatal error processing item {item['id']}: {e}")
            return {
                "id": item["id"],
                "status": "failed",
                "error": str(e),
            }
    @task(trigger_rule="all_done")
    def aggregate(results: list[dict | None], items: list[dict]):
        """
        Collect successes, soft failures (status="failed") and hard failures
        (the mapped TI itself failed and pushed no result).
        """
        successes = [r for r in results if r and r.get("status") == "success"]
        soft_failures = [r for r in results if r and r.get("status") == "failed"]
        # Hard failures: items that produced no result at all
        hard_failures_count = len(items) - len(successes) - len(soft_failures)
        print(f"Total: {len(items)}")
        print(f"Success: {len(successes)}")
        print(f"Soft failures (handled): {len(soft_failures)}")
        print(f"Hard failures (task crash): {hard_failures_count}")
        # Push to monitoring
        send_metric("processing.success", len(successes))
        send_metric("processing.soft_failure", len(soft_failures))
        send_metric("processing.hard_failure", hard_failures_count)
        # Fail the DAG run if more than 5% are hard failures
        if hard_failures_count > len(items) * 0.05:
            raise ValueError("Too many hard failures")
    items = get_items()
    results = process_item.expand(item=items)
    aggregate(results, items)
resilient_processing()
Pattern features:
- An internal try/except distinguishes soft failures from hard failures
- Soft failures: return a result dict with status="failed", no retry
- Hard failures: re-raise, so the mapped TI fails once its retries are exhausted and pushes no result; aggregate counts them as items without a result
- trigger_rule="all_done" on aggregate: it runs regardless
- Metrics for monitoring, plus a conditional failure on a high error rate
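The aggregation rules in aggregate can be checked in isolation. The minimal sketch below uses the same conventions: status "success"/"failed" for returned dicts and a 5% threshold for hard failures. classify is a hypothetical helper. Hard failures are derived from the expected item count, so the result is the same whether a failed mapped TI shows up as None or is simply absent from the list.

```python
from __future__ import annotations


def classify(results: list[dict | None], expected: int, max_hard_ratio: float = 0.05) -> dict:
    """Split mapped results into successes, soft failures and hard failures."""
    returned = [r for r in results if r is not None]
    successes = [r for r in returned if r.get("status") == "success"]
    soft = [r for r in returned if r.get("status") == "failed"]
    # Anything expected but not returned is a hard failure (the TI itself failed)
    hard = expected - len(returned)
    if hard > expected * max_hard_ratio:
        raise ValueError(f"Too many hard failures: {hard}/{expected}")
    return {"success": len(successes), "soft_failure": len(soft), "hard_failure": hard}


results = [{"id": i, "status": "success"} for i in range(90)]
results += [{"id": i, "status": "failed", "error": "bad input"} for i in range(90, 96)]
print(classify(results, expected=100))
# {'success': 90, 'soft_failure': 6, 'hard_failure': 4}
```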
Pattern 5: Backfill-conscious mapping
Use case: a backfill of old runs can process a huge number of items. Limit how much runs at once.
from airflow.decorators import dag, task
from datetime import datetime
@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),  # a year back: a lot to backfill
    catchup=True,
    max_active_runs=2,  # limit concurrent runs
    tags=["backfill-safe"],
)
def backfill_safe_dag():
    @task
    def list_files_for_date(data_interval_start) -> list[str]:
        """List files for one specific day."""
        # list_s3_for_date is a placeholder for your own listing helper
        return list_s3_for_date(data_interval_start)
    @task(
        max_active_tis_per_dag=20,  # critical for backfill
        retries=2,
    )
    def process_file(filename: str):
        pass  # placeholder for per-file processing
    process_file.expand(filename=list_files_for_date())
backfill_safe_dag()
Key points:
- max_active_runs=2: limits how many DagRuns run at the same time during a backfill
- max_active_tis_per_dag=20: a per-task limit across all runs. Without it, 2 runs × 1000 mapped TIs each would make up to 2,000 mapped TIs eligible to run at once
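Without these limits, a backfill of 365 days × 1000 items/day means 365k mapped TIs in total, expanded many runs at a time. Even the default cap of 16 active runs per DAG (max_active_runs_per_dag) allows about 16k mapped TIs to exist at once, which can overwhelm the scheduler.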
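The arithmetic behind these limits is shown below as a rough upper-bound model. It ignores pools, max_active_tasks and executor parallelism. backfill_load is a hypothetical helper.

```python
def backfill_load(days: int, items_per_day: int, max_active_runs: int, max_active_tis_per_dag: int) -> dict:
    """Rough upper bounds for a mapped backfill (simplified model)."""
    active_runs = min(days, max_active_runs)
    expanded = active_runs * items_per_day
    return {
        # Every day of the backfill eventually expands its own list
        "total_mapped_tis": days * items_per_day,
        # Mapped TIs that can exist at once: one expanded list per active run
        "expanded_at_once": expanded,
        # Mapped TIs of this task allowed to run at once across all runs
        "running_at_once": min(expanded, max_active_tis_per_dag),
    }


print(backfill_load(365, 1000, max_active_runs=2, max_active_tis_per_dag=20))
# {'total_mapped_tis': 365000, 'expanded_at_once': 2000, 'running_at_once': 20}
```

The total stays at 365,000 either way. The limits only control how much of it is in flight at any moment.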
Pattern 6: Hybrid with XComObjectStorageBackend
Use case: each mapped TI handles large data (a pandas DataFrame). The default XCom backend, which stores values in the metadata database, struggles with this. With the Object Storage backend it works transparently.
# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://aws_default@airflow-xcom/xcom
# Values larger than 4096 bytes (4 KB) go to S3; smaller ones stay in the DB
xcom_objectstorage_threshold = 4096
from airflow.decorators import dag, task
@dag(...)
def heavy_data_mapping():
    @task
    def get_segments() -> list[str]:
        return ["segment_A", "segment_B", "segment_C", "segment_D"]
    @task
    def fetch_segment_data(segment: str) -> list[dict]:
        """Download large data for the segment."""
        # fetch_from_warehouse is a placeholder for your own query helper
        df = fetch_from_warehouse(segment)  # ~50MB DataFrame
        return df.to_dict("records")  # each mapped TI writes ~50MB to the S3 backend
    @task
    def aggregate(all_data: list[list[dict]]):
        # Pulls 4 × 50MB = 200MB via the Object Storage backend:
        # values are read from S3 as they are accessed, not stored raw in the DB
        total = sum(len(d) for d in all_data)
        print(f"Total records: {total}")
    segment_data = fetch_segment_data.expand(segment=get_segments())
    aggregate(segment_data)
heavy_data_mapping()
Without the Object Storage backend, every 50 MB XCom lands in Postgres and bloats the metadata DB. With the backend, the DB stays compact because it keeps only a reference, and S3 absorbs the large data.
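Here is a simplified model of how the threshold decides where a value goes. The provider compares the serialized size in bytes against xcom_objectstorage_threshold. With -1, everything stays in the database. Values larger than a non-negative threshold go to object storage. This sketch uses plain json.dumps as an assumption. The real backend uses Airflow's own serializer and supports optional compression, so exact sizes differ. xcom_destination is a hypothetical helper.

```python
import json


def xcom_destination(value, threshold: int) -> str:
    """Where would this XCom value be stored? (simplified model)"""
    # Assumption: plain JSON size stands in for Airflow's own serialization
    size = len(json.dumps(value).encode("utf-8"))
    return "object_storage" if -1 < threshold < size else "database"


small = {"rows": 10}
large = [{"customer_id": i, "amount": 1.5} for i in range(1000)]
print(xcom_destination(small, 4096))  # database
print(xcom_destination(large, 4096))  # object_storage
```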
Visualization: production deployment
Comparison: anti-patterns vs proper patterns
| Anti-pattern | Proper pattern |
|---|---|
| .expand(item=list(range(10000))) | Chunked: .expand(chunk=chunks_of_100) |
| Mapping without max_active_tis_per_dag | Always set an explicit limit |
| No trigger_rule="all_done" on the aggregator | Use it for resilience to partial failures |
| Large XCom return from a mapped TI | Custom backend / Object Storage |
| No validation of the upstream list length | Assert non-empty / max size in the upstream task |
| Mapping for very fast items | Batch processing in a single task |
| Cartesian product over 3+ axes | Explicit list of dicts via expand_kwargs |
| No retry strategy | retries=N, an appropriate retry_delay |
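Two of these rows can be shown in code. First, chunking turns 10,000 items into 100 mapped TIs of 100 items each, well under Airflow's default max_map_length of 1024. Second, several keyword arguments passed to a single expand() call multiply into a cartesian product, which an explicit list of dicts for expand_kwargs avoids. The sketch below is plain Python: chunked is a hypothetical helper, and in a real DAG these lists would be returned from upstream tasks.

```python
from __future__ import annotations

from itertools import islice, product


def chunked(items: list, size: int) -> list[list]:
    """Split items into lists of at most `size`; each list becomes one mapped TI."""
    it = iter(items)
    chunks = []
    while batch := list(islice(it, size)):
        chunks.append(batch)
    return chunks


chunks_of_100 = chunked(list(range(10000)), 100)
print(len(chunks_of_100))  # 100

# Several mapped kwargs in one expand() call: every combination becomes a TI
regions, tiers, formats = ["us", "eu", "ap"], ["hot", "cold"], ["csv", "parquet"]
cartesian = list(product(regions, tiers, formats))
print(len(cartesian))  # 12

# expand_kwargs with an explicit list keeps only the combinations you need
explicit = [
    {"region": "us", "tier": "hot", "fmt": "parquet"},
    {"region": "eu", "tier": "cold", "fmt": "csv"},
]
print(len(explicit))  # 2
```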
Production monitoring checklist
What to monitor for a mapping DAG:
| Metric | Source | Threshold |
|---|---|---|
| Mapped TI per run | task_instance count | warning > 500 |
| Avg mapped TI duration | task_instance.duration | varies |
| Failure rate per mapped task | state=failed count | warning > 5% |
| Scheduler tick duration | Prometheus / metrics | p99 < 30s |
| max_active_tis_per_dag saturation | scheduled state count | warning > 80% |
| XCom backend latency | S3 latency / DB query time | p99 < 500ms |
| DAG queue backlog | scheduled TI count | warning > 1000 |
Alerts:
- mapped_ti_count > 1000 per run: review for batch refactoring
- mapped_task_failure_rate > 10%: investigate the root cause
- scheduler_p99 > 60s: the scheduler is overloaded
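The alert rules above can be written as a small evaluator. This is a sketch only. The metric names are the labels used in this lesson, not official Airflow metric names. The failure rate is a fraction (0.10 = 10%), and scheduler_p99 is in seconds. Collecting the values (StatsD, Prometheus, SQL against the metadata DB) is left to your stack.

```python
from __future__ import annotations

ALERT_RULES = [
    # (metric name, threshold, action), taken from the alert list above
    ("mapped_ti_count", 1000, "review for batch refactoring"),
    ("mapped_task_failure_rate", 0.10, "investigate the root cause"),
    ("scheduler_p99", 60.0, "the scheduler is overloaded"),
]


def evaluate_alerts(metrics: dict[str, float]) -> list[str]:
    """Return one message per rule whose metric exceeds its threshold; missing metrics are skipped."""
    fired = []
    for name, threshold, action in ALERT_RULES:
        value = metrics.get(name)
        if value is not None and value > threshold:
            fired.append(f"{name}={value} > {threshold}: {action}")
    return fired


for message in evaluate_alerts({"mapped_ti_count": 1500, "mapped_task_failure_rate": 0.02, "scheduler_p99": 75.0}):
    print(message)
```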
Production gotchas: summary
From previous lessons plus new ones:
- Backfill can create thousands of mapped TIs: set max_active_runs and max_active_tis_per_dag
- Upstream order: sort the upstream list if the order matters for map_index
- Custom XCom backend: mandatory for large mapped XComs
- trigger_rule="all_done" on the aggregator if you want resilience to partial failures
- Retries apply to each mapped TI independently, so set them carefully (10 retries × 1000 TIs can mean up to 10,000 extra attempts)
- Pool + max_active_tis_per_dag: combine them for cross-DAG and intra-DAG limits
- Logging volume: 1000 mapped TIs × 100 KB = 100 MB of log storage per run
- UI usage: for huge mappings, teach users to use the list view of mapped instances, not the grid
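The retry and logging gotchas are worth estimating before a large mapping goes live. Below is a pessimistic back-of-the-envelope helper. It is hypothetical and assumes every mapped TI exhausts its retries and each attempt logs about the same amount.

```python
def worst_case_load(mapped_tis: int, retries: int, log_kb_per_attempt: float) -> dict:
    """Upper bounds when every mapped TI uses all of its retries."""
    attempts = mapped_tis * (retries + 1)  # first try + retries, per mapped TI
    return {
        "extra_attempts": mapped_tis * retries,
        "total_attempts": attempts,
        # Assumption: each attempt writes a similar amount of log; 1 MB = 1000 KB here
        "log_mb": attempts * log_kb_per_attempt / 1000,
    }


print(worst_case_load(1000, 0, 100))
# {'extra_attempts': 0, 'total_attempts': 1000, 'log_mb': 100.0}
print(worst_case_load(1000, 10, 100))
# {'extra_attempts': 10000, 'total_attempts': 11000, 'log_mb': 1100.0}
```

With no retries, this reproduces the 100 MB per run from the list. With retries=10, the worst case grows to 11,000 attempts and about 1.1 GB of logs.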