Learning Platform
Глоссарий Troubleshooting
Урок 07.04 · 35 мин
Продвинутый
XComCustom BackendS3GCSBaseXCom

Custom XCom Backend — S3/GCS offload

Это самая практически важная тема модуля. Если в production вы передаёте через XCom что-то крупнее 1KB pandas/numpy/Parquet — нужен Custom XCom Backend. Это не optional optimization, это must-have для любого data engineering workflow выше “Hello World”.

В этом уроке мы разберём механизм Custom Backend от первого принципа, напишем полнофункциональный S3 backend, обсудим reference vs full storage patterns и security implications.


Зачем нужен Custom Backend

Default backend хранит XCom value прямо в metadata DB. Это:

  • Удобно (no extra infra)
  • Дёшево для маленьких значений
  • Катастрофа для большого payload

Custom Backend позволяет переместить storage в специализированное место (S3/GCS/Redis/любое), а в DB хранить только pointer/metadata. Преимущества:

AspectDefault (DB)Custom (S3)
Max size~48KB practicalTB (object storage limits)
CostDB storage (expensive)S3 storage (cheap)
Read perf largeBad (ORM hydration)Good (streaming)
DB loadHigh XCom IOLow — только metadata
FormatJSON/pickle bytesCustom (Parquet, Arrow, etc)
LifecycleManual cleanupS3 lifecycle policies

BaseXCom contract

Custom Backend — это subclass airflow.models.xcom.BaseXCom. Override два static method:

from airflow.models.xcom import BaseXCom
from typing import Any


class MyXComBackend(BaseXCom):
    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes:
        """Convert Python value → bytes для хранения в xcom.value column."""
        ...

    @staticmethod
    def deserialize_value(result) -> Any:
        """Convert XCom DB row → Python value."""
        ...

Что важно:

  • serialize_value получает Python object + metadata о task → возвращает bytes (хранится в DB)
  • deserialize_value получает SQLAlchemy XCom row → возвращает Python object
  • Метаданные (task_id, dag_id, run_id, map_index) полезны для строения S3 path

Pattern 1: Full storage в S3 (reference в DB)

Самый common pattern: actual value хранится в S3, в DB колонка value содержит S3 URI (или JSON с metadata).

Custom Backend: full storage в S3
Worker.execute(producer)Producer task возвращает большой объект, например pandas DataFrame 50MB. _PythonDecoratedOperator вызывает xcom_push.
serialize_value(value, **meta)
S3 PUT s3://airflow-xcom/dag/task/run/keyCustom Backend сериализует DataFrame (например в Parquet bytes через pyarrow), кладёт в S3 по детерминированному ключу. Возвращает в bytes — JSON ref {'_xcom_s3': 's3://...'}.
return small reference
INSERT xcom (value = ref bytes)В DB записывается только S3 URI (200 bytes). Сама таблица xcom остаётся компактной — никакой нагрузки на DB.
downstream consumer xcom_pull
SELECT xcom — get refWorker делает SELECT FROM xcom — получает 200 bytes ref. Быстро, без нагрузки на DB.
deserialize_value(ref)
S3 GET → Parquet → DataFrameCustom Backend читает ref, делает S3 GET, deserializes Parquet bytes в pandas DataFrame. Streaming — память не raised до конца чтения.

Full code: S3XComBackend

Production-grade пример:

# plugins/s3_xcom_backend.py
import json
import io
import uuid
from typing import Any
import logging

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

log = logging.getLogger(__name__)

S3_CONN_ID = "aws_default"
S3_BUCKET = "airflow-xcom-storage"
SIZE_THRESHOLD = 32 * 1024  # 32KB — ниже храним в DB как JSON


class S3XComBackend(BaseXCom):
    """
    Custom XCom backend для offload больших значений в S3.

    Стратегия:
    - Малые (< 32KB) JSON-serializable → хранить в DB (как default)
    - Большие или сложные типы (pandas, numpy) → Parquet в S3, ref в DB
    """

    PREFIX = "s3_xcom://"

    @staticmethod
    def _build_s3_key(dag_id, task_id, run_id, map_index, key):
        # Детерминированный путь (без uuid) для idempotent rerun
        safe_run = run_id.replace(":", "_").replace("+", "_")
        return (
            f"xcom/{dag_id}/{safe_run}/"
            f"{task_id}/map_{map_index if map_index is not None else -1}/"
            f"{key}.parquet"
        )

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes:
        # Case 1: pandas DataFrame → Parquet → S3
        if isinstance(value, pd.DataFrame):
            s3_key = S3XComBackend._build_s3_key(dag_id, task_id, run_id, map_index, key)
            buffer = io.BytesIO()
            table = pa.Table.from_pandas(value)
            pq.write_table(table, buffer, compression="snappy")
            buffer.seek(0)

            hook = S3Hook(aws_conn_id=S3_CONN_ID)
            hook.load_bytes(
                bytes_data=buffer.getvalue(),
                key=s3_key,
                bucket_name=S3_BUCKET,
                replace=True,
            )

            ref = {
                "_xcom_backend": "s3",
                "_xcom_type": "pandas.DataFrame",
                "uri": f"s3://{S3_BUCKET}/{s3_key}",
                "rows": len(value),
                "size_bytes": len(buffer.getvalue()),
            }
            log.info("XCom S3 upload: %s (rows=%d, size=%d)", s3_key, ref["rows"], ref["size_bytes"])
            return json.dumps(ref).encode("utf-8")

        # Case 2: try default JSON serialization
        try:
            serialized = BaseXCom.serialize_value(value, key=key, task_id=task_id, dag_id=dag_id, run_id=run_id, map_index=map_index)
            if len(serialized) <= SIZE_THRESHOLD:
                # Маленькое → как обычно в DB
                return serialized
            # Большое JSON → тоже в S3
            s3_key = S3XComBackend._build_s3_key(dag_id, task_id, run_id, map_index, key) + ".json"
            hook = S3Hook(aws_conn_id=S3_CONN_ID)
            hook.load_bytes(bytes_data=serialized, key=s3_key, bucket_name=S3_BUCKET, replace=True)
            ref = {
                "_xcom_backend": "s3",
                "_xcom_type": "json",
                "uri": f"s3://{S3_BUCKET}/{s3_key}",
                "size_bytes": len(serialized),
            }
            return json.dumps(ref).encode("utf-8")
        except (TypeError, ValueError) as e:
            raise TypeError(
                f"Cannot serialize {type(value)} via S3XComBackend. "
                f"Add explicit handler or use supported type. Original: {e}"
            )

    @staticmethod
    def deserialize_value(result) -> Any:
        raw_bytes = result.value
        if not raw_bytes:
            return None

        # Попробовать как ref JSON
        try:
            obj = json.loads(raw_bytes.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Не наш формат — fallback на default
            return BaseXCom.deserialize_value(result)

        if isinstance(obj, dict) and obj.get("_xcom_backend") == "s3":
            uri = obj["uri"]
            xcom_type = obj["_xcom_type"]
            bucket, key = uri.replace("s3://", "").split("/", 1)
            hook = S3Hook(aws_conn_id=S3_CONN_ID)

            if xcom_type == "pandas.DataFrame":
                data = hook.read_key(key, bucket_name=bucket)
                buffer = io.BytesIO(data.encode() if isinstance(data, str) else data)
                table = pq.read_table(buffer)
                return table.to_pandas()
            elif xcom_type == "json":
                data = hook.read_key(key, bucket_name=bucket)
                return json.loads(data)
            else:
                raise ValueError(f"Unknown _xcom_type: {xcom_type}")

        # Это обычный JSON value (был < SIZE_THRESHOLD)
        return obj

Configuration

Чтобы Airflow использовал ваш backend:

# airflow.cfg
[core]
xcom_backend = plugins.s3_xcom_backend.S3XComBackend

Или через env variable:

export AIRFLOW__CORE__XCOM_BACKEND=plugins.s3_xcom_backend.S3XComBackend

После перезапуска scheduler+webserver+workers — все XCom операции идут через ваш backend.

WARNING

Backend class должен быть importable во ВСЕХ компонентах Airflow: scheduler, webserver, workers, triggerer. Если plugin находится в plugins/, убедитесь что plugins/ в PYTHONPATH. В Helm chart — добавьте в values для всех components.


Pattern 2: Reference-only (no DB blob)

Альтернативный pattern — хранить в DB только S3 reference, без копирования small values:

@staticmethod
def serialize_value(value, **meta):
    # Всегда в S3, даже маленькие
    s3_key = S3XComBackend._build_s3_key(**meta)
    hook = S3Hook(aws_conn_id=S3_CONN_ID)
    hook.load_string(string_data=json.dumps(value), key=s3_key, bucket_name=S3_BUCKET)
    return f"s3://{S3_BUCKET}/{s3_key}".encode()

Pros:

  • Uniform behavior
  • DB совсем маленькая

Cons:

  • S3 latency на каждый pull (~50-200ms)
  • Cost S3 PUT/GET requests
  • При S3 outage — все XCom unreadable

Hybrid (recommended): small → DB, large/complex → S3. См. код выше с SIZE_THRESHOLD.


Pattern 3: Specialized format

Для pandas/numpy/Arrow — нет смысла JSON. Используйте native format:

TypeFormatWhy
pandas.DataFrameParquet (snappy/zstd)Columnar, типы preserved, compression
numpy.ndarraynpy binaryNative, fast
pyarrow.TableArrow IPC streamZero-copy в downstream
dict/list < 1MBJSONПростой, debug-friendly
Polars DataFrameParquet или IPCZero-copy Arrow

Это даёт 5-50× меньше bytes на S3 vs JSON, и быстрее sed/des.


Security implications

Custom Backend — это код в pipeline trust chain. Несколько важных аспектов:

1. Service account permissions

S3 bucket airflow-xcom-storage должен быть доступен только Airflow workers/scheduler. Никаких public reads, никаких cross-account без явных policies.

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"AWS": "arn:aws:iam::123456789012:role/airflow-worker"},
    "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
    "Resource": "arn:aws:s3:::airflow-xcom-storage/*"
  }]
}

2. Encryption at rest

hook.load_bytes(
    bytes_data=...,
    key=s3_key,
    bucket_name=S3_BUCKET,
    encrypt=True,  # SSE-S3 minimum
)

Для sensitive data — SSE-KMS с per-DAG KMS key.

3. Reference integrity

Простое JSON ref {"uri": "s3://..."} уязвимо к path traversal, если кто-то может modify DB:

# Атакующий вставляет:
{"_xcom_backend": "s3", "uri": "s3://airflow-xcom/../secrets-bucket/key"}

Mitigation: validate URI начинается с expected prefix:

def deserialize_value(result):
    obj = json.loads(result.value)
    uri = obj["uri"]
    if not uri.startswith(f"s3://{S3_BUCKET}/xcom/"):
        raise ValueError("Invalid XCom S3 path")

4. PII leakage

XCom может содержать PII (user emails, IDs). S3 bucket должен иметь:

  • Lifecycle policy — auto-delete после 90 дней
  • CloudTrail logging
  • Backup encryption

Reference vs full storage trade-off

Альтернативный подход: вместо положить data в S3, pass URI as XCom value, а task сам читает.

# Без Custom Backend, ручной reference passing:
@task
def producer() -> str:  # ← возвращает только URI
    df = generate_huge_df()
    uri = f"s3://staging/output/{run_id}.parquet"
    df.to_parquet(uri)
    return uri

@task
def consumer(input_uri: str):
    df = pd.read_parquet(input_uri)
    ...

Pros vs Custom Backend:

  • Явный data lifecycle (видно в коде)
  • Нет magic — debugging easier
  • Task code знает о storage layer (можно tune compression, partitioning)

Cons:

  • Boilerplate в каждом DAG
  • Lifecycle management ручной
  • Нет uniform pattern across team

Когда что:

  • Если у вас много больших XCom в разных DAGs → Custom Backend (DRY)
  • Если pipelines specifically работают с files/datasets → reference passing (явность лучше)

Production gotchas

Gotcha 1: backend в plugin не в PYTHONPATH workers

Класс должен импортироваться identically в scheduler, webserver, workers. На K8s часто scheduler видит plugins/, но worker pod — нет.

Fix: embed backend в airflow-extensions Python package, install via pip install в Dockerfile workers.

Gotcha 2: S3 outage = DAG-стоп

Если S3 down, все XCom операции fail. Workflow:

  • Producer fail → retry → fail → DAG run failed
  • Consumer не может pull → DAG run failed
  • Backfill blocked

Mitigation: для critical DAGs — fallback на DB blob если S3 недоступен > N retries.

Gotcha 3: S3 cost storm

Каждый XCom = S3 PUT request. 10k DAG-ов × 10 tasks × 1 push/run × 24 runs/day = 2.4M PUT/day = $12/day только requests, плюс GB storage.

Mitigation: SIZE_THRESHOLD — small XCom в DB, S3 только для large. Lifecycle policy delete после 30 дней.

Gotcha 4: serialize_value на mapped tasks

process.expand(file=get_files())  # 100 mapped TI

Каждый из 100 TI пишет свой XCom в S3. S3 path должен быть unique per map_index. В нашем backend это уже сделано через _build_s3_key с map_index. Если забыли — все 100 пишут в один S3 key, ovewriting.

Gotcha 5: webserver не может deserialize

Webserver UI показывает XCom value в task details. Если backend требует S3 access — webserver pod нужны те же IAM permissions, что и worker. Иначе UI показывает error.


Hands-on: тестирование backend

# tests/test_s3_xcom_backend.py
import pytest
import pandas as pd
from unittest.mock import MagicMock, patch
from plugins.s3_xcom_backend import S3XComBackend


@patch("plugins.s3_xcom_backend.S3Hook")
def test_serialize_pandas_df(mock_hook):
    df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    result = S3XComBackend.serialize_value(
        df,
        key="return_value"
        task_id="producer"
        dag_id="test_dag"
        run_id="2026-05-12T00:00:00"
        map_index=None,
    )
    # Should produce JSON ref
    import json
    ref = json.loads(result)
    assert ref["_xcom_backend"] == "s3"
    assert ref["_xcom_type"] == "pandas.DataFrame"
    assert ref["rows"] == 3
    mock_hook.return_value.load_bytes.assert_called_once()


def test_serialize_small_dict_stays_in_db():
    value = {"small": "value"}
    result = S3XComBackend.serialize_value(value, key="k", task_id="t", dag_id="d", run_id="r", map_index=None)
    # Should NOT be S3 ref (below threshold)
    assert b"_xcom_backend" not in result

Проверка знанийKnowledge check
Опиши hybrid Custom XCom Backend: какие values хранятся в DB, какие в S3, почему именно так? Какие 3 main failure mode и mitigation?
ОтветAnswer
Hybrid backend: малые JSON-serializable (< 32KB, например status dicts, small lists) хранятся прямо в DB (как default XCom) — это zero S3 overhead, fast, low cost. Большие или сложные типы (pandas DataFrame, numpy arrays, > 32KB JSON) идут в S3 как Parquet/native format, в DB остаётся только JSON reference {'uri': 's3://...', 'type': 'pandas'} ~200 bytes. Почему: best of both worlds — DB не раздувается на больших payload, S3 cost минимальный (не каждый XCom туда), latency маленьких pulls остаётся fast (no S3 hit). **3 failure modes**: (1) **S3 outage**: все large XCom unreadable, DAGs stuck. Mitigation: retry с exponential backoff, fallback на error state с явным alerting. (2) **Plugin import failure**: backend класс должен быть в PYTHONPATH ВСЕХ компонентов (scheduler, webserver, workers, triggerer). Mitigation: embed в Python package, install через pip в Dockerfile, не полагаться на plugins/ folder. (3) **Path traversal/PII leak**: атакующий с DB write access может inject malicious S3 URI. Mitigation: validate URI prefix в deserialize_value, IAM scope-down (worker может писать только под xcom/<dag_id>/...), SSE-KMS encryption, S3 lifecycle policy для auto-delete.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Custom XCom Backend — какой contract должен implement класс?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6