Learning Platform
Глоссарий Troubleshooting
Урок 02.05 · 22 мин
Продвинутый
Metadata DBPostgreSQLSchemaSQL

Metadata DB — схема и ключевые таблицы

Metadata database — сердце Airflow. Все state живёт здесь: какие DAGs существуют, что они делают, в каком state каждый run, какие connections, variables, XCom values. Понимание схемы — один из самых ценных navигационных skills для debugging production Airflow.

Этот урок — препарирование основных таблиц с реальными SQL запросами, которые вы будете писать ежедневно в production.


Архитектура схемы

В Airflow 2.x metadata DB имеет ~25 ключевых таблиц. Они делятся на пять групп:

Группы таблиц metadata DB
DAG statedag, serialized_dag, dag_run, task_instance, task_fail, task_reschedule, log, xcom — основной набор для tracking what runs.
Concurrency & resourcesslot_pool — pools, dag_concurrency_check, max_active_tasks. Critical section защищена row-level lock на slot_pool.
Configurationconnection (encrypted Fernet), variable (encrypted), import_error, dag_code, dag_pickle (legacy).
Datasets (2.4+) & Triggersdataset, dataset_event, dataset_dag_run_queue для data-aware scheduling. trigger для deferrable operators (asyncio queue). dag_schedule_dataset_reference, task_outlet_dataset_reference — связи.
Auth & jobsab_user, ab_role, ab_permission_view — FAB auth. job — heartbeat scheduler/triggerer/worker для HA detection.

Core DAG/Task state tables

dag — DAG metadata

SELECT dag_id, is_active, is_paused,
       last_parsed_time, next_dagrun, next_dagrun_create_after,
       max_active_runs, max_active_tasks, fileloc
FROM dag
WHERE is_active = true
ORDER BY last_parsed_time DESC;

Поля:

  • dag_id — primary key
  • is_active — DAG существует в DagBag (если файл удалён → false)
  • is_paused — toggle через UI
  • last_parsed_time — когда DagFileProcessor парсил последний раз
  • next_dagrun — когда логически следующий run должен быть
  • next_dagrun_create_after — когда scheduler создаст этот run
  • fileloc — путь к .py файлу
  • max_active_runs — concurrency limit на DAG
  • default_view, description, timetable_summary — UI metadata

serialized_dag — DAG structure

DAG объект, сериализованный в JSON. Это то, что читает scheduler и UI — не parse .py заново.

SELECT dag_id, last_updated, dag_hash,
       length(data::text) AS size_bytes
FROM serialized_dag
ORDER BY last_updated DESC;

data column — JSONB с полной структурой DAG: tasks, dependencies, schedule, default_args.

dag_run — DAG executions

Самая важная таблица для отладки runs.

SELECT dag_id, run_id, state, run_type,
       execution_date, start_date, end_date,
       (end_date - start_date) AS duration
FROM dag_run
WHERE dag_id = 'my_etl'
ORDER BY start_date DESC LIMIT 20;

Поля:

  • state: queued, running, success, failed
  • run_type: scheduled, manual, backfill, dataset_triggered
  • execution_date (2.x), позже переименован в logical_date (с 2.6 deprecated alias) — moment в time, который этот run представляет
  • data_interval_start / data_interval_end — реальные временные границы для DAG
  • conf — JSON с user-provided context для manual triggers

task_instance — отдельные TI

Один row на каждое выполнение task в DagRun.

SELECT dag_id, task_id, run_id, state, try_number,
       queued_dttm, start_date, end_date,
       (end_date - start_date) AS duration,
       map_index
FROM task_instance
WHERE dag_id = 'my_etl'
  AND state IN ('failed', 'up_for_retry', 'running')
ORDER BY start_date DESC LIMIT 50;

Ключевые поля:

  • state: см. полный список в Module 04 lesson 04 (TI state machine)
  • try_number — какая попытка (для retries)
  • map_index — для Dynamic Task Mapping (-1 для обычных, 0+ для mapped TI)
  • queued_dttm — когда scheduler перевёл в queued
  • start_date / end_date — реальное выполнение
  • pool, pool_slots — какой pool занял
  • priority_weight, weight_rule — для priority sorting
  • executor_config — JSON для KubernetesExecutor pod_override

xcom — task results

SELECT dag_id, task_id, run_id, key,
       length(value) AS value_size,
       timestamp
FROM xcom
WHERE dag_id = 'my_etl'
ORDER BY timestamp DESC LIMIT 20;

⚠️ value column — bytea (Postgres) / mediumblob (MySQL). Практический лимит ~48 KB — выше scheduler тормозит из-за раздутых select. Для больших данных — custom XCom backend (Module 06).

Parquet, Avro, ORC: форматы хранения для больших данных

log — UI logs (не файлы task)

Это audit log действий в UI: who triggered/paused/cleared что. Не путать с file logs task instances (они на disk / S3).

SELECT dttm, dag_id, task_id, event, owner, extra
FROM log
WHERE event IN ('trigger', 'clear', 'paused', 'unpaused')
ORDER BY dttm DESC LIMIT 50;

Растёт быстро. Cleanup через airflow db clean --keep-last 30.


Configuration tables

connection — encrypted credentials

SELECT conn_id, conn_type, host, schema, login, port,
       (CASE WHEN password IS NOT NULL THEN '[encrypted]' ELSE '' END) AS pwd_status,
       (CASE WHEN extra IS NOT NULL THEN '[encrypted]' ELSE '' END) AS extra_status
FROM connection;

password и extra — Fernet-encrypted. Без Fernet key — невозможно decrypt.

variable — encrypted variables

SELECT key, length(val) AS value_size, is_encrypted, description
FROM variable
ORDER BY key;

slot_pool — concurrency pools

SELECT pool, slots, used_slots, queued_slots, scheduled_slots, description
FROM slot_pool
ORDER BY pool;

Это критичная таблица для scheduler — на ней row-level lock в critical section (Module 04). Default pool — "default_pool" со 128 slots.

import_error — DAG parse errors

SELECT filename, timestamp, stacktrace
FROM import_error
ORDER BY timestamp DESC;

Первая команда при debugging: «почему мой DAG не появился?». Также airflow dags list-import-errors использует эту таблицу.


Datasets tables (2.4+)

dataset

SELECT id, uri, created_at FROM dataset ORDER BY created_at DESC;

dataset_event — timeline updates

SELECT de.timestamp, d.uri AS dataset_uri,
       de.source_dag_id, de.source_task_id, de.extra
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
ORDER BY de.timestamp DESC LIMIT 50;

dataset_dag_run_queue — pending triggers

SELECT dataset_id, target_dag_id, created_at
FROM dataset_dag_run_queue
ORDER BY created_at;

Это queue для DAGs, которые должны быть triggered когда все их necessary datasets обновлены. Scheduler опустошает это queue в Phase 1.


Triggers (deferrable operators)

SELECT id, classpath, kwargs, created_date, triggerer_id
FROM trigger
ORDER BY created_date DESC;

Each row — активный deferrable trigger. triggerer_id указывает на job_id triggerer-процесса, который его обслуживает. Когда trigger fires — row удаляется.


Jobs — heartbeat для HA

SELECT id, job_type, state, latest_heartbeat,
       (now() - latest_heartbeat) AS heartbeat_lag
FROM job
WHERE state = 'running'
ORDER BY latest_heartbeat DESC;

job_type: SchedulerJob, TriggererJob, LocalTaskJob (per running task), BackfillJob.

Production query — мёртвые scheduler-ы:

SELECT * FROM job
WHERE job_type = 'SchedulerJob'
  AND state = 'running'
  AND latest_heartbeat < now() - interval '60 seconds';

Auth tables (FAB)

SELECT u.username, u.first_name, u.email, r.name AS role
FROM ab_user u
JOIN ab_user_role ur ON u.id = ur.user_id
JOIN ab_role r ON ur.role_id = r.id;

Tables: ab_user, ab_role, ab_permission_view, ab_permission_view_role — стандарт Flask-AppBuilder.


Critical indexes (2.7+)

Composite index, добавленный в 2.7, критичен для performance scheduler:

-- Существует с 2.7:
CREATE INDEX idx_ti_dag_run_task_map_index
ON task_instance(dag_id, run_id, task_id, map_index);

Без этого индекса scheduler tick может занимать секунды на больших installations (>10k TI/day).

Также критичные:

-- На dag_run:
CREATE INDEX idx_dag_run_state_dag_id ON dag_run(state, dag_id);

-- На log:
CREATE INDEX idx_log_dttm ON log(dttm);

Размер metadata DB в production

Типично без cleanup:

  • log — ~70% объёма (если DB log handler)
  • task_instance — ~10-15%
  • xcom — ~5-10% (если default DB backend)
  • task_reschedule, dag_run, task_fail — ~5%

Shopify: ~200-500 GB после 28 дней retention (10k DAGs, 150k TI/day).

Cleanup стратегия

# Cron daily (2.5+):
airflow db clean --keep-last 28

# Или partman partitioning по дате:
SELECT partman.create_parent(
  'public.log', 'dttm', 'native', 'daily'
);

Production-критичные SQL queries

1. Stuck queued tasks

SELECT dag_id, task_id, run_id, queued_dttm,
       (now() - queued_dttm) AS queued_for
FROM task_instance
WHERE state = 'queued'
  AND queued_dttm < now() - interval '10 minutes';

Если такие есть — executor heartbeat missed или broker problems.

2. Long-running tasks

SELECT dag_id, task_id, run_id, start_date,
       (now() - start_date) AS running_for
FROM task_instance
WHERE state = 'running'
  AND start_date < now() - interval '1 hour'
ORDER BY start_date;

3. Pool occupancy

SELECT pool, slots,
       used_slots,
       queued_slots,
       (slots - used_slots) AS available
FROM slot_pool
ORDER BY (used_slots::float / slots) DESC;

Если used_slots = slots — pool exhausted.

4. Failed runs за последний день

SELECT dag_id, COUNT(*) AS failed_runs
FROM dag_run
WHERE state = 'failed'
  AND end_date > now() - interval '24 hours'
GROUP BY dag_id
ORDER BY failed_runs DESC;

5. Top DAGs по run frequency

SELECT dag_id, COUNT(*) AS runs_last_day,
       AVG(EXTRACT(epoch FROM (end_date - start_date))) AS avg_duration_sec
FROM dag_run
WHERE end_date > now() - interval '24 hours'
GROUP BY dag_id
ORDER BY runs_last_day DESC LIMIT 20;

Security: read-only user для production

Никогда не подключайтесь из приложений как airflow user — у него full access. Создайте read-only:

OLTP: транзакции и точечные операции
CREATE USER airflow_readonly WITH PASSWORD '<strong-password>';
GRANT CONNECT ON DATABASE airflow TO airflow_readonly;
GRANT USAGE ON SCHEMA public TO airflow_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO airflow_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT ON TABLES TO airflow_readonly;

Это безопасно для analytics queries, дашбордов, monitoring.


Проверка знанийKnowledge check
Production пользователь жалуется, что в UI Airflow медленно открывается список DagRuns. Какие SQL-запросы помогут диагностировать причину, и что чаще всего оказывается узким местом?
ОтветAnswer
Запросы для диагностики: (1) `SELECT pg_database_size('airflow');` — общий размер DB; (2) `SELECT relname, pg_size_pretty(pg_total_relation_size(oid)) FROM pg_class WHERE relkind='r' ORDER BY pg_total_relation_size(oid) DESC LIMIT 10;` — топ-10 таблиц; (3) `SELECT COUNT(*) FROM log;`, `task_instance`, `xcom`. Чаще всего узкое место — таблица `log` (если используется DB log handler — не рекомендуется в production), может занимать 70% объёма. Также `task_instance` без партиционирования становится bottleneck при >10M rows. Fix: (1) disable DB log handler — use S3/GCS/ES; (2) `airflow db clean --keep-last 28` daily; (3) pg_partman для partitioning log/xcom/task_instance по дате; (4) проверить наличие composite index `idx_ti_dag_run_task_map_index` (добавлен в 2.7); (5) VACUUM FULL weekly.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какая таблица в metadata DB обычно занимает наибольший % объёма в production?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6