Metadata DB — схема и ключевые таблицы
Metadata database — сердце Airflow. Все state живёт здесь: какие DAGs существуют, что они делают, в каком state каждый run, какие connections, variables, XCom values. Понимание схемы — один из самых ценных navигационных skills для debugging production Airflow.
Этот урок — препарирование основных таблиц с реальными SQL запросами, которые вы будете писать ежедневно в production.
Архитектура схемы
В Airflow 2.x metadata DB имеет ~25 ключевых таблиц. Они делятся на пять групп:
Core DAG/Task state tables
dag — DAG metadata
SELECT dag_id, is_active, is_paused,
last_parsed_time, next_dagrun, next_dagrun_create_after,
max_active_runs, max_active_tasks, fileloc
FROM dag
WHERE is_active = true
ORDER BY last_parsed_time DESC;
Поля:
dag_id— primary keyis_active— DAG существует в DagBag (если файл удалён → false)is_paused— toggle через UIlast_parsed_time— когда DagFileProcessor парсил последний разnext_dagrun— когда логически следующий run должен бытьnext_dagrun_create_after— когда scheduler создаст этот runfileloc— путь к .py файлуmax_active_runs— concurrency limit на DAGdefault_view,description,timetable_summary— UI metadata
serialized_dag — DAG structure
DAG объект, сериализованный в JSON. Это то, что читает scheduler и UI — не parse .py заново.
SELECT dag_id, last_updated, dag_hash,
length(data::text) AS size_bytes
FROM serialized_dag
ORDER BY last_updated DESC;
data column — JSONB с полной структурой DAG: tasks, dependencies, schedule, default_args.
dag_run — DAG executions
Самая важная таблица для отладки runs.
SELECT dag_id, run_id, state, run_type,
execution_date, start_date, end_date,
(end_date - start_date) AS duration
FROM dag_run
WHERE dag_id = 'my_etl'
ORDER BY start_date DESC LIMIT 20;
Поля:
state: queued, running, success, failedrun_type: scheduled, manual, backfill, dataset_triggeredexecution_date(2.x), позже переименован вlogical_date(с 2.6 deprecated alias) — moment в time, который этот run представляетdata_interval_start/data_interval_end— реальные временные границы для DAGconf— JSON с user-provided context для manual triggers
task_instance — отдельные TI
Один row на каждое выполнение task в DagRun.
SELECT dag_id, task_id, run_id, state, try_number,
queued_dttm, start_date, end_date,
(end_date - start_date) AS duration,
map_index
FROM task_instance
WHERE dag_id = 'my_etl'
AND state IN ('failed', 'up_for_retry', 'running')
ORDER BY start_date DESC LIMIT 50;
Ключевые поля:
state: см. полный список в Module 04 lesson 04 (TI state machine)try_number— какая попытка (для retries)map_index— для Dynamic Task Mapping (-1 для обычных, 0+ для mapped TI)queued_dttm— когда scheduler перевёл в queuedstart_date/end_date— реальное выполнениеpool,pool_slots— какой pool занялpriority_weight,weight_rule— для priority sortingexecutor_config— JSON для KubernetesExecutor pod_override
xcom — task results
SELECT dag_id, task_id, run_id, key,
length(value) AS value_size,
timestamp
FROM xcom
WHERE dag_id = 'my_etl'
ORDER BY timestamp DESC LIMIT 20;
⚠️ value column — bytea (Postgres) / mediumblob (MySQL). Практический лимит ~48 KB — выше scheduler тормозит из-за раздутых select. Для больших данных — custom XCom backend (Module 06).
log — UI logs (не файлы task)
Это audit log действий в UI: who triggered/paused/cleared что. Не путать с file logs task instances (они на disk / S3).
SELECT dttm, dag_id, task_id, event, owner, extra
FROM log
WHERE event IN ('trigger', 'clear', 'paused', 'unpaused')
ORDER BY dttm DESC LIMIT 50;
Растёт быстро. Cleanup через airflow db clean --keep-last 30.
Configuration tables
connection — encrypted credentials
SELECT conn_id, conn_type, host, schema, login, port,
(CASE WHEN password IS NOT NULL THEN '[encrypted]' ELSE '' END) AS pwd_status,
(CASE WHEN extra IS NOT NULL THEN '[encrypted]' ELSE '' END) AS extra_status
FROM connection;
password и extra — Fernet-encrypted. Без Fernet key — невозможно decrypt.
variable — encrypted variables
SELECT key, length(val) AS value_size, is_encrypted, description
FROM variable
ORDER BY key;
slot_pool — concurrency pools
SELECT pool, slots, used_slots, queued_slots, scheduled_slots, description
FROM slot_pool
ORDER BY pool;
Это критичная таблица для scheduler — на ней row-level lock в critical section (Module 04). Default pool — "default_pool" со 128 slots.
import_error — DAG parse errors
SELECT filename, timestamp, stacktrace
FROM import_error
ORDER BY timestamp DESC;
Первая команда при debugging: «почему мой DAG не появился?». Также airflow dags list-import-errors использует эту таблицу.
Datasets tables (2.4+)
dataset
SELECT id, uri, created_at FROM dataset ORDER BY created_at DESC;
dataset_event — timeline updates
SELECT de.timestamp, d.uri AS dataset_uri,
de.source_dag_id, de.source_task_id, de.extra
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
ORDER BY de.timestamp DESC LIMIT 50;
dataset_dag_run_queue — pending triggers
SELECT dataset_id, target_dag_id, created_at
FROM dataset_dag_run_queue
ORDER BY created_at;
Это queue для DAGs, которые должны быть triggered когда все их necessary datasets обновлены. Scheduler опустошает это queue в Phase 1.
Triggers (deferrable operators)
SELECT id, classpath, kwargs, created_date, triggerer_id
FROM trigger
ORDER BY created_date DESC;
Each row — активный deferrable trigger. triggerer_id указывает на job_id triggerer-процесса, который его обслуживает. Когда trigger fires — row удаляется.
Jobs — heartbeat для HA
SELECT id, job_type, state, latest_heartbeat,
(now() - latest_heartbeat) AS heartbeat_lag
FROM job
WHERE state = 'running'
ORDER BY latest_heartbeat DESC;
job_type: SchedulerJob, TriggererJob, LocalTaskJob (per running task), BackfillJob.
Production query — мёртвые scheduler-ы:
SELECT * FROM job
WHERE job_type = 'SchedulerJob'
AND state = 'running'
AND latest_heartbeat < now() - interval '60 seconds';
Auth tables (FAB)
SELECT u.username, u.first_name, u.email, r.name AS role
FROM ab_user u
JOIN ab_user_role ur ON u.id = ur.user_id
JOIN ab_role r ON ur.role_id = r.id;
Tables: ab_user, ab_role, ab_permission_view, ab_permission_view_role — стандарт Flask-AppBuilder.
Critical indexes (2.7+)
Composite index, добавленный в 2.7, критичен для performance scheduler:
-- Существует с 2.7:
CREATE INDEX idx_ti_dag_run_task_map_index
ON task_instance(dag_id, run_id, task_id, map_index);
Без этого индекса scheduler tick может занимать секунды на больших installations (>10k TI/day).
Также критичные:
-- На dag_run:
CREATE INDEX idx_dag_run_state_dag_id ON dag_run(state, dag_id);
-- На log:
CREATE INDEX idx_log_dttm ON log(dttm);
Размер metadata DB в production
Типично без cleanup:
log— ~70% объёма (если DB log handler)task_instance— ~10-15%xcom— ~5-10% (если default DB backend)task_reschedule,dag_run,task_fail— ~5%
Shopify: ~200-500 GB после 28 дней retention (10k DAGs, 150k TI/day).
Cleanup стратегия
# Cron daily (2.5+):
airflow db clean --keep-last 28
# Или partman partitioning по дате:
SELECT partman.create_parent(
'public.log', 'dttm', 'native', 'daily'
);
Production-критичные SQL queries
1. Stuck queued tasks
SELECT dag_id, task_id, run_id, queued_dttm,
(now() - queued_dttm) AS queued_for
FROM task_instance
WHERE state = 'queued'
AND queued_dttm < now() - interval '10 minutes';
Если такие есть — executor heartbeat missed или broker problems.
2. Long-running tasks
SELECT dag_id, task_id, run_id, start_date,
(now() - start_date) AS running_for
FROM task_instance
WHERE state = 'running'
AND start_date < now() - interval '1 hour'
ORDER BY start_date;
3. Pool occupancy
SELECT pool, slots,
used_slots,
queued_slots,
(slots - used_slots) AS available
FROM slot_pool
ORDER BY (used_slots::float / slots) DESC;
Если used_slots = slots — pool exhausted.
4. Failed runs за последний день
SELECT dag_id, COUNT(*) AS failed_runs
FROM dag_run
WHERE state = 'failed'
AND end_date > now() - interval '24 hours'
GROUP BY dag_id
ORDER BY failed_runs DESC;
5. Top DAGs по run frequency
SELECT dag_id, COUNT(*) AS runs_last_day,
AVG(EXTRACT(epoch FROM (end_date - start_date))) AS avg_duration_sec
FROM dag_run
WHERE end_date > now() - interval '24 hours'
GROUP BY dag_id
ORDER BY runs_last_day DESC LIMIT 20;
Security: read-only user для production
Никогда не подключайтесь из приложений как airflow user — у него full access. Создайте read-only:
CREATE USER airflow_readonly WITH PASSWORD '<strong-password>';
GRANT CONNECT ON DATABASE airflow TO airflow_readonly;
GRANT USAGE ON SCHEMA public TO airflow_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO airflow_readonly;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO airflow_readonly;
Это безопасно для analytics queries, дашбордов, monitoring.