Default XCom backend — DB storage и serialization
XCom (cross-communication) — это первичный механизм передачи данных между tasks в Airflow. На surface уровне он выглядит обманчиво просто: ti.xcom_push("key", value) положил, ti.xcom_pull(task_ids="...") достал. Но под капотом скрывается DB-таблица, сериализация и практический лимит ~48KB, который ломает наивные подходы при работе с pandas DataFrame или большими JSON.
Этот урок препарирует default backend Airflow 2.x: schema таблицы, как scheduler/worker push/pull, какой serializer используется, и почему 48KB — это не cap, а soft limit с реальными performance implications.
Таблица xcom — что внутри metadata DB
XCom хранится в metadata DB (PostgreSQL рекомендован) в таблице xcom. Полная schema из Airflow 2.10/2.11:
CREATE TABLE xcom (
dag_run_id INTEGER NOT NULL,
task_id VARCHAR(250) NOT NULL,
map_index INTEGER NOT NULL DEFAULT -1,
key VARCHAR(512) NOT NULL,
dag_id VARCHAR(250) NOT NULL,
run_id VARCHAR(250) NOT NULL,
value BYTEA, -- PostgreSQL: bytea, MySQL: mediumblob
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
PRIMARY KEY (dag_run_id, task_id, map_index, key),
FOREIGN KEY (dag_run_id) REFERENCES dag_run(id) ON DELETE CASCADE
);
CREATE INDEX idx_xcom_key ON xcom (key);
CREATE INDEX idx_xcom_task_id ON xcom (dag_id, task_id, run_id);
Разбор полей:
| Колонка | Назначение |
|---|---|
dag_run_id | FK на конкретный DagRun (внутренний INT id) — определяет per-run isolation |
task_id | Какая task запушила |
map_index | -1 для обычных tasks; 0..N-1 для Dynamic Task Mapping (см. урок 06) |
key | Имя XCom. По default return_value для return из @task |
dag_id, run_id | Денормализация для быстрого fetch без join |
value | Сериализованный blob (по default JSON UTF-8 bytes) |
timestamp | Когда запушен |
Primary key (dag_run_id, task_id, map_index, key) означает: для одной TI с одним key в одном DagRun возможен только один XCom. Повторный push с тем же key — это UPSERT (delete + insert).
Lifecycle: push → store → pull
Ключевое: XCom transit через DB. Это значит DB hit на каждый push и pull. Для маленьких payload (< 1KB) — незаметно. Для больших — DB становится bottleneck.
Сериализация: JSON (default) vs Pickle (legacy)
В Airflow 2.x по default используется JSON через модуль airflow.serialization.serde. Это breaking change от 1.x, где default был pickle.
JSON serializer (default)
# airflow/serialization/serde.py (упрощённо)
class XComEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime):
return {"__type__": "datetime", "value": obj.isoformat()}
if isinstance(obj, Decimal):
return {"__type__": "decimal", "value": str(obj)}
if hasattr(obj, "__serialize__"):
return obj.__serialize__()
raise TypeError(f"Cannot serialize {type(obj)}")
Поддерживаемые типы:
- Primitives:
str,int,float,bool,None - Containers:
list,tuple,dict(keys должны быть str/int) - Special:
datetime,date,Decimal,UUID - Custom: классы с
__serialize__/__deserialize__методами
Не сериализуются по default:
pandas.DataFrame,numpy.ndarray- Arbitrary Python objects (любой class без явного контракта)
bytes(только текст)- Generators, file handles, sockets
При попытке push неподдерживаемого типа — TypeError или escalation до airflow.exceptions.AirflowException.
Pickle (legacy, security risk)
# airflow.cfg
[core]
enable_xcom_pickling = True
Pickle поддерживает любой Python object, но имеет два killer-проблемы:
- Deserialization attack:
pickle.loads()на untrusted bytes может выполнить arbitrary code (__reduce__magic). Если в multi-tenant Airflow один tenant пушит злонамеренный XCom — другой при pull выполняет код. - Version coupling: pickle blob привязан к Python version и class definitions. Upgrade Python 3.10 → 3.12 может сделать старые XCom unreadable.
Никогда не включайте enable_xcom_pickling=True в production с user-submitted DAG кодом. Pickle deserialization — известный RCE vector. Если нужны custom Python objects — используйте Custom XCom Backend (урок 04) с собственным контрактом сериализации.
Практический лимит ~48KB
В docs Airflow часто упоминается лимит “около 48KB”. Откуда эта цифра?
Это не hard limit в схеме (PostgreSQL bytea допускает до 1GB). Это soft practical threshold, состоящий из нескольких факторов:
| Фактор | Объяснение |
|---|---|
MySQL mediumblob | До 16MB. Но row size MySQL ~64KB → XCom blob 48KB + metadata fits |
| DB performance | SELECT с большими bytea — медленно. Scheduler делает много XCom queries |
| Network roundtrip | Worker pull через ORM hydrates весь row в Python memory |
| UI rendering | Webserver показывает XCom value в task instance details — большие blobs ломают UI |
| Logging | Airflow логирует XCom value в task log (truncated, но дорого) |
Что происходит при 100MB XCom (например, pandas DataFrame):
1. Producer: pickle/json.dumps(huge_df) — может занять seconds, eat memory
2. INSERT 100MB blob в xcom table — DB IO spike, replication lag
3. Consumer pull: ORM hydrates Python bytes — RAM spike на worker
4. Если 10 mapped tasks pushing 100MB each → 1GB через DB
Реальный production incident: команда положила pandas df 200MB через @task return. DAG parsing замедлился (scheduler читает recent XCom для UI hint), web UI стал отваливаться на 10s timeout, replication lag PG replica — 5min.
Rule of thumb:
- < 1KB — OK, нет вопросов
- 1KB-48KB — OK, не задумывайтесь
- 48KB-1MB — работает, но review (может быть anti-pattern)
-
1MB — migrate на Custom Backend (S3/GCS), см. урок 04
XCom push API
Implicit push через TaskFlow return
from airflow.decorators import dag, task
@task
def producer() -> dict:
return {"items": [1, 2, 3], "total": 6} # ← auto-push с key='return_value'
После execute() _PythonDecoratedOperator вызывает:
context["ti"].xcom_push(key="return_value", value=return_val)
Explicit push в classic operator
from airflow.operators.python import PythonOperator
def my_callable(**context):
ti = context["ti"]
ti.xcom_push(key="status", value="ok")
ti.xcom_push(key="count", value=42)
return "another_value" # → key='return_value' если do_xcom_push=True
PythonOperator(
task_id="multi_push"
python_callable=my_callable,
do_xcom_push=True, # default
)
Несколько pushes с разными keys → несколько строк в xcom для одной TI.
do_xcom_push=False — отключить return XCom
PythonOperator(
task_id="no_xcom"
python_callable=lambda: "huge_string"
do_xcom_push=False, # ← return НЕ попадёт в XCom
)
Полезно когда callable возвращает большой объект, но downstream его не использует. Default True — частая причина раздутого XCom.
Comparison: storage характеристики
| Тип | JSON-default | Pickle (deprecated) | Custom backend (урок 04) |
|---|---|---|---|
| Любые Python objects | Нет | Да | Зависит |
| Безопасность | Безопасно | RCE risk | Зависит |
| Performance large | Плохо | Плохо | Хорошо (S3 offload) |
| Backward compat | JSON portable | Python+class coupled | Зависит |
| Default Airflow 2.x | Да | По opt-in | Через config |
Production gotchas
Gotcha 1: do_xcom_push раздувает таблицу
Каждый PythonOperator по default пушит return. Если у вас 10k DAG-ов с 10 tasks по 1KB return — это 100MB в xcom table per run. Через год — xcom table > 1TB, scheduler queries тормозят.
Mitigation:
do_xcom_push=Falseдля tasks где return не используется[core] xcom_cleanup_on_dagrun_delete = True(default) — XCom удаляется при cleanup DagRun- Cron job для
DELETE FROM xcom WHERE timestamp < now() - interval '90 days'
Gotcha 2: complex types через TaskFlow
@task
def get_df() -> pd.DataFrame:
return pd.DataFrame({"a": [1, 2, 3]}) # ❌ JSON serializer не знает pd.DataFrame
Это упадёт на serialize. Workaround:
@task
def get_df() -> dict:
df = pd.DataFrame({"a": [1, 2, 3]})
return df.to_dict("records") # JSON-safe
Или Custom Backend с pandas-aware serializer.
Gotcha 3: xcom_pull без task_ids — expensive
ti.xcom_pull() # ❌ pulls all XCom from upstream tasks
Это делает SELECT * FROM xcom WHERE run_id=... AND task_id IN (upstream_list). Если upstream — 100 mapped tasks по 10KB — 1MB в Python memory worker-а. Лучше всегда явно task_ids=....
Gotcha 4: XCom не cross-DAG
XCom isolated per DagRun. Чтобы передать данные между DAG-ами — используйте Datasets (Module 08) или TriggerDagRunOperator с conf={}.
Hands-on: посмотреть XCom в DB
В живой PostgreSQL Airflow:
-- Top 10 largest XCom
SELECT
dag_id,
task_id,
run_id,
key,
map_index,
octet_length(value) AS size_bytes,
timestamp
FROM xcom
ORDER BY octet_length(value) DESC
LIMIT 10;
-- Распределение размеров
SELECT
CASE
WHEN octet_length(value) < 1024 THEN '< 1KB'
WHEN octet_length(value) < 48 * 1024 THEN '1-48KB'
WHEN octet_length(value) < 1024 * 1024 THEN '48KB-1MB'
ELSE '> 1MB'
END AS size_bucket,
COUNT(*) AS cnt,
pg_size_pretty(SUM(octet_length(value))) AS total
FROM xcom
GROUP BY 1
ORDER BY 1;
Если видите много > 1MB — пора на Custom Backend.