Learning Platform
Глоссарий Troubleshooting
Урок 07.05 · 24 мин
Продвинутый
XComObject Storagefsspeccommon.ioS3GCS

Object Storage XCom — готовый backend через fsspec

В Airflow 2.8 community добавил готовый Custom XCom Backend на основе Object Storage abstraction через провайдер apache-airflow-providers-common-io. Это spares вас от написания собственного backend (см. урок 04) — вы получаете S3/GCS/Azure/local FS unified через fsspec, с конфигурацией одной env переменной.

В этом уроке разберём XComObjectStorageBackend: что внутри, как настраивать, чем отличается от custom backend, performance и production considerations.


Что такое common.io provider

apache-airflow-providers-common-io — это пакет, который добавляет Object Storage abstraction в Airflow. Под капотом использует fsspec + специализированные filesystem implementations (s3fs, gcsfs, adlfs).

Установка:

pip install apache-airflow-providers-common-io
# Для S3:
pip install s3fs
# Для GCS:
pip install gcsfs
# Для Azure:
pip install adlfs

После установки доступен класс:

from airflow.providers.common.io.xcom.backend import XComObjectStorageBackend

Архитектура: fsspec уровень

Object Storage XCom — слой fsspec
Airflow XCom APIStandard XCom calls: xcom_push, xcom_pull. Не меняется — клиент-код такой же как для default backend.
XComObjectStorageBackendBackend класс из common.io. Override serialize_value / deserialize_value. Решает: маленькое → JSON в DB, большое → uri в Object Storage.
airflow.io.path.ObjectStoragePathPathlib-style abstraction поверх fsspec. URI form: s3://conn_id@bucket/key или gs://conn_id@bucket/key. Connection из Airflow connections.
fsspec AbstractFileSystemPython library для unified filesystem API. Stable interface: open(), exists(), rm(), ls(), get(), put(). Хранит state per connection.
s3fsS3-specific fsspec implementation. Под капотом boto3. Поддерживает multipart upload, retries, throttling.
gcsfsGCS-specific. Под капотом google-cloud-storage. ADC auth или service account JSON.
adlfsAzure Data Lake Storage Gen2. Под капотом azure-storage-file-datalake.
LocalFileSystemЛокальная FS для dev/testing. URI form file:///path.

Это значит один backend класс работает с любым storage. Хотите switch с S3 на GCS — измените одну env variable.

Колоночное vs построчное хранение: ментальная модель

Configuration

Два конфиг параметра:

# airflow.cfg
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend

[common.io]
# URL формат: <scheme>://<conn_id>@<bucket>/<prefix>
xcom_objectstorage_path = s3://aws_default@airflow-xcom-bucket/xcom

# Опционально: размер threshold (bytes) — больше идёт в Object Storage
xcom_objectstorage_threshold = -1   # -1 = всегда в Object Storage; 1024 = >1KB
xcom_objectstorage_compression = zstd   # gzip, zstd, lz4, snappy, или пусто

Через env vars:

export AIRFLOW__CORE__XCOM_BACKEND=airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=s3://aws_default@airflow-xcom-bucket/xcom
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_THRESHOLD=1024
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_COMPRESSION=zstd

URL format detailed

s3://aws_default@airflow-xcom-bucket/xcom
│   │             │                  │
│   │             │                  └─ key prefix (folder)
│   │             └─ S3 bucket
│   └─ Airflow Connection ID (для creds)
└─ scheme (s3, gs, abfs, file)

Connection aws_default — обычная Airflow Connection с типом AWS. Здесь живут creds (access key, secret, или IAM role). fsspec auto-loads их из connection.

Эквивалентные:

  • s3://aws_default@bucket/prefix — Airflow Connection
  • s3://bucket/prefix — fallback на default boto3 chain (env, IAM role, ~/.aws/credentials)

Threshold pattern

Параметр xcom_objectstorage_threshold контролирует hybrid behavior:

ThresholdBehavior
-1 (default)Всегда в Object Storage, никогда в DB
0Любой XCom > 0 bytes — в Object Storage (всё)
1024XCom < 1KB — в DB, >= 1KB — в Object Storage
48 * 1024”Like default DB” — < 48KB в DB, иначе в S3

Production recommendation: xcom_objectstorage_threshold = 1024 или 4096 — small JSON status в DB (fast pull), large data в Object Storage.


Code: что внутри backend

Упрощённая версия (реальная в airflow.providers.common.io.xcom.backend):

from airflow.io.path import ObjectStoragePath
from airflow.models.xcom import BaseXCom
from airflow.configuration import conf
import json
import uuid


class XComObjectStorageBackend(BaseXCom):
    @staticmethod
    def _get_path() -> ObjectStoragePath:
        path_str = conf.get("common.io", "xcom_objectstorage_path")
        return ObjectStoragePath(path_str)

    @staticmethod
    def _get_threshold() -> int:
        return conf.getint("common.io", "xcom_objectstorage_threshold", fallback=-1)

    @staticmethod
    def _get_compression() -> str:
        return conf.get("common.io", "xcom_objectstorage_compression", fallback="")

    @staticmethod
    def serialize_value(value, *, key=None, task_id=None, dag_id=None, run_id=None, map_index=None):
        # Default JSON serialization first
        s = BaseXCom.serialize_value(value, key=key, task_id=task_id, dag_id=dag_id, run_id=run_id, map_index=map_index)

        threshold = XComObjectStorageBackend._get_threshold()
        if threshold >= 0 and len(s) < threshold:
            # Малое — в DB как обычно
            return s

        # Большое — в Object Storage
        base_path = XComObjectStorageBackend._get_path()
        compression = XComObjectStorageBackend._get_compression()

        # Уникальный путь
        ext = f".{compression}" if compression else ""
        file_path = base_path / dag_id / run_id / task_id / f"{key}_{map_index or -1}_{uuid.uuid4()}{ext}"

        # Запись с compression
        if compression:
            import zstandard, gzip  # и т.д.
            compressed = compress(s, compression)
            with file_path.open("wb") as f:
                f.write(compressed)
        else:
            with file_path.open("wb") as f:
                f.write(s)

        # В DB сохраняется только URI
        return BaseXCom.serialize_value({"uri": str(file_path)}).encode()

    @staticmethod
    def deserialize_value(result):
        raw = BaseXCom.deserialize_value(result)
        # Если это dict с uri — fetch из Object Storage
        if isinstance(raw, dict) and "uri" in raw and len(raw) == 1:
            file_path = ObjectStoragePath(raw["uri"])
            with file_path.open("rb") as f:
                data = f.read()
            # Decompress если нужно
            if file_path.suffix in {".gzip", ".gz", ".zstd", ".zst", ".lz4", ".snappy"}:
                data = decompress(data, file_path.suffix.lstrip("."))
            return json.loads(data.decode())
        return raw

Это простой, универсальный backend — работает с любыми JSON-serializable values.


XComObjectStorageBackend vs custom backend

AspectXComObjectStorageBackendCustom (урок 04)
SetupOne env varНаписать класс, deploy
Multi-cloudYes (fsspec)Manual
FormatJSON (with compression)Любой (Parquet, Arrow, etc)
Threshold logicBuilt-in (config)Manual
pandas DataFrameЧерез JSON (slow, lossy)Native Parquet
MaintenanceMaintained by Airflow communityВаша ответственность
Compressiongzip/zstd/lz4/snappyManual

Когда использовать готовый:

  • Нужен generic backend для JSON-safe Python objects
  • Не хочется maintain код
  • Multi-cloud
  • В команде нет deep Airflow expertise

Когда писать свой:

  • Pandas/numpy/Arrow large objects — нужны native formats для perf
  • Custom serialization logic (encryption, signing)
  • Specific storage layout (например, Hive partitioning)
  • Custom security (per-tenant KMS keys)

Performance

Benchmarks (synthetic, single worker, network local):

PayloadDefault DB (Postgres)XComObjectStorageBackend S3
1KB JSON5ms push, 3ms pull50ms push, 80ms pull (S3 latency)
100KB JSON25ms / 15ms80ms / 120ms
10MB JSONDB chokes — ~5s + replication impact500ms / 800ms
100MB JSONЧасто OOM / timeout3s / 4s, OK
1GB JSONНе работает30s, не рекомендовано

Ключевое:

  • Small XCom: DB быстрее (no network)
  • Medium: ~equivalent
  • Large: Object Storage сильно быстрее
  • Very large: Object Storage — только вариант

С threshold=1024 получаете best of both: small JSON fast в DB, large в S3.


Compression analysis

[common.io]
xcom_objectstorage_compression = zstd
CompressionSpeedRatio (text)Notes
(none)fastest1xOK для small
gzipmedium3-5xuniversal, slow на write
zstdfast4-7xrecommended general purpose
lz4fastest compress2-3xlow CPU, низкий ratio
snappyvery fast2-3xgood для column data

Для JSON XCom — zstd хороший default. Для бинарных blob — lz4.


Multi-cloud example

Один Airflow deployment, разные XCom storage per environment:

# Dev — local FS
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=file:///tmp/airflow-xcom

# Staging — GCS
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=gs://gcp_default@staging-airflow-xcom/xcom

# Prod — S3 prod
export AIRFLOW__COMMON_IO__XCOM_OBJECTSTORAGE_PATH=s3://aws_prod@prod-airflow-xcom/xcom

Никаких изменений в DAG code — backend сам routes в правильное место. Это огромное value для CI/CD.


Production gotchas

Gotcha 1: fsspec dep mismatch

Разные versions s3fs и aiobotocore несовместимы. Часто:

ImportError: cannot import name 'AioConfig' from 'aiobotocore'

Fix: pin versions в requirements.txt:

apache-airflow-providers-common-io==1.4.0
s3fs==2024.6.0
aiobotocore==2.13.0

Test в isolated venv перед deploy.

Gotcha 2: Connection ID не resolved

s3://aws_default@bucket/xcom

Если aws_default connection не существует — fsspec падает в default boto3 chain (env, IAM role). На K8s с IRSA — это work, но на dev может ломать.

Fix: явно define aws_default в Airflow connections даже если пустой (для clarity).

Gotcha 3: cleanup orphan files

XCom в Object Storage не удаляется автоматически при cleanup DagRun (в отличие от DB row). После 6 месяцев — S3 bucket с миллионами orphan blobs.

Fix: S3 lifecycle policy:

{
  "Rules": [{
    "Status": "Enabled",
    "Filter": {"Prefix": "xcom/"},
    "Expiration": {"Days": 90}
  }]
}

Или DAG для cleanup:

@dag(schedule="@daily")
def xcom_storage_cleanup():
    @task
    def cleanup():
        from airflow.io.path import ObjectStoragePath
        from airflow.models.dagrun import DagRun
        from sqlalchemy import select
        from datetime import datetime, timedelta

        cutoff = datetime.utcnow() - timedelta(days=30)
        # Список active DagRun
        # ... compare с blobs в S3
        # ... delete orphans

Gotcha 4: list/glob операции дорогие

Webserver UI делает ls для показа XCom. На S3 это LIST request — может быть slow и costly если много объектов.

Fix: xcom_objectstorage_path должен быть dedicated bucket, не shared. И prefix структура должна быть hierarchical (xcom/<dag_id>/<run_id>/), не flat.

Gotcha 5: threshold confusion

threshold = -1 означает всегда в Object Storage (включая 10-byte status). Если хотите hybrid — установите положительное число (например 1024).

Gotcha 6: backend на mapped tasks

При mapped task process.expand(file=[...]) — каждый mapped TI пишет separate file в Object Storage. uuid в pathwithin backend гарантирует unique. OK для consistency, но если 1000 mapped TI — 1000 S3 PUT.


Migration: default → ObjectStorageBackend

Подход для production:

  1. Setup: создайте S3 bucket, IAM policy, Airflow Connection
  2. Install: pip install apache-airflow-providers-common-io s3fs
  3. Test в dev: один DAG с small XCom — verify works
  4. Switch threshold conservatively: начните с 48 * 1024 (только >= 48KB в S3) — small XCom остаются в DB unchanged
  5. Monitor: pg_size DB xcom table должна перестать расти, S3 cost grows controlled
  6. Tune: можно опускать threshold (4KB, 1KB) если хотите больше offload

Что НЕ переезжает автоматически: existing XCom в DB остаются там. Новые pushes идут через backend. Это OK — старые DagRun завершены.


Hands-on: пример end-to-end

# dags/test_object_xcom.py
from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd

@dag(
    schedule=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
)
def test_object_xcom():
    @task
    def produce() -> dict:
        # Большой dict (~ 2MB)
        return {
            "items": [{"id": i, "name": f"item_{i}"} for i in range(50000)]
        }

    @task
    def consume(data: dict):
        print(f"Got {len(data['items'])} items")

    consume(produce())

test_object_xcom()

С threshold=1024 → 2MB dict уйдёт в S3. В DB — только ref. Pull скачает с S3 → передаст в consumer. Никаких изменений в коде user-а.


Проверка знанийKnowledge check
Когда использовать XComObjectStorageBackend (2.8+) vs custom backend на pandas Parquet (урок 04)? Какие 3 production-критичных настройки?
ОтветAnswer
**XComObjectStorageBackend** хорош когда: (1) JSON-serializable values достаточно, (2) нужен multi-cloud (S3/GCS/Azure) без code change, (3) команда не хочет maintain свой код, (4) preferred general-purpose решение. **Custom backend** обязателен когда: (1) pandas/numpy/Arrow large objects — JSON serialization slow и lossy (теряет dtypes), Parquet native сохраняет столбцы и сжимает в 5-10x; (2) custom security (per-tenant KMS, signing); (3) специфический storage layout (Hive partitioning); (4) специфические performance optimizations (chunking, async streaming). **3 production-критичные настройки**: (1) **xcom_objectstorage_threshold** — конкретно `1024-4096` (не дефолт -1!) — small JSON остаётся в DB для low latency, large идёт в Object Storage. Дефолт -1 значит ВСЕ XCom в S3, включая 10-byte status — overkill и latency hit. (2) **S3 lifecycle policy** — auto-delete после N дней. Backend НЕ удаляет файлы при DagRun cleanup → bucket растёт infinitely. Без policy — S3 cost storm. (3) **fsspec deps version pinning** — `s3fs`/`aiobotocore` версии конфликтуют между релизами. Pin в requirements.txt, test в isolated env. Иначе ImportError в production deploy. **Бонус**: compression=zstd для JSON XCom — 4-7x ratio, fast CPU.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое XComObjectStorageBackend (2.8+) и через какой layer работает?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6