Dataset internals — таблицы и SQL inspection
Datasets — это не магия. Под капотом — пять обычных PostgreSQL-таблиц и десяток SQL-запросов в scheduler-е, который раз в 5 секунд проверяет накопленные events. Этот урок — препарирование structure metadata DB, queries для debugging, и алгоритма scheduler-а для обработки dataset events.
Если в production видишь «consumer DAG не запустился, хотя producer эмитил dataset», единственный путь к истине — прямой SQL в metadata. Заучи эти таблицы.
Таблицы — обзор
| Таблица | Назначение | Кардинальность |
|---|---|---|
dataset | Registry всех известных datasets (URI → id) | ~ количество unique URIs |
dataset_event | Append-only log всех emissions | растёт линейно (требует cleanup) |
dataset_dag_run_queue | Pending events per consumer DAG | ephemeral, очищается при создании DagRun |
dag_schedule_dataset_reference | Subscription: какие DAG зависят от каких datasets | ~ DAGs × datasets |
task_outlet_dataset_reference | Declaration: какие tasks эмитят какие datasets | ~ tasks × datasets |
dataset_alias | Registry aliases (с 2.9+) | ~ unique alias names |
dataset_alias_dataset_reference | Mapping alias → concrete datasets | runtime-built |
OLTP — фундамент metadata DB Airflow
dataset table
Простой registry:
CREATE TABLE dataset (
id BIGSERIAL PRIMARY KEY,
uri VARCHAR(3000) NOT NULL,
extra JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
is_orphaned BOOLEAN NOT NULL DEFAULT FALSE,
CONSTRAINT dataset_uri_unique UNIQUE (uri)
);
uri— UNIQUE constraint. Один URI = один row.is_orphaned— TRUE, если ни один DAG больше не объявляет outlet/schedule с этим URI. Scheduler housekeeping job сам помечает orphans, но не удаляет — для исторических dataset_event references.extra— изначальная metadata, но обычно пустая. Реальная metadata вdataset_event.extra.
-- Сколько datasets зарегистрировано?
SELECT COUNT(*) FROM dataset WHERE NOT is_orphaned;
-- Когда впервые увиден данный URI?
SELECT uri, created_at FROM dataset
WHERE uri = 's3://warehouse/orders/processed/';
dataset_event — главная таблица
CREATE TABLE dataset_event (
id BIGSERIAL PRIMARY KEY,
dataset_id BIGINT NOT NULL REFERENCES dataset(id),
extra JSONB,
source_task_id VARCHAR(250),
source_dag_id VARCHAR(250),
source_run_id VARCHAR(250),
source_map_index INTEGER DEFAULT -1,
timestamp TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_dataset_event_dataset_id ON dataset_event (dataset_id, timestamp);
CREATE INDEX idx_dataset_event_source ON dataset_event (source_dag_id, source_task_id, source_run_id);
Каждая успешная task с outlets=[ds] создаёт один INSERT в эту таблицу. source_* колонки определяют кто эмитил event. extra хранит user metadata (через Metadata(ds, extra={...}) API).
Эта таблица растёт неограниченно. На large deployment (1000 DAGs, 50 datasets, 24 emissions/day each) — 1.2M rows/day. Через год — 400M rows. Нужен retention policy: airflow db clean --table dataset_event --clean-before-timestamp '30 days ago'.
-- Все events для конкретного dataset за последний час
SELECT
de.timestamp,
de.source_dag_id,
de.source_task_id,
de.source_run_id,
de.extra
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/'
AND de.timestamp > now() - interval '1 hour'
ORDER BY de.timestamp DESC;
-- Top datasets по emission frequency за день
SELECT
d.uri,
COUNT(*) AS emissions_24h,
MAX(de.timestamp) AS last_emission
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
WHERE de.timestamp > now() - interval '24 hours'
GROUP BY d.uri
ORDER BY emissions_24h DESC
LIMIT 20;
dataset_dag_run_queue — ephemeral queue
Самая интересная таблица — она изменяется на каждый scheduler tick:
CREATE TABLE dataset_dag_run_queue (
dataset_id BIGINT NOT NULL REFERENCES dataset(id),
target_dag_id VARCHAR(250) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (dataset_id, target_dag_id)
);
CREATE INDEX idx_ddrq_target_dag ON dataset_dag_run_queue (target_dag_id);
Когда producer эмитит dataset event, scheduler делает fan-out write:
INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id)
SELECT
:new_event_dataset_id,
sdr.dag_id
FROM dag_schedule_dataset_reference sdr
WHERE sdr.dataset_id = :new_event_dataset_id
ON CONFLICT (dataset_id, target_dag_id) DO NOTHING;
PRIMARY KEY уникальный — повторные events для одного dataset/consumer не дублируются. Это очередь “по крайней мере один event”, а не «количество events».
-- Сейчас в queue
SELECT
d.uri,
ddrq.target_dag_id,
ddrq.created_at,
now() - ddrq.created_at AS queue_age
FROM dataset_dag_run_queue ddrq
JOIN dataset d ON ddrq.dataset_id = d.id
ORDER BY ddrq.created_at;
-- Какие DAG ждут больше 10 минут?
-- Это часто признак zombie subscription
SELECT
ddrq.target_dag_id,
array_agg(d.uri) AS pending_datasets,
MIN(ddrq.created_at) AS oldest
FROM dataset_dag_run_queue ddrq
JOIN dataset d ON ddrq.dataset_id = d.id
WHERE ddrq.created_at < now() - interval '10 minutes'
GROUP BY ddrq.target_dag_id;
dag_schedule_dataset_reference — subscriptions
Какие DAG-и подписаны на какие datasets:
CREATE TABLE dag_schedule_dataset_reference (
dataset_id BIGINT NOT NULL REFERENCES dataset(id),
dag_id VARCHAR(250) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (dataset_id, dag_id)
);
Обновляется при DAG parse. Если кто-то изменил schedule=[ds_a] на schedule=[ds_b] — следующий parse сделает DELETE+INSERT.
-- Все DAG, ждущие конкретного dataset
SELECT sdr.dag_id, sdr.updated_at
FROM dag_schedule_dataset_reference sdr
JOIN dataset d ON sdr.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/';
-- Все datasets, на которые подписан DAG
SELECT d.uri
FROM dag_schedule_dataset_reference sdr
JOIN dataset d ON sdr.dataset_id = d.id
WHERE sdr.dag_id = 'orders_aggregator';
task_outlet_dataset_reference — producers
Кто эмитит данные:
CREATE TABLE task_outlet_dataset_reference (
dataset_id BIGINT NOT NULL REFERENCES dataset(id),
dag_id VARCHAR(250) NOT NULL,
task_id VARCHAR(250) NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (dataset_id, dag_id, task_id)
);
Это критично для lineage: какая task в каком DAG-е “owns” этот dataset. UI использует эту таблицу для рисования graph dependencies между DAG-ами.
-- Кто пишет в данный dataset?
SELECT tdr.dag_id, tdr.task_id
FROM task_outlet_dataset_reference tdr
JOIN dataset d ON tdr.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/';
-- Producers + consumers (для построения lineage граф)
SELECT
d.uri,
array_agg(DISTINCT tdr.dag_id || '.' || tdr.task_id) AS producers,
array_agg(DISTINCT sdr.dag_id) AS consumers
FROM dataset d
LEFT JOIN task_outlet_dataset_reference tdr ON tdr.dataset_id = d.id
LEFT JOIN dag_schedule_dataset_reference sdr ON sdr.dataset_id = d.id
GROUP BY d.uri
ORDER BY d.uri;
Scheduler dataset processing phase
Каждый scheduler tick (default 5s) включает фазу обработки datasets. Pseudo-code:
def _process_dataset_triggered_dags(self, session):
# Step 1: Найти DAG-и, чей schedule satisfied
# Это SQL: для каждого DAG в dag_schedule_dataset_reference
# проверить, что все datasets имеют row в dataset_dag_run_queue
candidates = (
session.query(DagModel)
.filter(DagModel.is_paused == False)
.filter(DagModel.has_dataset_schedule == True)
.all()
)
for dag in candidates:
if self._dataset_schedule_satisfied(dag, session):
# Step 2: Создать DagRun
self._create_dataset_dag_run(dag, session)
# Step 3: Очистить queue rows для этого DAG
session.query(DatasetDagRunQueue).filter(
DatasetDagRunQueue.target_dag_id == dag.dag_id
).delete()
_dataset_schedule_satisfied — это где обрабатываются boolean expressions (DatasetAll, DatasetAny). Для simple list (AND) — просто SQL:
-- Удовлетворены ли все schedule deps для DAG?
WITH required AS (
SELECT dataset_id FROM dag_schedule_dataset_reference
WHERE dag_id = 'my_consumer_dag'
),
satisfied AS (
SELECT dataset_id FROM dataset_dag_run_queue
WHERE target_dag_id = 'my_consumer_dag'
)
SELECT
(SELECT COUNT(*) FROM required) AS required_count,
(SELECT COUNT(*) FROM satisfied) AS satisfied_count,
(SELECT COUNT(*) FROM required) =
(SELECT COUNT(*) FROM satisfied) AS is_satisfied;
Lifecycle scheduling — детальная диаграмма
Debugging cookbook
”Consumer не запустился, хотя producer эмитил"
-- 1. Проверить, что event действительно был
SELECT de.*, d.uri
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/'
ORDER BY de.timestamp DESC LIMIT 5;
-- 2. Проверить subscription consumer-а
SELECT * FROM dag_schedule_dataset_reference
WHERE dag_id = 'orders_aggregator';
-- 3. Проверить queue
SELECT ddrq.*, d.uri
FROM dataset_dag_run_queue ddrq
JOIN dataset d ON ddrq.dataset_id = d.id
WHERE ddrq.target_dag_id = 'orders_aggregator';
-- 4. Проверить, не зависает ли DAG на max_active_runs
SELECT COUNT(*) FROM dag_run
WHERE dag_id = 'orders_aggregator' AND state = 'running';
"Очередь queue растёт, ничего не запускается”
Обычно симптом одного из:
- Consumer DAG paused (
is_paused = TRUEвdagtable) - max_active_runs_per_dag exceeded
- DAG имеет
DatasetAll(ds_a, ds_b), но эмитится толькоds_a(никогдаds_b)
-- Это «зависшая» подписка — DAG paused, но queue копит
SELECT dm.dag_id, dm.is_paused, COUNT(ddrq.*) AS queue_size
FROM dag dm
JOIN dataset_dag_run_queue ddrq ON ddrq.target_dag_id = dm.dag_id
WHERE dm.is_paused = TRUE
GROUP BY dm.dag_id, dm.is_paused;
“Слишком много dataset_event rows”
-- Распределение по dataset (топ-эмитеры)
SELECT
d.uri,
COUNT(*) AS event_count,
pg_size_pretty(SUM(pg_column_size(de.extra))) AS extra_size
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
GROUP BY d.uri
ORDER BY event_count DESC
LIMIT 10;
-- Cleanup старых events
DELETE FROM dataset_event
WHERE timestamp < now() - interval '30 days';
-- или через CLI: airflow db clean --table dataset_event ...
Performance: indexes и table bloat
dataset_event — самая «горячая» таблица в Datasets. Production-deployment рекомендации:
-
Partition by timestamp (PostgreSQL native partitioning):
CREATE TABLE dataset_event (...) PARTITION BY RANGE (timestamp); CREATE TABLE dataset_event_y2026m05 PARTITION OF dataset_event FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');Airflow не делает это автоматически, но manual partitioning поддерживается. Cleanup через
DROP PARTITIONнамного быстрее, чемDELETE. -
Регулярный VACUUM. После DELETE остаются dead tuples. PostgreSQL autovacuum обычно справляется, но для high-emission instances настройте более агрессивно:
ALTER TABLE dataset_event SET ( autovacuum_vacuum_scale_factor = 0.05, -- default 0.2 autovacuum_analyze_scale_factor = 0.02 ); -
Индексы — стандартные индексы (
dataset_id,source_dag_id) обычно достаточны. Если query patterns показывают slow ORDER BY timestamp DESC — добавьте(dataset_id, timestamp DESC).
Production gotchas
-
is_orphanedflag не auto-cleanup datasets. После удаления всех DAG-ов с outlets/schedule на dataset — он остаётся в table сis_orphaned=TRUE. Это нормально (для исторических references), но не путайтесь при count. -
dataset_event никогда не UPDATE, только INSERT/DELETE. Это immutable log. Если хотите correction (например, retroactively изменить extra metadata) — emit новый event с updated extra.
-
Queue NOT honoring max_active_runs strictly. Если max_active_runs=1 и DagRun running, scheduler удерживает queue rows (не удаляет). Когда running run завершается, все накопленные queue rows triggered ONE DagRun (не серия). Это design choice — не дублировать одинаковые dataset-triggered runs.
-
session_factoryне разделяет transactions для multi-statement updates. SchedulerJob делает all-or-nothing transactions. Если в середине fan-out write случается DB error — никаких partial queue rows не остаётся. -
Sentry/alerting on growing queue. Поставьте метрику:
count(dataset_dag_run_queue) WHERE created_at < now() - 1h. Если non-zero — есть stuck subscriptions. -
DAG rename ломает subscriptions. Если изменить
dag_id—dag_schedule_dataset_referenceзаписан под старым именем, scheduler перестанет триггерить. Решение: full DAG migration с миграционным скриптом или удалением старых references. -
Connection-level isolation. Если используете
READ COMMITTED(default Postgres) и делаете SQL inspection — между двумя SELECT может произойти scheduler commit. Для consistent snapshot используйтеREPEATABLE READ.