Custom XCom Backend — S3/GCS offload
Это самая практически важная тема модуля. Если в production вы передаёте через XCom что-то крупнее 1KB pandas/numpy/Parquet — нужен Custom XCom Backend. Это не optional optimization, это must-have для любого data engineering workflow выше “Hello World”.
В этом уроке мы разберём механизм Custom Backend от первого принципа, напишем полнофункциональный S3 backend, обсудим reference vs full storage patterns и security implications.
Зачем нужен Custom Backend
Default backend хранит XCom value прямо в metadata DB. Это:
- Удобно (no extra infra)
- Дёшево для маленьких значений
- Катастрофа для большого payload
Custom Backend позволяет переместить storage в специализированное место (S3/GCS/Redis/любое), а в DB хранить только pointer/metadata. Преимущества:
| Aspect | Default (DB) | Custom (S3) |
|---|---|---|
| Max size | ~48KB practical | TB (object storage limits) |
| Cost | DB storage (expensive) | S3 storage (cheap) |
| Read perf large | Bad (ORM hydration) | Good (streaming) |
| DB load | High XCom IO | Low — только metadata |
| Format | JSON/pickle bytes | Custom (Parquet, Arrow, etc) |
| Lifecycle | Manual cleanup | S3 lifecycle policies |
BaseXCom contract
Custom Backend — это subclass airflow.models.xcom.BaseXCom. Override два static method:
from airflow.models.xcom import BaseXCom
from typing import Any
class MyXComBackend(BaseXCom):
@staticmethod
def serialize_value(
value: Any,
*,
key: str | None = None,
task_id: str | None = None,
dag_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
) -> bytes:
"""Convert Python value → bytes для хранения в xcom.value column."""
...
@staticmethod
def deserialize_value(result) -> Any:
"""Convert XCom DB row → Python value."""
...
Что важно:
serialize_valueполучает Python object + metadata о task → возвращаетbytes(хранится в DB)deserialize_valueполучает SQLAlchemyXComrow → возвращает Python object- Метаданные (
task_id,dag_id,run_id,map_index) полезны для строения S3 path
Pattern 1: Full storage в S3 (reference в DB)
Самый common pattern: actual value хранится в S3, в DB колонка value содержит S3 URI (или JSON с metadata).
Full code: S3XComBackend
Production-grade пример:
# plugins/s3_xcom_backend.py
import json
import io
import uuid
from typing import Any
import logging
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
log = logging.getLogger(__name__)
S3_CONN_ID = "aws_default"
S3_BUCKET = "airflow-xcom-storage"
SIZE_THRESHOLD = 32 * 1024 # 32KB — ниже храним в DB как JSON
class S3XComBackend(BaseXCom):
"""
Custom XCom backend для offload больших значений в S3.
Стратегия:
- Малые (< 32KB) JSON-serializable → хранить в DB (как default)
- Большие или сложные типы (pandas, numpy) → Parquet в S3, ref в DB
"""
PREFIX = "s3_xcom://"
@staticmethod
def _build_s3_key(dag_id, task_id, run_id, map_index, key):
# Детерминированный путь (без uuid) для idempotent rerun
safe_run = run_id.replace(":", "_").replace("+", "_")
return (
f"xcom/{dag_id}/{safe_run}/"
f"{task_id}/map_{map_index if map_index is not None else -1}/"
f"{key}.parquet"
)
@staticmethod
def serialize_value(
value: Any,
*,
key: str | None = None,
task_id: str | None = None,
dag_id: str | None = None,
run_id: str | None = None,
map_index: int | None = None,
) -> bytes:
# Case 1: pandas DataFrame → Parquet → S3
if isinstance(value, pd.DataFrame):
s3_key = S3XComBackend._build_s3_key(dag_id, task_id, run_id, map_index, key)
buffer = io.BytesIO()
table = pa.Table.from_pandas(value)
pq.write_table(table, buffer, compression="snappy")
buffer.seek(0)
hook = S3Hook(aws_conn_id=S3_CONN_ID)
hook.load_bytes(
bytes_data=buffer.getvalue(),
key=s3_key,
bucket_name=S3_BUCKET,
replace=True,
)
ref = {
"_xcom_backend": "s3",
"_xcom_type": "pandas.DataFrame",
"uri": f"s3://{S3_BUCKET}/{s3_key}",
"rows": len(value),
"size_bytes": len(buffer.getvalue()),
}
log.info("XCom S3 upload: %s (rows=%d, size=%d)", s3_key, ref["rows"], ref["size_bytes"])
return json.dumps(ref).encode("utf-8")
# Case 2: try default JSON serialization
try:
serialized = BaseXCom.serialize_value(value, key=key, task_id=task_id, dag_id=dag_id, run_id=run_id, map_index=map_index)
if len(serialized) <= SIZE_THRESHOLD:
# Маленькое → как обычно в DB
return serialized
# Большое JSON → тоже в S3
s3_key = S3XComBackend._build_s3_key(dag_id, task_id, run_id, map_index, key) + ".json"
hook = S3Hook(aws_conn_id=S3_CONN_ID)
hook.load_bytes(bytes_data=serialized, key=s3_key, bucket_name=S3_BUCKET, replace=True)
ref = {
"_xcom_backend": "s3",
"_xcom_type": "json",
"uri": f"s3://{S3_BUCKET}/{s3_key}",
"size_bytes": len(serialized),
}
return json.dumps(ref).encode("utf-8")
except (TypeError, ValueError) as e:
raise TypeError(
f"Cannot serialize {type(value)} via S3XComBackend. "
f"Add explicit handler or use supported type. Original: {e}"
)
@staticmethod
def deserialize_value(result) -> Any:
raw_bytes = result.value
if not raw_bytes:
return None
# Попробовать как ref JSON
try:
obj = json.loads(raw_bytes.decode("utf-8"))
except (json.JSONDecodeError, UnicodeDecodeError):
# Не наш формат — fallback на default
return BaseXCom.deserialize_value(result)
if isinstance(obj, dict) and obj.get("_xcom_backend") == "s3":
uri = obj["uri"]
xcom_type = obj["_xcom_type"]
bucket, key = uri.replace("s3://", "").split("/", 1)
hook = S3Hook(aws_conn_id=S3_CONN_ID)
if xcom_type == "pandas.DataFrame":
data = hook.read_key(key, bucket_name=bucket)
buffer = io.BytesIO(data.encode() if isinstance(data, str) else data)
table = pq.read_table(buffer)
return table.to_pandas()
elif xcom_type == "json":
data = hook.read_key(key, bucket_name=bucket)
return json.loads(data)
else:
raise ValueError(f"Unknown _xcom_type: {xcom_type}")
# Это обычный JSON value (был < SIZE_THRESHOLD)
return obj
Configuration
Чтобы Airflow использовал ваш backend:
# airflow.cfg
[core]
xcom_backend = plugins.s3_xcom_backend.S3XComBackend
Или через env variable:
export AIRFLOW__CORE__XCOM_BACKEND=plugins.s3_xcom_backend.S3XComBackend
После перезапуска scheduler+webserver+workers — все XCom операции идут через ваш backend.
Backend class должен быть importable во ВСЕХ компонентах Airflow: scheduler, webserver, workers, triggerer. Если plugin находится в plugins/, убедитесь что plugins/ в PYTHONPATH. В Helm chart — добавьте в values для всех components.
Pattern 2: Reference-only (no DB blob)
Альтернативный pattern — хранить в DB только S3 reference, без копирования small values:
@staticmethod
def serialize_value(value, **meta):
# Всегда в S3, даже маленькие
s3_key = S3XComBackend._build_s3_key(**meta)
hook = S3Hook(aws_conn_id=S3_CONN_ID)
hook.load_string(string_data=json.dumps(value), key=s3_key, bucket_name=S3_BUCKET)
return f"s3://{S3_BUCKET}/{s3_key}".encode()
Pros:
- Uniform behavior
- DB совсем маленькая
Cons:
- S3 latency на каждый pull (~50-200ms)
- Cost S3 PUT/GET requests
- При S3 outage — все XCom unreadable
Hybrid (recommended): small → DB, large/complex → S3. См. код выше с SIZE_THRESHOLD.
Pattern 3: Specialized format
Для pandas/numpy/Arrow — нет смысла JSON. Используйте native format:
| Type | Format | Why |
|---|---|---|
pandas.DataFrame | Parquet (snappy/zstd) | Columnar, типы preserved, compression |
numpy.ndarray | npy binary | Native, fast |
pyarrow.Table | Arrow IPC stream | Zero-copy в downstream |
dict/list < 1MB | JSON | Простой, debug-friendly |
Polars DataFrame | Parquet или IPC | Zero-copy Arrow |
Это даёт 5-50× меньше bytes на S3 vs JSON, и быстрее sed/des.
Security implications
Custom Backend — это код в pipeline trust chain. Несколько важных аспектов:
1. Service account permissions
S3 bucket airflow-xcom-storage должен быть доступен только Airflow workers/scheduler. Никаких public reads, никаких cross-account без явных policies.
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"AWS": "arn:aws:iam::123456789012:role/airflow-worker"},
"Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
"Resource": "arn:aws:s3:::airflow-xcom-storage/*"
}]
}
2. Encryption at rest
hook.load_bytes(
bytes_data=...,
key=s3_key,
bucket_name=S3_BUCKET,
encrypt=True, # SSE-S3 minimum
)
Для sensitive data — SSE-KMS с per-DAG KMS key.
3. Reference integrity
Простое JSON ref {"uri": "s3://..."} уязвимо к path traversal, если кто-то может modify DB:
# Атакующий вставляет:
{"_xcom_backend": "s3", "uri": "s3://airflow-xcom/../secrets-bucket/key"}
Mitigation: validate URI начинается с expected prefix:
def deserialize_value(result):
obj = json.loads(result.value)
uri = obj["uri"]
if not uri.startswith(f"s3://{S3_BUCKET}/xcom/"):
raise ValueError("Invalid XCom S3 path")
4. PII leakage
XCom может содержать PII (user emails, IDs). S3 bucket должен иметь:
- Lifecycle policy — auto-delete после 90 дней
- CloudTrail logging
- Backup encryption
Reference vs full storage trade-off
Альтернативный подход: вместо положить data в S3, pass URI as XCom value, а task сам читает.
# Без Custom Backend, ручной reference passing:
@task
def producer() -> str: # ← возвращает только URI
df = generate_huge_df()
uri = f"s3://staging/output/{run_id}.parquet"
df.to_parquet(uri)
return uri
@task
def consumer(input_uri: str):
df = pd.read_parquet(input_uri)
...
Pros vs Custom Backend:
- Явный data lifecycle (видно в коде)
- Нет magic — debugging easier
- Task code знает о storage layer (можно tune compression, partitioning)
Cons:
- Boilerplate в каждом DAG
- Lifecycle management ручной
- Нет uniform pattern across team
Когда что:
- Если у вас много больших XCom в разных DAGs → Custom Backend (DRY)
- Если pipelines specifically работают с files/datasets → reference passing (явность лучше)
Production gotchas
Gotcha 1: backend в plugin не в PYTHONPATH workers
Класс должен импортироваться identically в scheduler, webserver, workers. На K8s часто scheduler видит plugins/, но worker pod — нет.
Fix: embed backend в airflow-extensions Python package, install via pip install в Dockerfile workers.
Gotcha 2: S3 outage = DAG-стоп
Если S3 down, все XCom операции fail. Workflow:
- Producer fail → retry → fail → DAG run failed
- Consumer не может pull → DAG run failed
- Backfill blocked
Mitigation: для critical DAGs — fallback на DB blob если S3 недоступен > N retries.
Gotcha 3: S3 cost storm
Каждый XCom = S3 PUT request. 10k DAG-ов × 10 tasks × 1 push/run × 24 runs/day = 2.4M PUT/day = $12/day только requests, плюс GB storage.
Mitigation: SIZE_THRESHOLD — small XCom в DB, S3 только для large. Lifecycle policy delete после 30 дней.
Gotcha 4: serialize_value на mapped tasks
process.expand(file=get_files()) # 100 mapped TI
Каждый из 100 TI пишет свой XCom в S3. S3 path должен быть unique per map_index. В нашем backend это уже сделано через _build_s3_key с map_index. Если забыли — все 100 пишут в один S3 key, ovewriting.
Gotcha 5: webserver не может deserialize
Webserver UI показывает XCom value в task details. Если backend требует S3 access — webserver pod нужны те же IAM permissions, что и worker. Иначе UI показывает error.
Hands-on: тестирование backend
# tests/test_s3_xcom_backend.py
import pytest
import pandas as pd
from unittest.mock import MagicMock, patch
from plugins.s3_xcom_backend import S3XComBackend
@patch("plugins.s3_xcom_backend.S3Hook")
def test_serialize_pandas_df(mock_hook):
df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
result = S3XComBackend.serialize_value(
df,
key="return_value"
task_id="producer"
dag_id="test_dag"
run_id="2026-05-12T00:00:00"
map_index=None,
)
# Should produce JSON ref
import json
ref = json.loads(result)
assert ref["_xcom_backend"] == "s3"
assert ref["_xcom_type"] == "pandas.DataFrame"
assert ref["rows"] == 3
mock_hook.return_value.load_bytes.assert_called_once()
def test_serialize_small_dict_stays_in_db():
value = {"small": "value"}
result = S3XComBackend.serialize_value(value, key="k", task_id="t", dag_id="d", run_id="r", map_index=None)
# Should NOT be S3 ref (below threshold)
assert b"_xcom_backend" not in result