DAG Serialization
Один из ключевых performance механизмов Airflow 2.x — DAG Serialization. Когда DagFileProcessor парсит .py файл, он не только проверяет syntax, но и сериализует Python DAG-object в JSON и пишет в БД (таблица serialized_dag). После этого scheduler и webserver работают только с сериализованным представлением — не парсят .py повторно.
Это критично для production scale: webserver workers могут не иметь access к dags/ папке, scheduler не блокируется на parsing при scheduling decisions.
Зачем serialization
До Airflow 1.10.7 каждый component (scheduler, webserver, workers) парсил .py файлы независимо. Это давало:
- Memory bloat в webserver (DagBag cache на каждый worker)
- Inconsistency — разные components могли видеть разные версии DAG если файл менялся
- Coupling — webserver требовал access к dags/ папке
В 2.0 DAG Serialization стал mandatory. Архитектура:
DagFileProcessor parses .py
↓
creates DAG object in memory
↓
serializes to JSON
↓
INSERT/UPDATE serialized_dag table
↓
Scheduler, Webserver — read from serialized_dag
Таблица serialized_dag
\d serialized_dag
Column | Type
dag_id | varchar(250) PRIMARY KEY
fileloc | varchar(2000)
fileloc_hash | bigint
data | jsonb
last_updated | timestamp
dag_hash | varchar(32) -- MD5 of data
processor_subdir | varchar(2000)
data — JSONB с полной структурой DAG.
-- Размер сериализации
SELECT dag_id, length(data::text) AS json_size_bytes, last_updated
FROM serialized_dag
ORDER BY json_size_bytes DESC LIMIT 10;
-- DAGs изменённые недавно
SELECT dag_id, last_updated, dag_hash
FROM serialized_dag
WHERE last_updated > now() - interval '1 hour';
Что попадает в JSON
Типичная структура data:
{
"__version": 1,
"dag": {
"dag_id": "my_etl",
"fileloc": "/opt/airflow/dags/my_etl.py",
"schedule": "@daily",
"start_date": "2026-01-01T00:00:00+00:00",
"catchup": false,
"max_active_runs": 16,
"tags": ["etl"],
"tasks": [
{
"task_id": "extract",
"_task_type": "PythonOperator",
"_task_module": "airflow.operators.python",
"python_callable_name": "extract",
"owner": "data-team",
"retries": 2,
"retry_delay": 300,
"downstream_task_ids": ["transform"],
"_inlets": [],
"_outlets": []
},
...
],
"task_group": {...},
"params": {...},
"edge_info": {...},
"doc_md": "..."
}
}
Ключевое: task structure + dependencies + schedule + metadata сериализуется.
Что НЕ сериализуется:
- Python code функций (
python_callable_name— только имя, не сам код!) - Pickle objects (если только не enabled через
dag_serialization_legacy_pickle=True) - External state — Variables, Connections не embedded в DAG JSON
Это значит — workers всё равно должны иметь access к .py файлам для actual execution (загрузка функции по python_callable_name через import). Webserver — нет (только structure для рендеринга).
Когда происходит сериализация
DagFileProcessor сериализует DAG после каждого успешного parse:
# Псевдокод DagFileProcessor
def process_file(file_path):
module = importlib.import_module(file_path)
dags = find_dags_in_module(module)
for dag in dags:
# Validate
validate_dag(dag)
# Compute hash
new_hash = compute_dag_hash(dag.to_json())
# Compare with existing
existing = session.query(SerializedDag).filter_by(dag_id=dag.dag_id).first()
if existing and existing.dag_hash == new_hash:
# No changes — skip update
continue
# Upsert
serialized = SerializedDag(
dag_id=dag.dag_id,
data=dag.to_json(),
dag_hash=new_hash,
last_updated=now(),
)
session.merge(serialized)
session.commit()
Хеш — критическая оптимизация: если DAG не изменился, никаких UPDATE statements (огромная экономия на больших installations).
DAG Hash — invalidation
dag_hash — MD5 сериализованного JSON. При каждом parse hash recomputed:
import hashlib
new_hash = hashlib.md5(json.dumps(serialized_data, sort_keys=True).encode()).hexdigest()
Изменения которые меняют hash:
- Добавление/удаление tasks
- Изменение task arguments (retries, pool, etc.)
- Изменение default_args
- Изменение schedule, start_date
- Изменение dependencies (
>>)
Изменения которые НЕ меняют hash:
- Изменение implementation Python функций (только их names recorded)
- Изменение комментариев
- Изменение docstrings (если только не передаются как
doc_md)
Поэтому изменение кода функции не вызывает re-serialization — scheduler видит ту же structure. Workers load actual code through import при run.
Lazy loading в Webserver
Webserver не держит все DAGs в memory. Когда вы открываете graph view для конкретного DAG:
# Псевдокод webserver view
def graph_view(dag_id):
serialized = session.query(SerializedDag).filter_by(dag_id=dag_id).first()
if not serialized:
return 404
dag = deserialize_dag(serialized.data)
return render_graph(dag)
Lazy load per DAG при request. Это позволяет webserver scale на огромное число DAGs (10k+) без memory bloat.
В 2.x DagBag cache всё равно есть в worker processes — для cross-page operations (list DAGs, search). Cache invalidation через worker_refresh_interval (Module 01 lesson 03).
Скоро vs DB load
-- Production observability
SELECT
COUNT(*) AS total_dags,
pg_size_pretty(SUM(length(data::text))) AS total_size,
AVG(length(data::text)) AS avg_size_bytes,
MAX(length(data::text)) AS max_size_bytes
FROM serialized_dag;
Типичные значения:
- Small DAG (3-5 tasks): 2-5 KB
- Medium DAG (10-30 tasks): 10-50 KB
- Large DAG (100+ tasks через mapping): 100-500 KB
- 10k DAGs total: ~50-200 MB в БД
Это намного меньше чем DagBag cache в memory webserver (~500MB-2GB) — благодаря compact JSON и lazy loading.
Edge cases
Large DAGs
Если single DAG имеет 1000+ tasks (через generator pattern, не через .expand()), сериализация может стать большой (>1MB JSON). Scheduler может тормозить:
SELECT dag_id, length(data::text) AS size
FROM serialized_dag
WHERE length(data::text) > 1000000 -- > 1MB
ORDER BY size DESC;
Fix: Dynamic Task Mapping (expand()) — Module 07 — производит mapped TI runtime, не embedded в DAG structure.
Изменения custom operator
Если вы изменили MyCustomOperator.__init__ signature — нужен re-parse чтобы сериализация обновилась. По default это произойдёт автоматически при следующем parse cycle (min_file_process_interval секунд).
Force re-parse:
airflow dags reserialize # CLI команда — 2.7+
Или просто перезаписать .py файл (touch).
Backfill и serialization
Когда вы делаете backfill, scheduler читает serialized_dag по текущей версии — не той, что была в момент history. В 3.x (AIP-63) добавили DAG Versioning — каждый DagRun привязан к версии bundle. В 2.x этого нет — backfill использует current code.
Это значит:
- Если DAG topology менялась — backfill может вести себя странно (новые tasks без upstream)
- Best practice — не менять DAG structure агрессивно, или менять postponed
Manual deserialization
Можно работать с serialized DAG из своего кода:
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import provide_session
@provide_session
def get_dag_from_serialized(dag_id, session=None):
serialized = session.query(SerializedDagModel).filter_by(dag_id=dag_id).first()
if not serialized:
return None
return serialized.dag # ← property десериализует JSON в DAG object
Полезно для tools / scripts, которые хотят inspect DAGs без import .py.
Production tuning
[core]
# min_serialized_dag_update_interval — frequency обновлений (default 30s)
min_serialized_dag_update_interval = 30
# min_serialized_dag_fetch_interval — frequency reads (default 10s)
min_serialized_dag_fetch_interval = 10
# Compress serialized data (2.7+):
compress_serialized_dags = True # Save ~70% disk и DB IO
compress_serialized_dags = True (2.7+) — сериализация compressed с zstandard. Save DB space, faster read/write.