Lesson 08.06 · 28 min
Advanced
Tags: Dynamic Mapping, Production Patterns, S3, Sharding, Multi-region, Error Handling

Real-world patterns: production uses of Dynamic Mapping

The Dynamic Task Mapping theory (lessons 02-05) is the foundation. This lesson is a concentrated set of production patterns with ready-made code for typical data engineering tasks:

  1. Process S3 prefix files
  2. Parallel DB shard processing
  3. Multi-region deployment
  4. Per-mapped-TI error handling and partial success
  5. Backfill-conscious mapping
  6. Hybrid with XComObjectStorageBackend

Each pattern covers not only the syntax but also when to apply it, what its limits are, and how to monitor it.


Pattern 1: Process S3 prefix files

Use case: N files arrive in an S3 prefix every day; each one has to be processed and loaded into the warehouse.

from airflow.decorators import dag, task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from datetime import datetime
import pandas as pd

@dag(
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=["s3", "etl"],
    max_active_tasks=16,   # global limit for the DAG
)
def s3_daily_processor():

    @task
    def list_files(logical_date=None) -> list[dict]:
        """List the files in S3 for the current logical date."""
        # logical_date is injected from the task context; execution_date is
        # the older name and is no longer available in Airflow 3
        hook = S3Hook(aws_conn_id="aws_default")
        prefix = f"raw/{logical_date.strftime('%Y/%m/%d')}/"
        keys = hook.list_keys(bucket_name="raw-data", prefix=prefix)

        # Keep only .parquet files
        keys = [k for k in keys if k.endswith(".parquet")]

        # Validate: fail fast if the prefix is empty
        if not keys:
            raise ValueError(f"No files found in {prefix}")

        # Attach metadata to each key
        return [
            {
                "key": k,
                "expected_size": hook.head_object(key=k, bucket_name="raw-data")["ContentLength"],
            }
            for k in sorted(keys)   # sort for a deterministic order
        ]

    @task(
        max_active_tis_per_dag=8,   # at most 8 in parallel (worker capacity)
        retries=3,
        retry_delay=300,            # seconds (5 min), for transient S3 errors
    )
    def process_file(file_info: dict) -> dict:
        """Download the file, transform it, upload to warehouse staging."""
        hook = S3Hook(aws_conn_id="aws_default")
        key = file_info["key"]

        # Download to /tmp; download_file returns the path of the local copy
        local_path = hook.download_file(
            key=key,
            bucket_name="raw-data",
            local_path="/tmp",
        )

        # Read + transform
        df = pd.read_parquet(local_path)
        rows_read = len(df)
        df["processed_at"] = pd.Timestamp.now(tz="UTC")
        df = df.dropna(subset=["customer_id"])

        # Upload to staging; the local file name is derived from the download
        # path so parallel mapped TIs on one worker do not overwrite each other
        staging_key = key.replace("raw/", "staging/").replace(".parquet", "_v2.parquet")
        staging_path = f"{local_path}.staging.parquet"
        df.to_parquet(staging_path)
        hook.load_file(
            filename=staging_path,
            key=staging_key,
            bucket_name="processed-data",
            replace=True,
        )

        return {
            "source_key": key,
            "staging_key": staging_key,
            "rows_processed": len(df),
            "rows_dropped": rows_read - len(df),  # rows removed by dropna
        }

    @task(trigger_rule="all_done")   # runs even if some mapped TIs failed
    def summarize_run(results: list[dict | None]):
        """Aggregate the results and report partial failures."""
        # Worth verifying on your Airflow version: a failed mapped TI may be
        # absent from `results` instead of showing up as None.
        successful = [r for r in results if r is not None]
        failed_count = len(results) - len(successful)
        total_rows = sum(r["rows_processed"] for r in successful)
        print(f"Success: {len(successful)} files, {total_rows} rows")
        print(f"Failed: {failed_count} files")

        if failed_count > len(results) * 0.1:   # >10% failure
            raise ValueError(f"Too many failures: {failed_count}/{len(results)}")

    files = list_files()
    results = process_file.expand(file_info=files)
    summarize_run(results)

s3_daily_processor()

Key points:

  • list_files: sorted output, validates that the listing is not empty
  • process_file: max_active_tis_per_dag=8 (rate limit), 3 retries with a delay
  • summarize_run: trigger_rule="all_done", so it aggregates even when some TIs failed, and fails if more than 10% failed
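The 10% rule from summarize_run can be pulled out into a plain function, which makes it testable outside Airflow. This is a minimal sketch, not part of the DAG above: `summarize_results` and its `expected` parameter are hypothetical names, and passing the number of listed files explicitly is an assumption that keeps the failure count correct even when a failed mapped TI contributes no entry to the results.

```python
def summarize_results(expected: int, results: list, max_failure_ratio: float = 0.1) -> dict:
    """Count successes and failures; raise if the failure ratio exceeds the limit."""
    # Entries may be None or simply missing for failed mapped TIs.
    successful = [r for r in results if r is not None]
    failed = expected - len(successful)
    total_rows = sum(r["rows_processed"] for r in successful)

    if expected and failed > expected * max_failure_ratio:
        raise ValueError(f"Too many failures: {failed}/{expected}")

    return {"successful": len(successful), "failed": failed, "total_rows": total_rows}
```

Inside the DAG, the aggregator could receive both the output of list_files (for `expected`) and the mapped results, and delegate to this function.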

Pattern 2: Parallel DB shard processing

Use case: data is spread across N DB shards; every day each shard is processed in parallel.

@dag(schedule="@daily", start_date=datetime(2026, 1, 1), catchup=False)
def parallel_shard_processor():

    @task
    def get_active_shards() -> list[dict]:
        """List the active shards with their connection info."""
        # Could be loaded from an external config or a DB
        return [
            {"shard_id": 0, "conn_id": "shard_0_db", "schema": "users_0_to_1M"},
            {"shard_id": 1, "conn_id": "shard_1_db", "schema": "users_1M_to_2M"},
            {"shard_id": 2, "conn_id": "shard_2_db", "schema": "users_2M_to_3M"},
        ]

    @task(
        max_active_tis_per_dag=3,    # max 3 in parallel: connection pool limit
        pool="db_etl",                # cross-DAG resource limit
        pool_slots=1,
    )
    def process_shard(
        shard_id: int,                # per-shard keys, supplied by expand_kwargs
        conn_id: str,
        schema: str,
        target_schema: str,           # constant
        batch_size: int,              # constant
    ) -> dict:
        import hashlib

        from airflow.providers.postgres.hooks.postgres import PostgresHook

        hook = PostgresHook(postgres_conn_id=conn_id)

        # Extract from the shard (schema comes from trusted config, not user input)
        sql = f"""
            SELECT * FROM {schema}.user_events
            WHERE event_date = CURRENT_DATE - 1
        """
        df = hook.get_pandas_df(sql)

        # Transform: anonymize PII, denormalize
        df["email_hash"] = df["email"].apply(lambda e: hashlib.sha256(e.encode()).hexdigest())
        df = df.drop(columns=["email", "phone"])

        # Load to warehouse
        target_hook = PostgresHook(postgres_conn_id="warehouse")
        target_hook.insert_rows(
            table=f"{target_schema}.events",
            rows=df.itertuples(index=False),
            commit_every=batch_size,
        )

        return {
            "shard_id": shard_id,
            "rows_loaded": len(df),
        }

    @task
    def quality_check(results: list[dict]):
        total = sum(r["rows_loaded"] for r in results)
        # Cross-check vs expected daily volume
        if total < 100000:   # known baseline
            raise ValueError(f"Suspicious low volume: {total}")
        print(f"OK: {total} rows loaded across {len(results)} shards")

    shards = get_active_shards()
    results = process_shard.partial(
        target_schema="warehouse_events",
        batch_size=10000,
    ).expand_kwargs(shards)
    quality_check(results)

parallel_shard_processor()

Pattern features:

  • expand_kwargs(shards): each shard is a dict carrying its own connection info
  • partial(target_schema, batch_size): constants shared by all mapped TIs, stored once
  • pool="db_etl" + max_active_tis_per_dag=3: limits concurrent shards (connection pool)
  • quality_check aggregates and validates the total
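To see which keyword arguments each mapped TI ends up with when partial() is combined with expand_kwargs(), the merge can be modelled in plain Python. This is only an illustration of the idea, not Airflow's implementation: `build_mapped_kwargs` is a hypothetical helper, and rejecting duplicate argument names is this sketch's own choice.

```python
def build_mapped_kwargs(constants: dict, per_item: list[dict]) -> list[dict]:
    """Merge shared constants with each per-item dict, one result per mapped TI."""
    merged = []
    for item in per_item:
        # An argument should come either from partial() or from the mapped dict.
        overlap = constants.keys() & item.keys()
        if overlap:
            raise ValueError(f"Arguments given twice: {sorted(overlap)}")
        merged.append({**constants, **item})
    return merged


shards = [
    {"shard_id": 0, "conn_id": "shard_0_db", "schema": "users_0_to_1M"},
    {"shard_id": 1, "conn_id": "shard_1_db", "schema": "users_1M_to_2M"},
]
constants = {"target_schema": "warehouse_events", "batch_size": 10000}

calls = build_mapped_kwargs(constants, shards)
for call in calls:
    print(call)
```

Each printed dict corresponds to one call of the mapped function, so the function's parameter names have to match the union of the constant keys and the per-shard keys.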

Pattern 3: Multi-region deployment

Use case: deploy an artifact to N regions, where each deployment is independent.

@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def multi_region_deploy():

    @task
    def get_deploy_targets(params: dict) -> list[dict]:
        """Build the deploy targets with per-region config."""
        env = params.get("env", "staging")

        if env == "prod":
            return [
                {"region": "us-east-1", "cluster": "prod-use1", "replicas": 5},
                {"region": "us-west-2", "cluster": "prod-usw2", "replicas": 3},
                {"region": "eu-west-1", "cluster": "prod-euw1", "replicas": 3},
                {"region": "ap-southeast-1", "cluster": "prod-apse1", "replicas": 2},
            ]
        else:
            return [
                {"region": "us-east-1", "cluster": "staging-use1", "replicas": 1},
            ]

    @task(
        max_active_tis_per_dag=2,   # roll out to at most 2 regions at a time (canary)
        retries=2,
        retry_delay=60,
    )
    def deploy_region(
        region: str,
        cluster: str,
        replicas: int,
        artifact_uri: str,         # constant
        version: str,              # constant
    ) -> dict:
        from kubernetes import client
        # Deploy через K8s API
        # ... apply manifest ...
        return {
            "region": region,
            "version": version,
            "status": "deployed",
        }

    @task(trigger_rule="all_done")
    def post_deploy_check(results: list[dict | None]):
        """Final validation."""
        deployed = [r for r in results if r is not None]
        failed = len([r for r in results if r is None])
        if failed > 0:
            # Trigger alerts, rollback?
            raise ValueError(f"{failed} regions failed to deploy")
        print(f"All {len(deployed)} regions deployed")

    targets = get_deploy_targets()
    results = deploy_region.partial(
        artifact_uri="s3://artifacts/v1.2.3",
        version="1.2.3",
    ).expand_kwargs(targets)
    post_deploy_check(results)

multi_region_deploy()

Pattern features:

  • get_deploy_targets returns dynamic list based on params (env=staging vs prod)
  • max_active_tis_per_dag=2 — controlled rollout
  • partial(artifact_uri, version) — common constants
  • post_deploy_check validates all regions, fails on any failure
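Because the target list is built dynamically, it can help to validate it before it is expanded into mapped TIs. The check below is a hedged sketch: `validate_targets` is a hypothetical helper, and the specific rules (non-empty list, unique regions, at least one replica) are assumptions about what a sane target list looks like.

```python
def validate_targets(targets: list[dict]) -> list[dict]:
    """Return the targets unchanged if they pass basic sanity checks."""
    if not targets:
        raise ValueError("No deploy targets")

    regions = [t["region"] for t in targets]
    duplicates = sorted({r for r in regions if regions.count(r) > 1})
    if duplicates:
        raise ValueError(f"Duplicate regions: {duplicates}")

    for target in targets:
        if target["replicas"] < 1:
            raise ValueError(f"Region {target['region']} has no replicas")

    return targets
```

Calling it on the list right before returning from the upstream task makes a bad configuration fail in one place instead of in every mapped TI.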

Pattern 4: Per-mapped-TI error handling

Use case: some items may fail without blocking the rest. Collect the errors and report them.

@dag(...)
def resilient_processing():

    @task
    def get_items() -> list[dict]:
        return [{"id": i, "data": f"item_{i}"} for i in range(100)]

    @task(retries=2, retry_delay=30)
    def process_item(item: dict) -> dict:
        """
        Try to process item. Return result dict or raise.
        If the final retry fails, the mapped TI is marked failed and downstream sees that.
        """
        # expensive_processing, RecoverableError, FatalError and log are
        # assumed to be defined elsewhere in the project
        try:
            result = expensive_processing(item)
            return {
                "id": item["id"],
                "status": "success",
                "result": result,
            }
        except RecoverableError:
            # Retry-worthy: re-raise so Airflow retries the task
            raise

        except FatalError as e:
            # Not worth a retry: log and swallow
            log.error(f"Fatal error processing item {item['id']}: {e}")
            return {
                "id": item["id"],
                "status": "failed",
                "error": str(e),
            }

    @task(trigger_rule="all_done")
    def aggregate(results: list[dict | None]):
        """
        Collect: successes, soft-failures (status=failed), hard-failures (None=task failed).
        """
        successes = [r for r in results if r and r.get("status") == "success"]
        soft_failures = [r for r in results if r and r.get("status") == "failed"]
        hard_failures_count = len([r for r in results if r is None])

        print(f"Total: {len(results)}")
        print(f"Success: {len(successes)}")
        print(f"Soft-failures (handled): {len(soft_failures)}")
        print(f"Hard-failures (task crash): {hard_failures_count}")

        # Push to monitoring
        send_metric("processing.success", len(successes))
        send_metric("processing.soft_failure", len(soft_failures))
        send_metric("processing.hard_failure", hard_failures_count)

        # Fail the DAG if more than 5% are hard failures
        if hard_failures_count > len(results) * 0.05:
            raise ValueError("Too many hard failures")

    items = get_items()
    results = process_item.expand(item=items)
    aggregate(results)

Pattern features:

  • Internal try/except to distinguish soft from hard failures
  • Soft failures: return a result dict with status="failed", no retry
  • Hard failures: re-raise → mapped TI fails → downstream receives None
  • trigger_rule="all_done" on aggregate: it runs regardless
  • Metrics for monitoring, conditional fail on a high error rate
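The pattern relies on two exception types that the lesson does not define. One possible shape, shown here as an assumption rather than the lesson's actual code, is a pair of custom exceptions plus a small wrapper that turns fatal errors into a soft-failure dict and lets recoverable ones propagate. `process_with_soft_failures` and its `handler` argument are hypothetical names.

```python
import logging

log = logging.getLogger(__name__)


class RecoverableError(Exception):
    """Transient problem; let the task fail so it can be retried."""


class FatalError(Exception):
    """Permanent problem; retrying will not help."""


def process_with_soft_failures(item: dict, handler) -> dict:
    """Run handler(item); report fatal errors as data, propagate everything else."""
    try:
        result = handler(item)
    except FatalError as exc:
        # Soft failure: recorded in the result, the task itself succeeds.
        log.error("Fatal error processing item %s: %s", item["id"], exc)
        return {"id": item["id"], "status": "failed", "error": str(exc)}
    # RecoverableError (and any unexpected error) is not caught here,
    # so it reaches the caller and counts as a hard failure.
    return {"id": item["id"], "status": "success", "result": result}
```

Keeping the wrapper free of Airflow imports means the soft/hard distinction can be unit-tested with ordinary functions as handlers.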

Pattern 5: Backfill-conscious mapping

Use case: a backfill of old runs can process a huge number of items. Limit the load per run.

@dag(
    schedule="@daily",
    start_date=datetime(2025, 1, 1),   # a year back: lots of backfill
    catchup=True,
    max_active_runs=2,                  # limit on simultaneous runs
    tags=["backfill-safe"],
)
def backfill_safe_dag():

    @task
    def list_files_for_date(data_interval_start) -> list[str]:
        """List the files for one specific day."""
        # list_s3_for_date is a helper assumed to be defined elsewhere
        return list_s3_for_date(data_interval_start)

    @task(
        max_active_tis_per_dag=20,   # critical for backfill
        retries=2,
    )
    def process_file(filename: str):
        pass

    process_file.expand(filename=list_files_for_date())

Key points:

  • max_active_runs=2: limits how many DagRuns are active at once during a backfill
  • max_active_tis_per_dag=20: a per-task limit; without it, 2 runs × 1000 mapped TIs = 2000 concurrent TIs

Without these limits, a 365-day backfill × 1000 items/day = 365k mapped TIs shortly after start → scheduler death.
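The arithmetic behind these limits can be written down as a small calculator. It is a rough upper-bound sketch under stated assumptions: `backfill_load` is a hypothetical helper, every day is assumed to produce the same number of items, and other caps (pools, max_active_tasks, executor parallelism) are ignored.

```python
def backfill_load(days, items_per_day, max_active_runs, max_active_tis_per_dag=None):
    """Estimate total mapped TIs and the worst-case number running at once."""
    total = days * items_per_day
    # Without a per-task cap, every active run can have all its mapped TIs running.
    unbounded = max_active_runs * items_per_day
    if max_active_tis_per_dag is None:
        concurrent = unbounded
    else:
        concurrent = min(unbounded, max_active_tis_per_dag)
    return {"total_mapped_tis": total, "max_concurrent_tis": concurrent}


print(backfill_load(365, 1000, max_active_runs=2))
print(backfill_load(365, 1000, max_active_runs=2, max_active_tis_per_dag=20))
```

With the numbers from this pattern, the per-task cap brings the worst case down from 2000 concurrent TIs to 20, while the total of 365,000 mapped TIs over the whole backfill stays the same.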


Pattern 6: Hybrid with XComObjectStorageBackend

Use case: each mapped TI handles large data (a pandas DataFrame). The default XCom backend breaks down. With the Object Storage backend it works transparently.

# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
xcom_objectstorage_path = s3://aws_default@airflow-xcom/xcom
# values larger than the threshold (4096 bytes = 4 KB) go to S3
xcom_objectstorage_threshold = 4096

@dag(...)
def heavy_data_mapping():

    @task
    def get_segments() -> list[str]:
        return ["segment_A", "segment_B", "segment_C", "segment_D"]

    @task
    def fetch_segment_data(segment: str) -> list[dict]:
        """Download large data for segment."""
        # fetch_from_warehouse is a helper assumed to be defined elsewhere
        df = fetch_from_warehouse(segment)   # 50MB DataFrame
        return df.to_dict("records")   # each mapped TI writes ~50MB to the S3 backend

    @task
    def aggregate(all_data: list[list[dict]]):
        # Pulls 4 × 50MB = 200MB through the Object Storage backend
        # Streamed from S3, not stored raw in the DB
        total = sum(len(d) for d in all_data)
        print(f"Total records: {total}")

    aggregate(fetch_segment_data.expand(segment=get_segments()))

Without the ObjectStorage backend, every 50MB XCom lands in Postgres → the DB is overloaded. With the backend, the DB stays compact and S3 absorbs the large data.
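To get a feel for which return values would cross a 4096-byte threshold, the serialized size can be approximated with the standard library. This is only an estimate under stated assumptions: `xcom_size_bytes` and `goes_to_object_storage` are hypothetical helpers, plain JSON stands in for Airflow's own serialization (which may differ and may compress), and a negative threshold is treated here as "never offload".

```python
import json


def xcom_size_bytes(value) -> int:
    """Approximate the stored size of a value as UTF-8 encoded JSON."""
    return len(json.dumps(value).encode("utf-8"))


def goes_to_object_storage(value, threshold: int = 4096) -> bool:
    """Guess whether a value is large enough to be offloaded."""
    if threshold < 0:
        return False
    return xcom_size_bytes(value) >= threshold


small = {"rows_processed": 120, "status": "success"}
large = [{"id": i, "value": "x" * 10} for i in range(1000)]

print(goes_to_object_storage(small))
print(goes_to_object_storage(large))
```

A check like this is handy in tests to flag tasks whose return values are drifting toward sizes that should not live in the metadata DB.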


Visualization: production deployment

Real-world S3 prefix processor: flow

  1. list_files (S3 LIST): a single TI calls S3Hook.list_keys for the day's prefix, validates that the result is not empty, sorts it deterministically, and returns a list of dicts with key + size metadata. One XCom row.
     ↓ scheduler: expand_mapped_task
  2. N mapped TIs (process_file): each mapped TI processes 1 file: download, transform, upload to staging. retries=3 with a 5 min delay for transient S3 errors; max_active_tis_per_dag=8 limits concurrency.
     ↓ aggregation
  3. summarize_run (trigger_rule=all_done): runs even if some mapped TIs failed. Pulls list[N] from the mapped TIs, validates that the failure rate is < 10% (otherwise the DAG fails), and sends metrics to monitoring.

Comparison: anti-patterns vs proper patterns

Anti-pattern | Proper pattern
.expand(item=list(range(10000))) | Chunked: .expand(chunk=chunks_of_100)
Mapping without max_active_tis_per_dag | Always set an explicit limit
No trigger_rule="all_done" on the aggregator | Use it for resilience to partial failures
Large XCom return from a mapped TI | Custom Backend / Object Storage
No validation of upstream length | Assert non-empty / max size in the upstream task
Mapping for very fast items | Batch processing in a single task
Cartesian product of 3+ axes | Explicit list of dicts via expand_kwargs
No retry strategy | retries=N, retry_delay=appropriate
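The first row of the table refers to chunking without showing it. A minimal sketch of such a helper follows; `chunks_of` is a hypothetical name, and the chunk size of 100 simply mirrors the table.

```python
def chunks_of(items: list, size: int) -> list[list]:
    """Split items into consecutive chunks of at most `size` elements."""
    if size < 1:
        raise ValueError("size must be at least 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


chunks = chunks_of(list(range(10000)), 100)
print(len(chunks), len(chunks[0]))
```

The upstream task would return `chunks`, and the mapped task would loop over one chunk per TI, turning 10,000 mapped TIs into 100.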

Production monitoring checklist

What to monitor for a mapping DAG:

Metric | Source | Threshold
Mapped TI per run | task_instance count | warning > 500
Avg mapped TI duration | task_instance.duration | varies
Failure rate per mapped task | state=failed count | warning > 5%
Scheduler tick duration | Prometheus / metrics | p99 < 30s
max_active_tis_per_dag saturation | scheduled state count | warning > 80%
XCom backend latency | S3 latency / DB query time | p99 < 500ms
DAG queue backlog | scheduled TI count | warning > 1000

Alerts:

  • mapped_ti_count > 1000 per run — review for batch refactoring
  • mapped_task_failure_rate > 10% — investigate root cause
  • scheduler_p99 > 60s: the scheduler is overloaded
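The three alert rules above can be expressed as a single check over collected numbers. This is a sketch under assumptions: `evaluate_alerts` is a hypothetical function, the inputs are assumed to be gathered elsewhere (for example from the metadata DB and the metrics system), and the thresholds are the ones listed in the alerts.

```python
def evaluate_alerts(mapped_ti_count: int, failed_ti_count: int, scheduler_p99_seconds: float) -> list[str]:
    """Return the names of the alert rules that fire for one DAG run."""
    alerts = []

    if mapped_ti_count > 1000:
        alerts.append("mapped_ti_count")          # review for batch refactoring

    failure_rate = failed_ti_count / mapped_ti_count if mapped_ti_count else 0.0
    if failure_rate > 0.10:
        alerts.append("mapped_task_failure_rate")  # investigate root cause

    if scheduler_p99_seconds > 60:
        alerts.append("scheduler_p99")             # scheduler overloaded

    return alerts
```

A run with 1200 mapped TIs, 150 of them failed, and a scheduler p99 of 75 seconds would trigger all three rules; a run with 200 mapped TIs, 10 failures, and a p99 of 12 seconds would trigger none.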

Production gotchas: summary

From the previous lessons, plus new ones:

  1. Backfill can create thousands of mapped TIs: set max_active_runs and max_active_tis_per_dag
  2. Upstream order: sort upstream if order matters for map_index
  3. Custom XCom Backend: mandatory for large mapped XComs
  4. trigger_rule="all_done": for the aggregator if you want resilience to partial failures
  5. Retries per mapped TI: isolated, so set them carefully (not 10 retries × 1000 TIs)
  6. Pool + max_active_tis_per_dag: combine for cross-DAG + intra-DAG limits
  7. Logging volume: 1000 mapped × 100KB = 100MB/run of storage
  8. UI usage: teach users to use the list view for huge mappings, not the grid

Knowledge check
What is the complete production-grade pattern for a DAG that processes 200-500 S3 files daily? Describe the mandatory elements: limits, error handling, monitoring.
Answer
Pattern: list_files() → process_file.expand() → summarize(trigger_rule=all_done). Mandatory elements:

  1. list_files: sorted, deterministic order (for a consistent map_index), validates non-empty (fail fast if nothing is found), returns list[dict] with metadata (size, etc.).
  2. process_file with max_active_tis_per_dag=8-16 (a concurrency limit, so workers, downstream services, and DB connections are not overwhelmed), retries=3 with retry_delay=300s (transient S3 errors), and potentially pool='s3_etl' as a cross-DAG limit.
  3. Internal try/except in process_file to distinguish soft failures (return a dict with status='failed') from hard failures (raise → mapped TI failed).
  4. summarize with trigger_rule='all_done': runs even if some mapped TIs failed, aggregates the results, validates the failure rate against the 10% threshold (otherwise the DAG fails), and sends metrics to monitoring (success/soft_fail/hard_fail counts).
  5. DAG-level limits: max_active_runs=2-3 (backfill safety), max_active_tasks=16 (global).
  6. XCom strategy: if each file produces a large XCom (>48KB), use a Custom Backend or XComObjectStorageBackend with a threshold.
  7. Monitoring: alerts on mapped_ti_count > 500, failure_rate > 10%, scheduler p99 > 30s.
  8. Backfill consciousness: catchup=False or explicit limits; otherwise 365 days × 500 mapped = 180k TIs after deploy.

Avoid: cartesian products with 3+ axes, mapping for fast items (<1s: overhead dominates, batching is better), and running without any limits configured.
