Learning Platform
Глоссарий Troubleshooting
Урок 07.02 · 28 мин
Продвинутый
XComMetadata DBSerializationJSONPickle

Default XCom backend — DB storage и serialization

XCom (cross-communication) — это первичный механизм передачи данных между tasks в Airflow. На surface уровне он выглядит обманчиво просто: ti.xcom_push("key", value) положил, ti.xcom_pull(task_ids="...") достал. Но под капотом скрывается DB-таблица, сериализация и практический лимит ~48KB, который ломает наивные подходы при работе с pandas DataFrame или большими JSON.

Этот урок препарирует default backend Airflow 2.x: schema таблицы, как scheduler/worker push/pull, какой serializer используется, и почему 48KB — это не cap, а soft limit с реальными performance implications.


Таблица xcom — что внутри metadata DB

XCom хранится в metadata DB (PostgreSQL рекомендован) в таблице xcom. Полная schema из Airflow 2.10/2.11:

CREATE TABLE xcom (
    dag_run_id      INTEGER NOT NULL,
    task_id         VARCHAR(250) NOT NULL,
    map_index       INTEGER NOT NULL DEFAULT -1,
    key             VARCHAR(512) NOT NULL,
    dag_id          VARCHAR(250) NOT NULL,
    run_id          VARCHAR(250) NOT NULL,
    value           BYTEA,             -- PostgreSQL: bytea, MySQL: mediumblob
    timestamp       TIMESTAMP WITH TIME ZONE NOT NULL,

    PRIMARY KEY (dag_run_id, task_id, map_index, key),
    FOREIGN KEY (dag_run_id) REFERENCES dag_run(id) ON DELETE CASCADE
);

CREATE INDEX idx_xcom_key ON xcom (key);
CREATE INDEX idx_xcom_task_id ON xcom (dag_id, task_id, run_id);

Разбор полей:

КолонкаНазначение
dag_run_idFK на конкретный DagRun (внутренний INT id) — определяет per-run isolation
task_idКакая task запушила
map_index-1 для обычных tasks; 0..N-1 для Dynamic Task Mapping (см. урок 06)
keyИмя XCom. По default return_value для return из @task
dag_id, run_idДенормализация для быстрого fetch без join
valueСериализованный blob (по default JSON UTF-8 bytes)
timestampКогда запушен

Primary key (dag_run_id, task_id, map_index, key) означает: для одной TI с одним key в одном DagRun возможен только один XCom. Повторный push с тем же key — это UPSERT (delete + insert).


Lifecycle: push → store → pull

XCom push/pull через metadata DB
Worker — execute(producer)Worker запускает TaskInstance producer. Внутри callable выполняется return value (или ti.xcom_push). _PythonDecoratedOperator после return автоматически делает push.
serialize_value()
XComEncoder.encode → JSON bytesDefault — JSON через airflow.serialization.serde. Поддерживает dict/list/str/int/float/bool/datetime/Decimal. Custom типы — через encoder hooks или pickle (legacy).
INSERT INTO xcom
metadata DBWorker делает INSERT в xcom с PK (dag_run_id, task_id, map_index, key). Транзакция commit. Размер blob в bytea — практический лимит ~48KB до проблем с perf.
downstream worker — xcom_pull
Worker — execute(consumer)Другой worker (может на другой машине) запускает downstream task. Делает SELECT FROM xcom WHERE task_id='producer' AND run_id=...
deserialize_value()
XComDecoder.decode → Python objJSON bytes → Python dict. Если custom XCom backend — вызывается user-defined deserialize_value. TaskFlow подставляет результат в function argument.

Ключевое: XCom transit через DB. Это значит DB hit на каждый push и pull. Для маленьких payload (< 1KB) — незаметно. Для больших — DB становится bottleneck.


Сериализация: JSON (default) vs Pickle (legacy)

В Airflow 2.x по default используется JSON через модуль airflow.serialization.serde. Это breaking change от 1.x, где default был pickle.

JSON serializer (default)

# airflow/serialization/serde.py (упрощённо)
class XComEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return {"__type__": "datetime", "value": obj.isoformat()}
        if isinstance(obj, Decimal):
            return {"__type__": "decimal", "value": str(obj)}
        if hasattr(obj, "__serialize__"):
            return obj.__serialize__()
        raise TypeError(f"Cannot serialize {type(obj)}")

Поддерживаемые типы:

  • Primitives: str, int, float, bool, None
  • Containers: list, tuple, dict (keys должны быть str/int)
  • Special: datetime, date, Decimal, UUID
  • Custom: классы с __serialize__ / __deserialize__ методами

Не сериализуются по default:

  • pandas.DataFrame, numpy.ndarray
  • Arbitrary Python objects (любой class без явного контракта)
  • bytes (только текст)
  • Generators, file handles, sockets

При попытке push неподдерживаемого типа — TypeError или escalation до airflow.exceptions.AirflowException.

Pickle (legacy, security risk)

# airflow.cfg
[core]
enable_xcom_pickling = True

Pickle поддерживает любой Python object, но имеет два killer-проблемы:

  1. Deserialization attack: pickle.loads() на untrusted bytes может выполнить arbitrary code (__reduce__ magic). Если в multi-tenant Airflow один tenant пушит злонамеренный XCom — другой при pull выполняет код.
  2. Version coupling: pickle blob привязан к Python version и class definitions. Upgrade Python 3.10 → 3.12 может сделать старые XCom unreadable.
WARNING

Никогда не включайте enable_xcom_pickling=True в production с user-submitted DAG кодом. Pickle deserialization — известный RCE vector. Если нужны custom Python objects — используйте Custom XCom Backend (урок 04) с собственным контрактом сериализации.


Практический лимит ~48KB

В docs Airflow часто упоминается лимит “около 48KB”. Откуда эта цифра?

Это не hard limit в схеме (PostgreSQL bytea допускает до 1GB). Это soft practical threshold, состоящий из нескольких факторов:

ФакторОбъяснение
MySQL mediumblobДо 16MB. Но row size MySQL ~64KB → XCom blob 48KB + metadata fits
DB performanceSELECT с большими bytea — медленно. Scheduler делает много XCom queries
Network roundtripWorker pull через ORM hydrates весь row в Python memory
UI renderingWebserver показывает XCom value в task instance details — большие blobs ломают UI
LoggingAirflow логирует XCom value в task log (truncated, но дорого)

Что происходит при 100MB XCom (например, pandas DataFrame):

1. Producer: pickle/json.dumps(huge_df) — может занять seconds, eat memory
2. INSERT 100MB blob в xcom table — DB IO spike, replication lag
3. Consumer pull: ORM hydrates Python bytes — RAM spike на worker
4. Если 10 mapped tasks pushing 100MB each → 1GB через DB

Реальный production incident: команда положила pandas df 200MB через @task return. DAG parsing замедлился (scheduler читает recent XCom для UI hint), web UI стал отваливаться на 10s timeout, replication lag PG replica — 5min.

Rule of thumb:

  • < 1KB — OK, нет вопросов
  • 1KB-48KB — OK, не задумывайтесь
  • 48KB-1MB — работает, но review (может быть anti-pattern)
  • 1MB — migrate на Custom Backend (S3/GCS), см. урок 04


XCom push API

Implicit push через TaskFlow return

from airflow.decorators import dag, task

@task
def producer() -> dict:
    return {"items": [1, 2, 3], "total": 6}  # ← auto-push с key='return_value'

После execute() _PythonDecoratedOperator вызывает:

context["ti"].xcom_push(key="return_value", value=return_val)

Explicit push в classic operator

from airflow.operators.python import PythonOperator

def my_callable(**context):
    ti = context["ti"]
    ti.xcom_push(key="status", value="ok")
    ti.xcom_push(key="count", value=42)
    return "another_value"  # → key='return_value' если do_xcom_push=True

PythonOperator(
    task_id="multi_push"
    python_callable=my_callable,
    do_xcom_push=True,  # default
)

Несколько pushes с разными keys → несколько строк в xcom для одной TI.

do_xcom_push=False — отключить return XCom

PythonOperator(
    task_id="no_xcom"
    python_callable=lambda: "huge_string"
    do_xcom_push=False,  # ← return НЕ попадёт в XCom
)

Полезно когда callable возвращает большой объект, но downstream его не использует. Default True — частая причина раздутого XCom.


Comparison: storage характеристики

ТипJSON-defaultPickle (deprecated)Custom backend (урок 04)
Любые Python objectsНетДаЗависит
БезопасностьБезопасноRCE riskЗависит
Performance largeПлохоПлохоХорошо (S3 offload)
Backward compatJSON portablePython+class coupledЗависит
Default Airflow 2.xДаПо opt-inЧерез config

Production gotchas

Gotcha 1: do_xcom_push раздувает таблицу

Каждый PythonOperator по default пушит return. Если у вас 10k DAG-ов с 10 tasks по 1KB return — это 100MB в xcom table per run. Через год — xcom table > 1TB, scheduler queries тормозят.

Mitigation:

  • do_xcom_push=False для tasks где return не используется
  • [core] xcom_cleanup_on_dagrun_delete = True (default) — XCom удаляется при cleanup DagRun
  • Cron job для DELETE FROM xcom WHERE timestamp &lt; now() - interval '90 days'

Gotcha 2: complex types через TaskFlow

@task
def get_df() -> pd.DataFrame:
    return pd.DataFrame({"a": [1, 2, 3]})  # ❌ JSON serializer не знает pd.DataFrame

Это упадёт на serialize. Workaround:

@task
def get_df() -> dict:
    df = pd.DataFrame({"a": [1, 2, 3]})
    return df.to_dict("records")  # JSON-safe

Или Custom Backend с pandas-aware serializer.

Gotcha 3: xcom_pull без task_ids — expensive

ti.xcom_pull()  # ❌ pulls all XCom from upstream tasks

Это делает SELECT * FROM xcom WHERE run_id=... AND task_id IN (upstream_list). Если upstream — 100 mapped tasks по 10KB — 1MB в Python memory worker-а. Лучше всегда явно task_ids=....

Gotcha 4: XCom не cross-DAG

XCom isolated per DagRun. Чтобы передать данные между DAG-ами — используйте Datasets (Module 08) или TriggerDagRunOperator с conf={}.


Hands-on: посмотреть XCom в DB

В живой PostgreSQL Airflow:

-- Top 10 largest XCom
SELECT
    dag_id,
    task_id,
    run_id,
    key,
    map_index,
    octet_length(value) AS size_bytes,
    timestamp
FROM xcom
ORDER BY octet_length(value) DESC
LIMIT 10;

-- Распределение размеров
SELECT
    CASE
        WHEN octet_length(value) < 1024 THEN '< 1KB'
        WHEN octet_length(value) < 48 * 1024 THEN '1-48KB'
        WHEN octet_length(value) < 1024 * 1024 THEN '48KB-1MB'
        ELSE '> 1MB'
    END AS size_bucket,
    COUNT(*) AS cnt,
    pg_size_pretty(SUM(octet_length(value))) AS total
FROM xcom
GROUP BY 1
ORDER BY 1;

Если видите много > 1MB — пора на Custom Backend.


Проверка знанийKnowledge check
Почему рекомендуется лимит ~48KB на XCom value, если PostgreSQL bytea допускает до 1GB? Какие конкретные performance impacts?
ОтветAnswer
48KB — soft practical threshold, состоит из нескольких факторов. (1) MySQL row size ~64KB — XCom 48KB + metadata fits в один row, иначе TOAST/overflow pages. (2) DB performance: SELECT с большими bytea медленно через ORM hydration; scheduler делает много XCom queries при scheduling, UI rendering. (3) Network roundtrip: worker hydrates весь row в Python memory при pull. (4) Логирование: Airflow логирует XCom value в task log (truncated до 1KB, но дорого считать size). (5) UI: webserver показывает XCom blob в task instance details — большие blobs ломают rendering. Реальные impact на 100MB: producer тратит seconds на JSON serialize, DB IO spike + replication lag, worker RAM spike при pull. Production rule: < 1KB OK, 1-48KB OK без думать, 48KB-1MB review pattern, > 1MB обязательно Custom Backend на S3/GCS.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Где физически хранится XCom value в Airflow 2.x с default backend?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6