Learning Platform
Глоссарий Troubleshooting
Урок 03.05 · 18 мин
Продвинутый
DAG Serializationserialized_dagJSONPerformance

DAG Serialization

Один из ключевых performance механизмов Airflow 2.x — DAG Serialization. Когда DagFileProcessor парсит .py файл, он не только проверяет syntax, но и сериализует Python DAG-object в JSON и пишет в БД (таблица serialized_dag). После этого scheduler и webserver работают только с сериализованным представлением — не парсят .py повторно.

Это критично для production scale: webserver workers могут не иметь access к dags/ папке, scheduler не блокируется на parsing при scheduling decisions.


Зачем serialization

До Airflow 1.10.7 каждый component (scheduler, webserver, workers) парсил .py файлы независимо. Это давало:

  • Memory bloat в webserver (DagBag cache на каждый worker)
  • Inconsistency — разные components могли видеть разные версии DAG если файл менялся
  • Coupling — webserver требовал access к dags/ папке

В 2.0 DAG Serialization стал mandatory. Архитектура:

DagFileProcessor parses .py

   creates DAG object in memory

   serializes to JSON

   INSERT/UPDATE serialized_dag table

Scheduler, Webserver — read from serialized_dag
DAG serialization pipeline + hash invalidation
dag.pyPython source file в /opt/airflow/dags/. Содержит @dag + tasks definitions. Изменения файла обнаруживаются DagFileProcessor через mtime check.
DagFileProcessor parses (every min_file_process_interval)
DAG object in memoryРезультат importlib.import_module() + DAG(...) instantiation. Содержит всю structure: tasks, dependencies, schedule, default_args. Существует временно в parsing child process.
DAG.to_json() — structure only, no code
JSON dataСериализованная structure: task_ids, python_callable_name (НЕ код!), arguments, dependencies, schedule. Compress опционально с zstandard (2.7+). НЕ включает: function bodies, Variables, Connections.
dag_hash = MD5(data)MD5 от sorted JSON. Используется для invalidation check: если hash совпадает с existing, никаких DB writes — огромная экономия при больших installations.
compare hash with existing row
hash unchanged → skipЕсли dag_hash в DB совпадает с newly computed — UPDATE statement не выполняется. Это критично для performance: при 10k DAGs без изменений нагрузка на DB минимальна.
hash changed → UPSERTINSERT ON CONFLICT UPDATE в serialized_dag table. Обновляются data, dag_hash, last_updated. Через min_serialized_dag_fetch_interval scheduler перечитает новую версию.
Scheduler reads serialized_dagSchedulerJob использует SerializedDagModel.read_all_dags() с TTL cache (min_serialized_dag_fetch_interval, default 10s). Не парсит .py — работает с JSON.
Webserver reads serialized_dagПри request graph view — lazy load конкретного DAG через SerializedDagModel. Deserialize JSON → DAG object для render. НЕ нужен access к dags/ folder.
Workers import .py at runtimeПри execute() task workers делают actual importlib import .py файла — потому что нужно реальное body Python function (его НЕТ в serialized JSON, только имя). Это причина почему workers всё ещё нуждаются в dags/ access.

Таблица serialized_dag

\d serialized_dag

Column            | Type
dag_id            | varchar(250)     PRIMARY KEY
fileloc           | varchar(2000)
fileloc_hash      | bigint
data              | jsonb
last_updated      | timestamp
dag_hash          | varchar(32)      -- MD5 of data
processor_subdir  | varchar(2000)

data — JSONB с полной структурой DAG.

-- Размер сериализации
SELECT dag_id, length(data::text) AS json_size_bytes, last_updated
FROM serialized_dag
ORDER BY json_size_bytes DESC LIMIT 10;

-- DAGs изменённые недавно
SELECT dag_id, last_updated, dag_hash
FROM serialized_dag
WHERE last_updated > now() - interval '1 hour';

Что попадает в JSON

Типичная структура data:

{
  "__version": 1,
  "dag": {
    "dag_id": "my_etl",
    "fileloc": "/opt/airflow/dags/my_etl.py",
    "schedule": "@daily",
    "start_date": "2026-01-01T00:00:00+00:00",
    "catchup": false,
    "max_active_runs": 16,
    "tags": ["etl"],
    "tasks": [
      {
        "task_id": "extract",
        "_task_type": "PythonOperator",
        "_task_module": "airflow.operators.python",
        "python_callable_name": "extract",
        "owner": "data-team",
        "retries": 2,
        "retry_delay": 300,
        "downstream_task_ids": ["transform"],
        "_inlets": [],
        "_outlets": []
      },
      ...
    ],
    "task_group": {...},
    "params": {...},
    "edge_info": {...},
    "doc_md": "..."
  }
}

Ключевое: task structure + dependencies + schedule + metadata сериализуется.

Что НЕ сериализуется:

  • Python code функций (python_callable_name — только имя, не сам код!)
  • Pickle objects (если только не enabled через dag_serialization_legacy_pickle=True)
  • External state — Variables, Connections не embedded в DAG JSON

Это значит — workers всё равно должны иметь access к .py файлам для actual execution (загрузка функции по python_callable_name через import). Webserver — нет (только structure для рендеринга).


Когда происходит сериализация

DagFileProcessor сериализует DAG после каждого успешного parse:

# Псевдокод DagFileProcessor
def process_file(file_path):
    module = importlib.import_module(file_path)
    dags = find_dags_in_module(module)

    for dag in dags:
        # Validate
        validate_dag(dag)

        # Compute hash
        new_hash = compute_dag_hash(dag.to_json())

        # Compare with existing
        existing = session.query(SerializedDag).filter_by(dag_id=dag.dag_id).first()
        if existing and existing.dag_hash == new_hash:
            # No changes — skip update
            continue

        # Upsert
        serialized = SerializedDag(
            dag_id=dag.dag_id,
            data=dag.to_json(),
            dag_hash=new_hash,
            last_updated=now(),
        )
        session.merge(serialized)

    session.commit()

Хеш — критическая оптимизация: если DAG не изменился, никаких UPDATE statements (огромная экономия на больших installations).


DAG Hash — invalidation

dag_hash — MD5 сериализованного JSON. При каждом parse hash recomputed:

import hashlib
new_hash = hashlib.md5(json.dumps(serialized_data, sort_keys=True).encode()).hexdigest()

Изменения которые меняют hash:

  • Добавление/удаление tasks
  • Изменение task arguments (retries, pool, etc.)
  • Изменение default_args
  • Изменение schedule, start_date
  • Изменение dependencies (>>)

Изменения которые НЕ меняют hash:

  • Изменение implementation Python функций (только их names recorded)
  • Изменение комментариев
  • Изменение docstrings (если только не передаются как doc_md)

Поэтому изменение кода функции не вызывает re-serialization — scheduler видит ту же structure. Workers load actual code through import при run.


Lazy loading в Webserver

Webserver не держит все DAGs в memory. Когда вы открываете graph view для конкретного DAG:

# Псевдокод webserver view
def graph_view(dag_id):
    serialized = session.query(SerializedDag).filter_by(dag_id=dag_id).first()
    if not serialized:
        return 404
    dag = deserialize_dag(serialized.data)
    return render_graph(dag)

Lazy load per DAG при request. Это позволяет webserver scale на огромное число DAGs (10k+) без memory bloat.

В 2.x DagBag cache всё равно есть в worker processes — для cross-page operations (list DAGs, search). Cache invalidation через worker_refresh_interval (Module 01 lesson 03).


Скоро vs DB load

-- Production observability
SELECT
    COUNT(*) AS total_dags,
    pg_size_pretty(SUM(length(data::text))) AS total_size,
    AVG(length(data::text)) AS avg_size_bytes,
    MAX(length(data::text)) AS max_size_bytes
FROM serialized_dag;

Типичные значения:

  • Small DAG (3-5 tasks): 2-5 KB
  • Medium DAG (10-30 tasks): 10-50 KB
  • Large DAG (100+ tasks через mapping): 100-500 KB
  • 10k DAGs total: ~50-200 MB в БД

Это намного меньше чем DagBag cache в memory webserver (~500MB-2GB) — благодаря compact JSON и lazy loading.


Edge cases

Large DAGs

Если single DAG имеет 1000+ tasks (через generator pattern, не через .expand()), сериализация может стать большой (>1MB JSON). Scheduler может тормозить:

SELECT dag_id, length(data::text) AS size
FROM serialized_dag
WHERE length(data::text) > 1000000  -- > 1MB
ORDER BY size DESC;

Fix: Dynamic Task Mapping (expand()) — Module 07 — производит mapped TI runtime, не embedded в DAG structure.

Изменения custom operator

Если вы изменили MyCustomOperator.__init__ signature — нужен re-parse чтобы сериализация обновилась. По default это произойдёт автоматически при следующем parse cycle (min_file_process_interval секунд).

Force re-parse:

airflow dags reserialize  # CLI команда — 2.7+

Или просто перезаписать .py файл (touch).

Backfill и serialization

Когда вы делаете backfill, scheduler читает serialized_dag по текущей версии — не той, что была в момент history. В 3.x (AIP-63) добавили DAG Versioning — каждый DagRun привязан к версии bundle. В 2.x этого нет — backfill использует current code.

Это значит:

  • Если DAG topology менялась — backfill может вести себя странно (новые tasks без upstream)
  • Best practice — не менять DAG structure агрессивно, или менять postponed

Manual deserialization

Можно работать с serialized DAG из своего кода:

from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import provide_session

@provide_session
def get_dag_from_serialized(dag_id, session=None):
    serialized = session.query(SerializedDagModel).filter_by(dag_id=dag_id).first()
    if not serialized:
        return None
    return serialized.dag  # ← property десериализует JSON в DAG object

Полезно для tools / scripts, которые хотят inspect DAGs без import .py.


Production tuning

[core]
# min_serialized_dag_update_interval — frequency обновлений (default 30s)
min_serialized_dag_update_interval = 30

# min_serialized_dag_fetch_interval — frequency reads (default 10s)
min_serialized_dag_fetch_interval = 10

# Compress serialized data (2.7+):
compress_serialized_dags = True  # Save ~70% disk и DB IO

compress_serialized_dags = True (2.7+) — сериализация compressed с zstandard. Save DB space, faster read/write.


Проверка знанийKnowledge check
Вы изменили implementation Python функции (тело `def extract():`), задеплоили DAG, но scheduler не видит изменений. Почему?
ОтветAnswer
DAG Serialization записывает только **structure** DAG, не реальный код функций. В сериализации хранятся: task_id, task_type, python_callable_name (имя функции, не код), arguments, dependencies. Когда вы меняете implementation внутри `def extract():`, structure не меняется → dag_hash остаётся тем же → serialized_dag не обновляется → scheduler не делает re-import. Workers, однако, при execute() делают actual import .py файла и используют новый код. Так что код **сработает на новых runs**, но не отражается в UI до перезапуска webserver workers (DagBag cache). Force re-parse: `airflow dags reserialize` (2.7+) или touch .py файла.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Что попадает в serialized_dag.data (JSONB)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8