Lesson 19.03 · 32 min
Advanced

Capstone implementation: full Python code

This lesson is the full implementation of the capstone DAG from the previous lesson: production-grade Python code that combines all the key topics of the course. The code in this lesson can be copied into a production codebase almost as-is, after adjusting it to your infrastructure specifics.

Capstone DAG implementation flow: all the key topics of the course in one pipeline

schedule=[3 Kafka Datasets]
Dataset-driven scheduling from module 08. Debezium/Kafka producers update three Datasets: kafka_orders, kafka_line_items, kafka_customers. With a list schedule the DAG runs once all three have been updated since the previous run; triggering on any single one requires a conditional expression such as kafka_orders | kafka_line_items | kafka_customers (Airflow 2.9+). No cron, purely data-aware orchestration.
    ↓ DAG triggered by Dataset event
Phase 1: Dynamic Task Mapping (module 07)
consume_kafka.expand(topic=KAFKA_TOPICS) creates N parallel mapped tasks, one per Kafka topic. Each task receives its topic as a parameter. Executor: CeleryExecutor (light, short-lived).
    ↓ 3 parallel light tasks → S3 staging
Phase 2: Verify staging
verify_staging checks that all S3 staging files have been written and raises AirflowFailException if one is missing (the fail-fast pattern from module 17.08). Executor: CeleryExecutor.
Phase 3: Multiple Executors, AIP-61 (module 05)
spark_transform runs on KubernetesExecutor in its own pod with custom resources (limits of 2 CPU, 4Gi). Light tasks stay on Celery, the heavy task gets an isolated pod. Mixing executors in one DAG is a 2.10+ feature.
    ↓ XCom metrics → custom S3 backend (module 06)
Phase 4: Load ClickHouse
clickhouse_load reads the Iceberg gold table and performs an idempotent DROP PARTITION + INSERT (module 17.02). inlets/outlets emit OpenLineage events automatically (module 14).
    ↓ Datasets updated → downstream DAGs are triggered
Phase 5: Notify team
Slack notification with full metrics and a Marquez lineage URL. In production, replace it with the Listener API (module 12) for centralized alerting through on_failure / on_retry events.
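
The trigger rule is the part of this flow that is easiest to misread, so here is a toy model of it in plain Python. It is not Airflow code: `DatasetScheduler`, `on_update`, and the `mode` flag are hypothetical names used only to illustrate how a list schedule (every dataset updated) differs from an any-of condition.

```python
from dataclasses import dataclass, field


@dataclass
class DatasetScheduler:
    """Toy model of dataset-triggered scheduling (not the Airflow scheduler)."""

    datasets: frozenset[str]
    mode: str = "all"  # "all" mimics schedule=[a, b, c]; "any" mimics a | b | c
    pending: set[str] = field(default_factory=set)
    runs: int = 0

    def on_update(self, dataset: str) -> bool:
        """Record an update event; return True if it starts a DAG run."""
        if dataset not in self.datasets:
            return False
        self.pending.add(dataset)
        ready = self.pending == self.datasets if self.mode == "all" else True
        if ready:
            self.runs += 1
            self.pending.clear()  # the run consumes the queued events
        return ready


TOPICS = frozenset({"kafka_orders", "kafka_line_items", "kafka_customers"})

all_of = DatasetScheduler(TOPICS, mode="all")
any_of = DatasetScheduler(TOPICS, mode="any")

for name in ["kafka_orders", "kafka_orders", "kafka_line_items", "kafka_customers"]:
    all_of.on_update(name)
    any_of.on_update(name)

print(all_of.runs, any_of.runs)  # 1 4
```

In the all-of model, a quiet topic holds back the whole pipeline, which is worth keeping in mind if one of the three CDC streams updates rarely.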

File structure:

dags/
├── capstone/
│   ├── orders_etl_realtime.py        # Main DAG
│   ├── datasets.py                    # Dataset definitions
│   └── helpers.py                     # Shared utilities
├── plugins/
│   ├── xcom_backends/
│   │   └── s3_xcom_backend.py        # Custom XCom (S3 storage)
│   └── operators/
│       └── clickhouse_iceberg_load.py # Custom operator
└── config/
    └── capstone_config.yaml          # Configuration

Step 1: Dataset definitions

# dags/capstone/datasets.py
"""Centralized dataset definitions for capstone pipeline."""
from airflow import Dataset

# Source datasets: updated by Debezium/Kafka consumers
kafka_orders = Dataset("kafka://confluent.cloud/orders.public.orders")
kafka_line_items = Dataset("kafka://confluent.cloud/orders.public.line_items")
kafka_customers = Dataset("kafka://confluent.cloud/orders.public.customers")

# Intermediate datasets: updated by our DAG
s3_staging_orders = Dataset("s3://datalake/staging/orders/")
s3_staging_line_items = Dataset("s3://datalake/staging/line_items/")
s3_staging_customers = Dataset("s3://datalake/staging/customers/")

# Output datasets: consumed by downstream DAGs
iceberg_bronze_orders = Dataset("iceberg://lakehouse/bronze.orders_raw")
iceberg_silver_orders = Dataset("iceberg://lakehouse/silver.orders_clean")
iceberg_gold_daily_revenue = Dataset("iceberg://lakehouse/gold.daily_revenue")
clickhouse_dashboards = Dataset("clickhouse://clickhouse.internal/dashboards.daily_revenue")

Step 2: Custom XCom backend for large dataframes

# plugins/xcom_backends/s3_xcom_backend.py
"""Custom XCom backend that stores large objects on S3.

The metadata DB limits XCom size (about 48 KB per this course; the exact
limit depends on the database), so dataframes and large dicts go to S3.
"""
import pickle
import uuid
from typing import Any

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

S3_BUCKET = "airflow-xcom"
SIZE_THRESHOLD = 4 * 1024  # 4 KB: small XComs stay in the DB, large ones go to S3

class S3XComBackend(BaseXCom):
    """
    Custom XCom backend:
    - Small values (<4 KB) are stored in the DB as usual
    - Large values are pickled and uploaded to S3; only a reference is stored in the DB
    """

    PREFIX = "xcom:s3:"

    @staticmethod
    def serialize_value(value: Any, **kwargs) -> bytes:
        """Serialize; if the value is large, store it on S3 and return a reference."""
        # Pickle first to measure the size
        pickled = pickle.dumps(value)

        if len(pickled) < SIZE_THRESHOLD:
            # Small: regular serialization into the DB
            return BaseXCom.serialize_value(value, **kwargs)

        # Large: upload to S3
        key = f"xcom/{uuid.uuid4()}.pickle"
        hook = S3Hook(aws_conn_id="aws_default")
        hook.load_bytes(
            bytes_data=pickled,
            bucket_name=S3_BUCKET,
            key=key,
            replace=True,
        )
        reference = {
            "_s3_xcom": True,
            "bucket": S3_BUCKET,
            "key": key,
            "size_bytes": len(pickled),
        }
        return BaseXCom.serialize_value(reference, **kwargs)

    @staticmethod
    def deserialize_value(result) -> Any:
        """Deserialize; if the value is an S3 reference, fetch and unpickle it."""
        value = BaseXCom.deserialize_value(result)
        if isinstance(value, dict) and value.get("_s3_xcom"):
            hook = S3Hook(aws_conn_id="aws_default")
            # get_key returns a boto3 Object, so the body can be read as raw bytes.
            # read_key decodes the body as UTF-8 text, which breaks pickle data.
            obj = hook.get_key(key=value["key"], bucket_name=value["bucket"])
            # Only unpickle data from a bucket that Airflow alone can write to
            return pickle.loads(obj.get()["Body"].read())
        return value

Configuration in airflow.cfg:

[core]
xcom_backend = plugins.xcom_backends.s3_xcom_backend.S3XComBackend
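
The size-threshold routing of S3XComBackend can be tried without Airflow or S3. The sketch below is a simplified stand-in, not the backend itself: `fake_s3` is a hypothetical in-memory dict replacing the bucket, and `serialize` / `deserialize` are illustrative helpers that keep only the routing logic.

```python
import pickle
import uuid
from typing import Any

SIZE_THRESHOLD = 4 * 1024  # same 4 KB cutoff as in the backend

# Hypothetical in-memory stand-in for the S3 bucket
fake_s3: dict[str, bytes] = {}


def serialize(value: Any) -> Any:
    """Return the value itself if small, otherwise a reference into fake_s3."""
    pickled = pickle.dumps(value)
    if len(pickled) < SIZE_THRESHOLD:
        return value
    key = f"xcom/{uuid.uuid4()}.pickle"
    fake_s3[key] = pickled
    return {"_s3_xcom": True, "key": key, "size_bytes": len(pickled)}


def deserialize(stored: Any) -> Any:
    """Resolve a reference back to the original object; pass small values through."""
    if isinstance(stored, dict) and stored.get("_s3_xcom"):
        return pickle.loads(fake_s3[stored["key"]])
    return stored


small = {"rows": 10}
large = {"payload": "x" * 10_000}

small_stored = serialize(small)
large_stored = serialize(large)

print(small_stored)                        # {'rows': 10}
print(large_stored["_s3_xcom"])            # True
print(deserialize(large_stored) == large)  # True
```

One consequence of this design is that a task returning a dict that happens to contain the `_s3_xcom` key would be misread as a reference, so a less collision-prone marker may be worth choosing in a real backend.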

Step 3: Main DAG — Full implementation

# dags/capstone/orders_etl_realtime.py
"""
Capstone — real-time orders ETL pipeline.

Sources: 3 Kafka topics (CDC from Postgres OLTP)
Sinks: Iceberg lakehouse + ClickHouse dashboards

Demonstrates: Datasets, Dynamic Mapping, Deferrable, Multiple Executors,
custom XCom backend, OpenLineage emission, HA scheduler+triggerer.

Airflow 2.10/2.11 LTS.
"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any

from airflow.decorators import dag, task
from airflow.lineage.entities import Table
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.slack.notifications.slack_webhook import SlackWebhookNotifier

from dags.capstone.datasets import (
    kafka_orders, kafka_line_items, kafka_customers,
    s3_staging_orders, s3_staging_line_items, s3_staging_customers,
    iceberg_bronze_orders, iceberg_silver_orders, iceberg_gold_daily_revenue,
    clickhouse_dashboards,
)

# Configuration is kept in version control (Git), not fetched at runtime
KAFKA_TOPICS = ["orders.public.orders", "orders.public.line_items", "orders.public.customers"]
SPARK_K8S_NAMESPACE = "spark-jobs"
ICEBERG_CATALOG = "glue"

@dag(
    dag_id="capstone_orders_etl_realtime",
    # IMPORTANT: dataset-triggered, NOT cron (module 08).
    # A list means the run starts after all three datasets have been updated.
    schedule=[kafka_orders, kafka_line_items, kafka_customers],
    start_date=datetime(2024, 1, 1),  # Hardcoded (module 02 quiz)
    catchup=False,                     # ALWAYS False for new DAGs (module 17.05)
    max_active_runs=3,                 # Resource control
    tags=["capstone", "production", "critical", "etl"],
    description="Real-time orders ETL: Kafka CDC → S3 → Spark → Iceberg → ClickHouse",
    default_args={
        "owner": "[email protected]",
        "retries": 3,
        "retry_delay": timedelta(minutes=2),
        "retry_exponential_backoff": True,
        "max_retry_delay": timedelta(minutes=30),
        # The Listener API handles alerting (module 12), not a callback in every DAG
    },
    doc_md=__doc__,
)
def capstone_orders_etl():
    """Capstone DAG — full pipeline."""

    # =========================================================================
    # Phase 1: Consume from Kafka topics
    # Uses Dynamic Task Mapping (module 07)
    # =========================================================================

    @task(
        # Multiple Executors AIP-61 (2.10+): light tasks run on Celery
        executor="CeleryExecutor",
        # Lineage inlets/outlets (module 17.07). Table takes database, cluster
        # and name; every mapped instance shares this one declaration.
        inlets=[Table(database="kafka", cluster="confluent.cloud", name="public.orders")],
        outlets=[s3_staging_orders],
    )
    def consume_kafka(topic: str, **context) -> dict:
        """Consume a Kafka topic to S3 staging; one task per topic via mapping."""
        from datetime import timezone
        from confluent_kafka import Consumer
        import boto3
        import json
        from airflow.models import Variable
        # IMPORTANT: call Variable.get inside the task, not at top level (module 02 quiz)

        # data_interval for idempotent processing (module 17.05).
        # NOTE: for dataset-triggered runs, verify in your deployment that this
        # interval is not zero-length before relying on it as a filter.
        interval_start = context["data_interval_start"]
        interval_end = context["data_interval_end"]
        ds = context["ds"]

        kafka_conf = {
            "bootstrap.servers": Variable.get("kafka_bootstrap"),
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "PLAIN",
            "sasl.username": Variable.get("kafka_username"),
            "sasl.password": Variable.get("kafka_password"),
            "group.id": f"airflow-{topic}-{ds}",  # Deterministic group id
            "auto.offset.reset": "earliest",
        }

        consumer = Consumer(kafka_conf)
        consumer.subscribe([topic])

        records = []
        timeout_seconds = 60
        end_time = datetime.utcnow().timestamp() + timeout_seconds

        try:
            while datetime.utcnow().timestamp() < end_time:
                msg = consumer.poll(timeout=5.0)
                if msg is None:
                    continue
                if msg.error():
                    # Error events are skipped here; production code should log them
                    continue
                event = json.loads(msg.value())
                # Keep only events inside the data_interval window (backfill-safe).
                # Debezium's ts_ms is epoch milliseconds, not an ISO string.
                event_ts = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
                if interval_start <= event_ts < interval_end:
                    records.append(event)
        finally:
            consumer.close()

        # Deterministic S3 path (module 17.02)
        topic_short = topic.split(".")[-1]
        s3_key = f"staging/{topic_short}/dt={ds}/part-00000.json"

        s3 = boto3.client("s3")
        # Write to a tmp key, then copy to the final key (module 17.02).
        # S3 has no rename; copy + delete emulates it.
        tmp_key = f"{s3_key}.tmp"
        s3.put_object(
            Bucket="datalake",
            Key=tmp_key,
            Body=json.dumps(records).encode(),
            ContentType="application/json",
        )
        s3.copy_object(
            Bucket="datalake",
            CopySource={"Bucket": "datalake", "Key": tmp_key},
            Key=s3_key,
        )
        s3.delete_object(Bucket="datalake", Key=tmp_key)

        return {
            "topic": topic,
            "s3_key": s3_key,
            "record_count": len(records),
            "data_interval_start": str(interval_start),
            "data_interval_end": str(interval_end),
        }

    # Dynamic Mapping: one task definition × 3 topics
    consume_results = consume_kafka.expand(topic=KAFKA_TOPICS)

    # =========================================================================
    # Phase 2: Verify that all staging files exist
    # A plain task here; module 09 (Deferrable Operators) covers the
    # sensor-based variant for long waits
    # =========================================================================

    @task(executor="CeleryExecutor")
    def verify_staging(consume_results: list[dict]) -> list[dict]:
        """Verify that all staging files exist on S3."""
        from airflow.exceptions import AirflowFailException
        from airflow.providers.amazon.aws.hooks.s3 import S3Hook
        hook = S3Hook(aws_conn_id="aws_default")

        # Mapped-task output arrives as a lazy sequence; materialize it so it
        # can be returned as a regular XCom value
        results = list(consume_results)
        for result in results:
            exists = hook.check_for_key(bucket_name="datalake", key=result["s3_key"])
            if not exists:
                # Fail fast without retries (module 17.08)
                raise AirflowFailException(f"Staging file missing: {result['s3_key']}")

        return results

    staging_verified = verify_staging(consume_results)

    # =========================================================================
    # Phase 3: Spark transformation on Kubernetes
    # Multiple Executors (module 05 / AIP-61)
    # =========================================================================

    # pod_override must be a V1Pod object from the Kubernetes client, not a dict
    from kubernetes.client import models as k8s

    @task(
        executor="KubernetesExecutor",  # Heavy task gets its own pod
        pool="spark_submission_pool",    # Resource pool (module 11)
        retries=2,
        retry_delay=timedelta(minutes=5),
        inlets=[s3_staging_orders, s3_staging_line_items, s3_staging_customers],
        outlets=[iceberg_bronze_orders, iceberg_silver_orders, iceberg_gold_daily_revenue],
        executor_config={
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            resources=k8s.V1ResourceRequirements(
                                requests={"cpu": "1", "memory": "2Gi"},
                                limits={"cpu": "2", "memory": "4Gi"},
                            ),
                        )
                    ],
                    service_account_name="airflow-spark-runner",
                )
            ),
        },
    )
    def spark_transform(staging_results: list[dict], **context) -> dict:
        """Submit a Spark job that transforms Kafka events into Iceberg tables."""
        import boto3

        ds = context["ds"]
        data_interval_start = context["data_interval_start"]

        # Spark job config (committed, baked into S3)
        spark_args = {
            "ds": ds,
            "data_interval_start": str(data_interval_start),
            "staging_paths": {r["topic"]: f"s3://datalake/{r['s3_key']}" for r in staging_results},
            "iceberg_catalog": ICEBERG_CATALOG,
            "iceberg_tables": {
                "bronze": "lakehouse.bronze.orders_raw",
                "silver": "lakehouse.silver.orders_clean",
                "gold": "lakehouse.gold.daily_revenue",
            },
        }

        # Submit the Spark job through the K8s API (the equivalent of kubectl apply)
        k8s_apps_v1 = boto3.client("eks")  # simplified placeholder
        # ... full SparkApplication CRD submission (omitted for brevity) ...

        # Wait for completion (synchronous); in a real pipeline use an async sensor.
        # The metrics below are hardcoded placeholders for what the job would report.
        return {
            "rows_bronze": 1_500_000,
            "rows_silver": 1_450_000,  # After dedup/quality filter
            "rows_gold": 1_250,         # Daily aggregations
            "duration_seconds": 320,
            "spark_application_id": f"orders-etl-{ds}",
        }

    transform_result = spark_transform(staging_verified)

    # =========================================================================
    # Phase 4 — Load gold table to ClickHouse
    # =========================================================================

    @task(
        executor="CeleryExecutor",
        inlets=[iceberg_gold_daily_revenue],
        outlets=[clickhouse_dashboards],
    )
    def clickhouse_load(spark_result: dict, **context) -> dict:
        """Read Iceberg gold table, write to ClickHouse."""
        from pyiceberg.catalog import load_catalog
        import clickhouse_connect
        from airflow.models import Variable

        ds = context["ds"]

        # Read Iceberg
        catalog = load_catalog("default")
        table = catalog.load_table("lakehouse.gold.daily_revenue")
        # Filter to current ds
        df = table.scan(row_filter=f"date = '{ds}'").to_pandas()

        # Write to ClickHouse
        client = clickhouse_connect.get_client(
            host="clickhouse.internal",
            username="airflow",
            # Jinja templates are not rendered inside a task body, so the
            # Variable is read explicitly
            password=Variable.get("clickhouse_password"),
        )

        # Idempotent: DROP PARTITION + INSERT per partition (module 17.02).
        # The two statements are not one transaction, so a failure between them
        # leaves the partition empty until the retry.
        client.command(f"ALTER TABLE dashboards.daily_revenue DROP PARTITION '{ds}'")
        client.insert_df("dashboards.daily_revenue", df)

        return {"rows_loaded": len(df), "partition": ds}

    ch_result = clickhouse_load(transform_result)

    # =========================================================================
    # Phase 5 — Notify team (final notification)
    # =========================================================================

    @task(executor="CeleryExecutor")
    def notify_team(consume_results: list[dict], spark_result: dict, ch_result: dict, **context):
        """Send Slack notification with metrics."""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

        total_consumed = sum(r["record_count"] for r in consume_results)
        msg = f"""
✓ *Orders ETL Complete* `{context['ds']}`

*Kafka Consume*: {total_consumed:,} records (3 topics)
*Spark Transform*: {spark_result['rows_silver']:,} rows in silver table
*ClickHouse Load*: {ch_result['rows_loaded']:,} rows for {ch_result['partition']}
*Duration*: {spark_result['duration_seconds']}s

Marquez lineage: https://marquez.internal/runs/{context['run_id']}
"""
        hook = SlackWebhookHook(slack_webhook_conn_id="slack_alerts")
        hook.send(text=msg, channel="#data-eng-ops")

    notify_team(consume_results, transform_result, ch_result)

# Instantiate
dag = capstone_orders_etl()
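
The data_interval filter in consume_kafka compares each event's timestamp with the run's window. Assuming the events carry Debezium's `ts_ms` field as epoch milliseconds at the top level (an assumption about the payload shape; with schemas enabled it sits inside a `payload` envelope), a standalone sketch of the half-open window check could look like this. `in_interval` is a hypothetical helper, not part of the DAG.

```python
from datetime import datetime, timezone


def in_interval(event: dict, start: datetime, end: datetime) -> bool:
    """True if the event's ts_ms (epoch milliseconds) falls in [start, end)."""
    event_ts = datetime.fromtimestamp(event["ts_ms"] / 1000, tz=timezone.utc)
    return start <= event_ts < end


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
end = datetime(2024, 1, 2, tzinfo=timezone.utc)

events = [
    {"id": 1, "ts_ms": 1704067200000},  # 2024-01-01T00:00:00Z, kept (start is inclusive)
    {"id": 2, "ts_ms": 1704110400000},  # 2024-01-01T12:00:00Z, kept
    {"id": 3, "ts_ms": 1704153600000},  # 2024-01-02T00:00:00Z, dropped (end is exclusive)
]

kept = [e["id"] for e in events if in_interval(e, start, end)]
print(kept)  # [1, 2]
```

The half-open interval means an event on the boundary belongs to exactly one run, which is what keeps adjacent runs from double-counting.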

Step 4: Custom ClickHouse-Iceberg loader (alternative pattern)

If you prefer a custom operator instead of @task:

# plugins/operators/clickhouse_iceberg_load.py
"""Custom operator: load an Iceberg table partition into ClickHouse."""
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator

class ClickHouseFromIcebergOperator(BaseOperator):
    template_fields = ("iceberg_table", "target_clickhouse_table", "partition_value")

    def __init__(
        self,
        iceberg_table: str,
        target_clickhouse_table: str,
        partition_value: str,
        partition_column: str = "date",
        clickhouse_conn_id: str = "clickhouse_default",
        iceberg_catalog: str = "glue",
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.iceberg_table = iceberg_table
        self.target_clickhouse_table = target_clickhouse_table
        self.partition_value = partition_value
        self.partition_column = partition_column
        self.clickhouse_conn_id = clickhouse_conn_id
        self.iceberg_catalog = iceberg_catalog

    def execute(self, context):
        from pyiceberg.catalog import load_catalog
        import clickhouse_connect

        # Read Iceberg
        catalog = load_catalog(self.iceberg_catalog)
        table = catalog.load_table(self.iceberg_table)
        df = table.scan(
            row_filter=f"{self.partition_column} = '{self.partition_value}'"
        ).to_pandas()

        rows_count = len(df)
        self.log.info("Read %s rows from Iceberg", rows_count)

        # Write to ClickHouse; credentials come from the Airflow connection
        conn = BaseHook.get_connection(self.clickhouse_conn_id)
        client = clickhouse_connect.get_client(
            host=conn.host, username=conn.login, password=conn.password,
        )

        # Idempotent, but not atomic: DROP PARTITION, then INSERT
        client.command(
            f"ALTER TABLE {self.target_clickhouse_table} DROP PARTITION '{self.partition_value}'"
        )
        client.insert_df(self.target_clickhouse_table, df)

        # Push metrics to XCom
        context["task_instance"].xcom_push(key="rows_loaded", value=rows_count)
        return rows_count
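
Because the operator interpolates a templated partition_value and a table name directly into SQL, it may be worth validating both before the statement is built. The sketch below is one possible guard under the assumption that partitions are ISO dates and tables are named database.table; `drop_partition_sql` is a hypothetical helper, not part of the operator.

```python
from datetime import date


def drop_partition_sql(table: str, partition_value: str) -> str:
    """Build the DROP PARTITION statement after validating both inputs."""
    # Expect database.table, each part looking like a plain identifier
    parts = table.split(".")
    if len(parts) != 2 or not all(p.isidentifier() for p in parts):
        raise ValueError(f"unexpected table name: {table!r}")
    # Raises ValueError unless the value parses as an ISO 8601 date
    date.fromisoformat(partition_value)
    return f"ALTER TABLE {table} DROP PARTITION '{partition_value}'"


print(drop_partition_sql("dashboards.daily_revenue", "2024-01-01"))
# ALTER TABLE dashboards.daily_revenue DROP PARTITION '2024-01-01'

try:
    drop_partition_sql("dashboards.daily_revenue", "2024-01-01' OR 1=1 --")
except ValueError as exc:
    print("rejected:", exc)
```

Inside the operator, a call like this would replace the inline f-string passed to client.command.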

Step 5: Configuration (committed YAML)

# dags/config/capstone_config.yaml
# Static config: committed to Git, NOT fetched at runtime (module 17.03)

kafka:
  topics:
    - orders.public.orders
    - orders.public.line_items
    - orders.public.customers

spark:
  namespace: spark-jobs
  service_account: airflow-spark-runner
  resources:
    executor:
      cpu: 2
      memory: 4Gi
      count: 4

iceberg:
  catalog: glue
  tables:
    bronze: lakehouse.bronze.orders_raw
    silver: lakehouse.silver.orders_clean
    gold: lakehouse.gold.daily_revenue

clickhouse:
  database: dashboards
  table: daily_revenue
  partition_column: date

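alerting:
  slack_channel: "#data-eng-ops"
  # Rendered only if this value is passed through a templated field;
  # a plain YAML load returns the literal string
  pagerduty_routing_key: "{{ var.value.pagerduty_key }}"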
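
The DAG file in Step 3 hardcodes its constants, so the YAML is only useful once something reads it. One possible approach, sketched here under assumptions, is a small typed wrapper that validates the parsed file at DAG-parse time. `CapstoneConfig` and `from_dict` are hypothetical names; the dict is inlined so the sketch runs without PyYAML, whereas in the DAG it would come from `yaml.safe_load`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CapstoneConfig:
    """Typed view over the parsed YAML (hypothetical helper)."""

    kafka_topics: tuple[str, ...]
    spark_namespace: str
    iceberg_catalog: str
    clickhouse_table: str

    @classmethod
    def from_dict(cls, raw: dict) -> "CapstoneConfig":
        topics = tuple(raw["kafka"]["topics"])
        if not topics:
            raise ValueError("kafka.topics must not be empty")
        ch = raw["clickhouse"]
        return cls(
            kafka_topics=topics,
            spark_namespace=raw["spark"]["namespace"],
            iceberg_catalog=raw["iceberg"]["catalog"],
            clickhouse_table=f"{ch['database']}.{ch['table']}",
        )


# Stand-in for yaml.safe_load(...) applied to capstone_config.yaml
raw = {
    "kafka": {
        "topics": [
            "orders.public.orders",
            "orders.public.line_items",
            "orders.public.customers",
        ]
    },
    "spark": {"namespace": "spark-jobs"},
    "iceberg": {"catalog": "glue"},
    "clickhouse": {"database": "dashboards", "table": "daily_revenue"},
}

config = CapstoneConfig.from_dict(raw)
print(config.clickhouse_table)   # dashboards.daily_revenue
print(len(config.kafka_topics))  # 3
```

A missing key raises KeyError while the DAG file is being parsed, so a broken config shows up as an import error rather than as a failed task.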

Step 6: airflow.cfg additions

# Custom XCom backend, plus Multiple Executors (AIP-61, 2.10+; module 05).
# Both options go under a single [core] header: strict INI parsers reject
# a file that declares the same section twice.
[core]
xcom_backend = plugins.xcom_backends.s3_xcom_backend.S3XComBackend
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor,airflow.providers.cncf.kubernetes.executors.kubernetes_executor.KubernetesExecutor

# OpenLineage emission (module 14)
[openlineage]
transport = {"type":"http","url":"http://marquez:5000"}
namespace = production-capstone

# Vault secrets backend (module 10)
[secrets]
backend = airflow.providers.hashicorp.secrets.vault.VaultBackend
backend_kwargs = {"url":"https://vault.internal:8200","mount_point":"airflow","auth_type":"kubernetes","kubernetes_role":"airflow"}
# Variable caching is configured in [secrets] itself rather than in backend_kwargs
use_cache = True
cache_ttl_seconds = 60
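
When adding options to airflow.cfg, everything for one section belongs under a single header. The sketch below uses Python's standard `configparser` in its default strict mode to show what happens when `[core]` appears twice in one file; the executor value is shortened to class names for readability, and `load` is a hypothetical helper.

```python
import configparser

DUPLICATED = """
[core]
xcom_backend = plugins.xcom_backends.s3_xcom_backend.S3XComBackend

[core]
executor = CeleryExecutor,KubernetesExecutor
"""

MERGED = """
[core]
xcom_backend = plugins.xcom_backends.s3_xcom_backend.S3XComBackend
executor = CeleryExecutor,KubernetesExecutor
"""


def load(text: str) -> configparser.ConfigParser:
    """Parse INI text with the default (strict) parser settings."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser


try:
    load(DUPLICATED)
    duplicate_ok = True
except configparser.DuplicateSectionError:
    duplicate_ok = False

merged = load(MERGED)
print(duplicate_ok)                # False
print(merged["core"]["executor"])  # CeleryExecutor,KubernetesExecutor
```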

What this implementation demonstrates

| Module | Concrete usage |
|---|---|
| 02: DAG fundamentals | hardcoded start_date, catchup=False, default_args |
| 04: Scheduler internals | HA via row-level locks (deployment config) |
| 05: Executors | Multiple Executors (CeleryExecutor + KubernetesExecutor mixed) |
| 06: XCom | Custom S3XComBackend for large dataframes |
| 07: Dynamic Task Mapping | consume_kafka.expand(topic=KAFKA_TOPICS) |
| 08: Datasets | schedule=[kafka_orders, kafka_line_items, kafka_customers] |
| 09: Deferrable | Extension: replace verify_staging with a deferrable S3 key sensor (for example S3KeySensor with deferrable=True) for long waits |
| 10: Secrets | Vault via airflow.cfg, Variable.get inside tasks |
| 11: Pools | spark_submission_pool limits concurrent Spark submissions |
| 14: OpenLineage | inlets/outlets, automatic emission via the provider, Marquez backend |
| 17.02: Idempotency | S3 writes via tmp key + copy, DROP PARTITION + INSERT for ClickHouse partitions |
| 17.05: Backfill-safety | data_interval_start/end (not datetime.now), deterministic S3 paths |
| 17.07: OL-aware | explicit inlets/outlets, dataset-driven |
| 17.08: Error handling | AirflowFailException, exponential backoff, Listener-based alerting |

Production gotchas in this implementation

Spark job submission through boto3.client('eks') is simplified. In production, use KubernetesPodOperator with a custom SparkApplication CRD template, or SparkKubernetesOperator from apache-airflow-providers-cncf-kubernetes.

The Kafka consumer logic is embedded in the task. In production, move it into a shared library (dags/lib/kafka_consumer.py) and unit test it thoroughly.

S3_BUCKET = "airflow-xcom" is hardcoded. Make it configurable through an environment variable: os.environ.get("AIRFLOW_XCOM_S3_BUCKET").

The Slack notification fires only on success. In production, add the Listener API (module 12) for on_failure and on_retry events with tiered alerting.

There is no SLA and no SLA miss callback. Consider @task(sla=timedelta(minutes=30)) for monitoring data freshness, but check that SLA evaluation applies to your schedule type: in Airflow 2.x it is tied to periodic schedules, so a dataset-triggered DAG may need an external freshness check instead.

The Kafka consumer group_id includes ds but does not handle re-runs. On a re-run for the same date the consumer group has already committed offsets, so it resumes from them and auto.offset.reset=earliest does not apply. Solution: a unique group_id per try_number, or an explicit offset reset.
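
Two of the gotchas above come down to a one-line change each. The sketch below shows one way they might look; `xcom_bucket` and `consumer_group_id` are hypothetical helpers, and the `-try{n}` suffix is just one possible naming scheme.

```python
import os

DEFAULT_XCOM_BUCKET = "airflow-xcom"


def xcom_bucket() -> str:
    """Bucket name from the environment, falling back to the lesson's default."""
    return os.environ.get("AIRFLOW_XCOM_S3_BUCKET", DEFAULT_XCOM_BUCKET)


def consumer_group_id(topic: str, ds: str, try_number: int) -> str:
    """Group id that changes with every attempt, so committed offsets are not reused."""
    return f"airflow-{topic}-{ds}-try{try_number}"


print(xcom_bucket())
print(consumer_group_id("orders.public.orders", "2024-01-01", 2))
# airflow-orders.public.orders-2024-01-01-try2
```

The trade-off of a per-attempt group id is that every attempt re-reads the topic from the earliest retained offset and leaves another consumer group behind on the broker, so an explicit offset reset may scale better for large topics.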


Knowledge check
The capstone uses a custom XCom backend (S3) for large dataframes. What potential problems does this approach have, and how can they be mitigated?
Answer
The S3 custom XCom backend is a powerful tool, but it has tradeoffs.

(1) Performance latency. S3 PUT/GET adds roughly 50-200 ms per XCom operation versus <5 ms for a DB XCom. In a DAG with 50 tasks pushing and pulling XComs, that is on the order of 5-15 s of additional latency. Mitigation: SIZE_THRESHOLD = 4 KB keeps small XComs in the DB and reserves S3 for larger values.

(2) Cost. S3 Standard request pricing is on the order of $0.005 per 1,000 PUT and $0.0004 per 1,000 GET requests (check current pricing for your region), so even 1M writes plus 1M reads per month cost only a few dollars. Storage cost is separate and accumulates as pickled objects pile up. Mitigation: an S3 Lifecycle policy that deletes objects after 14 days.

(3) Pickle deserialization security risk. pickle can execute arbitrary code on deserialize. If an attacker can write to the XCom bucket, RCE is possible. Mitigation: bucket policies that restrict writes to the Airflow service account, S3 versioning, audit logging.

(4) Cleanup gaps. When Airflow deletes a TaskInstance, the S3 object is orphaned. Mitigation: a lifecycle rule (delete after 14 days), a separate cleanup DAG (an "orphan XCom remover"), or cleanup triggered by S3 events.

(5) No transactional consistency. If the task fails between the S3 put and the DB write, the S3 object is orphaned. Mitigation: keep the write order (S3 put first, then the DB reference) so the DB never points at a missing object. Because keys are random UUIDs, a retry writes a new object and the lifecycle rule removes the stale one.

(6) Harder debugging. The XCom value in the Airflow UI shows the reference `{_s3_xcom: True, key: '...'}` rather than the actual value. Mitigation: a custom UI plugin for previews, or overriding orm_deserialize_value in the backend to control what the UI displays.

(7) Cross-region S3. Workers in us-east with a bucket in us-west means 100+ ms latency. Mitigation: keep the bucket in the same region.

(8) Migration path to 3.x. Custom XCom backends remain a supported extension point in 3.x, but verify import paths and method signatures against the 3.x upgrade notes before migrating.

Alternatives: a JSON XCom backend (JSON file on disk) for very small clusters; a Snowflake/BigQuery XCom backend for cloud DW shops.

Production trade-off: the S3 XCom backend removes the DB size limit but adds latency and operational complexity. It is recommended only if you actually need XComs larger than about 48 KB (large pandas dataframes). For most workloads, the default DB XCom is sufficient.
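
The request-cost argument is easy to check with a few lines of arithmetic. The prices below are assumptions (approximate S3 Standard list prices of $0.005 per 1,000 PUT and $0.0004 per 1,000 GET requests); substitute the current figures for your region. `monthly_request_cost` is a hypothetical helper.

```python
# Assumed inputs; replace with current pricing for your region
PUT_PRICE_PER_1000 = 0.005   # USD, assumed S3 Standard PUT price
GET_PRICE_PER_1000 = 0.0004  # USD, assumed S3 Standard GET price


def monthly_request_cost(puts: int, gets: int) -> float:
    """Request cost in USD for the given number of S3 operations."""
    return puts / 1000 * PUT_PRICE_PER_1000 + gets / 1000 * GET_PRICE_PER_1000


cost = monthly_request_cost(puts=1_000_000, gets=1_000_000)
print(f"{cost:.2f}")  # 5.40
```

Under these assumptions, writes dominate the bill, and even a million XCom round-trips per month stay in single-digit dollars before storage is counted.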
