Object Storage XCom — готовый backend через fsspec
В Airflow 2.8 community добавил готовый Custom XCom Backend на основе Object Storage abstraction через провайдер apache-airflow-providers-common-io. Это spares вас от написания собственного backend (см. урок 04) — вы получаете S3/GCS/Azure/local FS unified через fsspec, с конфигурацией одной env переменной.
В этом уроке разберём XComObjectStorageBackend: что внутри, как настраивать, чем отличается от custom backend, performance и production considerations.
Что такое common.io provider
apache-airflow-providers-common-io — это пакет, который добавляет Object Storage abstraction в Airflow. Под капотом использует fsspec + специализированные filesystem implementations (s3fs, gcsfs, adlfs).
Установка:
pip install apache-airflow-providers-common-io
# Для S3:
pip install s3fs
# Для GCS:
pip install gcsfs
# Для Azure:
pip install adlfs
После установки доступен класс:
from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend
Архитектура: fsspec уровень
Это значит один backend класс работает с любым storage. Хотите switch с S3 на GCS — измените одну env variable.
Колоночное vs построчное хранение: ментальная модельConfiguration
Два конфиг параметра:
# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
# URL формат: <scheme>://<conn_id>@<bucket>/<prefix>
xcom_objectstorage_path = s3://aws_default@airflow-xcom-bucket/xcom
# Опционально: размер threshold (bytes) — больше идёт в Object Storage
xcom_objectstorage_threshold = -1 # -1 = всегда в Object Storage; 1024 = >1KB
xcom_objectstorage_compression = zstd # gzip, zstd, lz4, snappy, или пусто
Через env vars:
export AIRFLOW__CORE__XCOM_BACKEND=airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=s3://aws_default@airflow-xcom-bucket/xcom
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD=1024
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION=zstd
URL format detailed
s3://aws_default@airflow-xcom-bucket/xcom
│ │ │ │
│ │ │ └─ key prefix (folder)
│ │ └─ S3 bucket
│ └─ Airflow Connection ID (для creds)
└─ scheme (s3, gs, abfs, file)
Connection aws_default — обычная Airflow Connection с типом AWS. Здесь живут creds (access key, secret, или IAM role). fsspec auto-loads их из connection.
Эквивалентные:
s3://aws_default@bucket/prefix— Airflow Connections3://bucket/prefix— fallback на default boto3 chain (env, IAM role, ~/.aws/credentials)
Threshold pattern
Параметр xcom_objectstorage_threshold контролирует hybrid behavior:
| Threshold | Behavior |
|---|---|
-1 (default) | Всегда в Object Storage, никогда в DB |
0 | Любой XCom > 0 bytes — в Object Storage (всё) |
1024 | XCom < 1KB — в DB, >= 1KB — в Object Storage |
48 * 1024 | ”Like default DB” — < 48KB в DB, иначе в S3 |
Production recommendation: xcom_objectstorage_threshold = 1024 или 4096 — small JSON status в DB (fast pull), large data в Object Storage.
Code: что внутри backend
Упрощённая версия (реальная в airflow.providers.common.io.xcom.backend):
from airflow.io.path import ObjectStoragePath
from airflow.models.xcom import BaseXCom
from airflow.configuration import conf
import json
import uuid
class XComObjectStorageBackend(BaseXCom):
@staticmethod
def _get_path() -> ObjectStoragePath:
path_str = conf.get("common.io", "xcom_objectstorage_path")
return ObjectStoragePath(path_str)
@staticmethod
def _get_threshold() -> int:
return conf.getint("common.io", "xcom_objectstorage_threshold", fallback=-1)
@staticmethod
def _get_compression() -> str:
return conf.get("common.io", "xcom_objectstorage_compression", fallback="")
@staticmethod
def serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None, map_index=None):
# Default JSON serialization first
s = BaseXCom.serialize_value(value, key=key, task_id=task_id, dag_id=dag_id, run_id=run_id, map_index=map_index)
threshold = XComObjectStorageBackend._get_threshold()
if threshold >= 0 and len(s) < threshold:
# Малое — в DB как обычно
return s
# Большое — в Object Storage
base_path = XComObjectStorageBackend._get_path()
compression = XComObjectStorageBackend._get_compression()
# Уникальный путь
ext = f".{compression}" if compression else ""
file_path = base_path / dag_id / run_id / task_id / f"{key}_{map_index or -1}_{uuid.uuid4()}{ext}"
# Запись с compression
if compression:
import zstandard, gzip # и т.д.
compressed = compress(s, compression)
with file_path.open("wb") as f:
f.write(compressed)
else:
with file_path.open("wb") as f:
f.write(s)
# В DB сохраняется только URI
return BaseXCom.serialize_value({"uri": str(file_path)}).encode()
@staticmethod
def deserialize_value(result):
raw = BaseXCom.deserialize_value(result)
# Если это dict с uri — fetch из Object Storage
if isinstance(raw, dict) and "uri" in raw and len(raw) == 1:
file_path = ObjectStoragePath(raw["uri"])
with file_path.open("rb") as f:
data = f.read()
# Decompress если нужно
if file_path.suffix in {".gzip", ".gz", ".zstd", ".zst", ".lz4", ".snappy"}:
data = decompress(data, file_path.suffix.lstrip("."))
return json.loads(data.decode())
return raw
Это простой, универсальный backend — работает с любыми JSON-serializable values.
XComObjectStorageBackend vs custom backend
| Aspect | XComObjectStorageBackend | Custom (урок 04) |
|---|---|---|
| Setup | One env var | Написать класс, deploy |
| Multi-cloud | Yes (fsspec) | Manual |
| Format | JSON (with compression) | Любой (Parquet, Arrow, etc) |
| Threshold logic | Built-in (config) | Manual |
| pandas DataFrame | Через JSON (slow, lossy) | Native Parquet |
| Maintenance | Maintained by Airflow community | Ваша ответственность |
| Compression | gzip/zstd/lz4/snappy | Manual |
Когда использовать готовый:
- Нужен generic backend для JSON-safe Python objects
- Не хочется maintain код
- Multi-cloud
- В команде нет deep Airflow expertise
Когда писать свой:
- Pandas/numpy/Arrow large objects — нужны native formats для perf
- Custom serialization logic (encryption, signing)
- Specific storage layout (например, Hive partitioning)
- Custom security (per-tenant KMS keys)
Performance
Benchmarks (synthetic, single worker, network local):
| Payload | Default DB (Postgres) | XComObjectStorageBackend S3 |
|---|---|---|
| 1KB JSON | 5ms push, 3ms pull | 50ms push, 80ms pull (S3 latency) |
| 100KB JSON | 25ms / 15ms | 80ms / 120ms |
| 10MB JSON | DB chokes — ~5s + replication impact | 500ms / 800ms |
| 100MB JSON | Часто OOM / timeout | 3s / 4s, OK |
| 1GB JSON | Не работает | 30s, не рекомендовано |
Ключевое:
- Small XCom: DB быстрее (no network)
- Medium: ~equivalent
- Large: Object Storage сильно быстрее
- Very large: Object Storage — только вариант
С threshold=1024 получаете best of both: small JSON fast в DB, large в S3.
Compression analysis
[common.io]
xcom_objectstorage_compression = zstd
| Compression | Speed | Ratio (text) | Notes |
|---|---|---|---|
| (none) | fastest | 1x | OK для small |
| gzip | medium | 3-5x | universal, slow на write |
| zstd | fast | 4-7x | recommended general purpose |
| lz4 | fastest compress | 2-3x | low CPU, низкий ratio |
| snappy | very fast | 2-3x | good для column data |
Для JSON XCom — zstd хороший default. Для бинарных blob — lz4.
Multi-cloud example
Один Airflow deployment, разные XCom storage per environment:
# Dev — local FS
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=file:///tmp/airflow-xcom
# Staging — GCS
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=gs://gcp_default@staging-airflow-xcom/xcom
# Prod — S3 prod
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=s3://aws_prod@prod-airflow-xcom/xcom
Никаких изменений в DAG code — backend сам routes в правильное место. Это огромное value для CI/CD.
Production gotchas
Gotcha 1: fsspec dep mismatch
Разные versions s3fs и aiobotocore несовместимы. Часто:
ImportError: cannot import name 'AioConfig' from 'aiobotocore'
Fix: pin versions в requirements.txt:
apache-airflow-providers-common-io==1.4.0
s3fs==2024.6.0
aiobotocore==2.13.0
Test в isolated venv перед deploy.
Gotcha 2: Connection ID не resolved
s3://aws_default@bucket/xcom
Если aws_default connection не существует — fsspec падает в default boto3 chain (env, IAM role). На K8s с IRSA — это work, но на dev может ломать.
Fix: явно define aws_default в Airflow connections даже если пустой (для clarity).
Gotcha 3: cleanup orphan files
XCom в Object Storage не удаляется автоматически при cleanup DagRun (в отличие от DB row). После 6 месяцев — S3 bucket с миллионами orphan blobs.
Fix: S3 lifecycle policy:
{
"Rules": [{
"Status": "Enabled",
"Filter": {"Prefix": "xcom/"},
"Expiration": {"Days": 90}
}]
}
Или DAG для cleanup:
@dag(schedule="@daily")
def xcom_storage_cleanup():
@task
def cleanup():
from airflow.io.path import ObjectStoragePath
from airflow.models.dagrun import DagRun
from sqlalchemy import select
from datetime import datetime, timedelta
cutoff = datetime.utcnow() - timedelta(days=30)
# Список active DagRun
# ... compare с blobs в S3
# ... delete orphans
Gotcha 4: list/glob операции дорогие
Webserver UI делает ls для показа XCom. На S3 это LIST request — может быть slow и costly если много объектов.
Fix: xcom_objectstorage_path должен быть dedicated bucket, не shared. И prefix структура должна быть hierarchical (xcom/<dag_id>/<run_id>/), не flat.
Gotcha 5: threshold confusion
threshold = -1 означает всегда в Object Storage (включая 10-byte status). Если хотите hybrid — установите положительное число (например 1024).
Gotcha 6: backend на mapped tasks
При mapped task process.expand(file=[...]) — каждый mapped TI пишет separate file в Object Storage. uuid в pathwithin backend гарантирует unique. OK для consistency, но если 1000 mapped TI — 1000 S3 PUT.
Migration: default → ObjectStorageBackend
Подход для production:
- Setup: создайте S3 bucket, IAM policy, Airflow Connection
- Install:
pip install apache-airflow-providers-common-io s3fs - Test в dev: один DAG с small XCom — verify works
- Switch threshold conservatively: начните с
48 * 1024(только >= 48KB в S3) — small XCom остаются в DB unchanged - Monitor: pg_size DB xcom table должна перестать расти, S3 cost grows controlled
- Tune: можно опускать threshold (4KB, 1KB) если хотите больше offload
Что НЕ переезжает автоматически: existing XCom в DB остаются там. Новые pushes идут через backend. Это OK — старые DagRun завершены.
Hands-on: пример end-to-end
# dags/test_object_xcom.py
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
@dag(
schedule=None,
start_date=datetime(2026, 1, 1),
catchup=False,
)
def test_object_xcom():
@task
def produce() -> dict:
# Большой dict (~ 2MB)
return {
"items": [{"id": i, "name": f"item_{i}"} for i in range(50000)]
}
@task
def consume(data: dict):
print(f"Got {len(data['items'])} items")
consume(produce())
test_object_xcom()
С threshold=1024 → 2MB dict уйдёт в S3. В DB — только ref. Pull скачает с S3 → передаст в consumer. Никаких изменений в коде user-а.