Learning Platform
Глоссарий Troubleshooting
Урок 09.04 · 28 мин
Продвинутый
Dataset InternalsMetadata DBSQLSchedulerDebugging

Dataset internals — таблицы и SQL inspection

Datasets — это не магия. Под капотом — пять обычных PostgreSQL-таблиц и десяток SQL-запросов в scheduler-е, который раз в 5 секунд проверяет накопленные events. Этот урок — препарирование structure metadata DB, queries для debugging, и алгоритма scheduler-а для обработки dataset events.

Если в production видишь «consumer DAG не запустился, хотя producer эмитил dataset», единственный путь к истине — прямой SQL в metadata. Заучи эти таблицы.


Таблицы — обзор

ТаблицаНазначениеКардинальность
datasetRegistry всех известных datasets (URI → id)~ количество unique URIs
dataset_eventAppend-only log всех emissionsрастёт линейно (требует cleanup)
dataset_dag_run_queuePending events per consumer DAGephemeral, очищается при создании DagRun
dag_schedule_dataset_referenceSubscription: какие DAG зависят от каких datasets~ DAGs × datasets
task_outlet_dataset_referenceDeclaration: какие tasks эмитят какие datasets~ tasks × datasets
dataset_aliasRegistry aliases (с 2.9+)~ unique alias names
dataset_alias_dataset_referenceMapping alias → concrete datasetsruntime-built

OLTP — фундамент metadata DB Airflow

dataset table

Простой registry:

CREATE TABLE dataset (
    id              BIGSERIAL PRIMARY KEY,
    uri             VARCHAR(3000) NOT NULL,
    extra           JSONB,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    is_orphaned     BOOLEAN NOT NULL DEFAULT FALSE,
    CONSTRAINT dataset_uri_unique UNIQUE (uri)
);
  • uri — UNIQUE constraint. Один URI = один row.
  • is_orphaned — TRUE, если ни один DAG больше не объявляет outlet/schedule с этим URI. Scheduler housekeeping job сам помечает orphans, но не удаляет — для исторических dataset_event references.
  • extra — изначальная metadata, но обычно пустая. Реальная metadata в dataset_event.extra.
-- Сколько datasets зарегистрировано?
SELECT COUNT(*) FROM dataset WHERE NOT is_orphaned;

-- Когда впервые увиден данный URI?
SELECT uri, created_at FROM dataset
WHERE uri = 's3://warehouse/orders/processed/';

dataset_event — главная таблица

CREATE TABLE dataset_event (
    id                  BIGSERIAL PRIMARY KEY,
    dataset_id          BIGINT NOT NULL REFERENCES dataset(id),
    extra               JSONB,
    source_task_id      VARCHAR(250),
    source_dag_id       VARCHAR(250),
    source_run_id       VARCHAR(250),
    source_map_index    INTEGER DEFAULT -1,
    timestamp           TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX idx_dataset_event_dataset_id ON dataset_event (dataset_id, timestamp);
CREATE INDEX idx_dataset_event_source ON dataset_event (source_dag_id, source_task_id, source_run_id);

Каждая успешная task с outlets=[ds] создаёт один INSERT в эту таблицу. source_* колонки определяют кто эмитил event. extra хранит user metadata (через Metadata(ds, extra={...}) API).

WARNING

Эта таблица растёт неограниченно. На large deployment (1000 DAGs, 50 datasets, 24 emissions/day each) — 1.2M rows/day. Через год — 400M rows. Нужен retention policy: airflow db clean --table dataset_event --clean-before-timestamp '30 days ago'.

-- Все events для конкретного dataset за последний час
SELECT
    de.timestamp,
    de.source_dag_id,
    de.source_task_id,
    de.source_run_id,
    de.extra
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/'
  AND de.timestamp > now() - interval '1 hour'
ORDER BY de.timestamp DESC;

-- Top datasets по emission frequency за день
SELECT
    d.uri,
    COUNT(*) AS emissions_24h,
    MAX(de.timestamp) AS last_emission
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
WHERE de.timestamp > now() - interval '24 hours'
GROUP BY d.uri
ORDER BY emissions_24h DESC
LIMIT 20;

dataset_dag_run_queue — ephemeral queue

Самая интересная таблица — она изменяется на каждый scheduler tick:

CREATE TABLE dataset_dag_run_queue (
    dataset_id      BIGINT NOT NULL REFERENCES dataset(id),
    target_dag_id   VARCHAR(250) NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (dataset_id, target_dag_id)
);

CREATE INDEX idx_ddrq_target_dag ON dataset_dag_run_queue (target_dag_id);

Когда producer эмитит dataset event, scheduler делает fan-out write:

INSERT INTO dataset_dag_run_queue (dataset_id, target_dag_id)
SELECT
    :new_event_dataset_id,
    sdr.dag_id
FROM dag_schedule_dataset_reference sdr
WHERE sdr.dataset_id = :new_event_dataset_id
ON CONFLICT (dataset_id, target_dag_id) DO NOTHING;

PRIMARY KEY уникальный — повторные events для одного dataset/consumer не дублируются. Это очередь “по крайней мере один event”, а не «количество events».

-- Сейчас в queue
SELECT
    d.uri,
    ddrq.target_dag_id,
    ddrq.created_at,
    now() - ddrq.created_at AS queue_age
FROM dataset_dag_run_queue ddrq
JOIN dataset d ON ddrq.dataset_id = d.id
ORDER BY ddrq.created_at;

-- Какие DAG ждут больше 10 минут?
-- Это часто признак zombie subscription
SELECT
    ddrq.target_dag_id,
    array_agg(d.uri) AS pending_datasets,
    MIN(ddrq.created_at) AS oldest
FROM dataset_dag_run_queue ddrq
JOIN dataset d ON ddrq.dataset_id = d.id
WHERE ddrq.created_at < now() - interval '10 minutes'
GROUP BY ddrq.target_dag_id;

dag_schedule_dataset_reference — subscriptions

Какие DAG-и подписаны на какие datasets:

CREATE TABLE dag_schedule_dataset_reference (
    dataset_id      BIGINT NOT NULL REFERENCES dataset(id),
    dag_id          VARCHAR(250) NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL,
    updated_at      TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (dataset_id, dag_id)
);

Обновляется при DAG parse. Если кто-то изменил schedule=[ds_a] на schedule=[ds_b] — следующий parse сделает DELETE+INSERT.

-- Все DAG, ждущие конкретного dataset
SELECT sdr.dag_id, sdr.updated_at
FROM dag_schedule_dataset_reference sdr
JOIN dataset d ON sdr.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/';

-- Все datasets, на которые подписан DAG
SELECT d.uri
FROM dag_schedule_dataset_reference sdr
JOIN dataset d ON sdr.dataset_id = d.id
WHERE sdr.dag_id = 'orders_aggregator';

task_outlet_dataset_reference — producers

Кто эмитит данные:

CREATE TABLE task_outlet_dataset_reference (
    dataset_id      BIGINT NOT NULL REFERENCES dataset(id),
    dag_id          VARCHAR(250) NOT NULL,
    task_id         VARCHAR(250) NOT NULL,
    created_at      TIMESTAMPTZ NOT NULL,
    updated_at      TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (dataset_id, dag_id, task_id)
);

Это критично для lineage: какая task в каком DAG-е “owns” этот dataset. UI использует эту таблицу для рисования graph dependencies между DAG-ами.

-- Кто пишет в данный dataset?
SELECT tdr.dag_id, tdr.task_id
FROM task_outlet_dataset_reference tdr
JOIN dataset d ON tdr.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/';

-- Producers + consumers (для построения lineage граф)
SELECT
    d.uri,
    array_agg(DISTINCT tdr.dag_id || '.' || tdr.task_id) AS producers,
    array_agg(DISTINCT sdr.dag_id) AS consumers
FROM dataset d
LEFT JOIN task_outlet_dataset_reference tdr ON tdr.dataset_id = d.id
LEFT JOIN dag_schedule_dataset_reference sdr ON sdr.dataset_id = d.id
GROUP BY d.uri
ORDER BY d.uri;

Scheduler dataset processing phase

Каждый scheduler tick (default 5s) включает фазу обработки datasets. Pseudo-code:

def _process_dataset_triggered_dags(self, session):
    # Step 1: Найти DAG-и, чей schedule satisfied
    # Это SQL: для каждого DAG в dag_schedule_dataset_reference
    # проверить, что все datasets имеют row в dataset_dag_run_queue
    candidates = (
        session.query(DagModel)
        .filter(DagModel.is_paused == False)
        .filter(DagModel.has_dataset_schedule == True)
        .all()
    )

    for dag in candidates:
        if self._dataset_schedule_satisfied(dag, session):
            # Step 2: Создать DagRun
            self._create_dataset_dag_run(dag, session)

            # Step 3: Очистить queue rows для этого DAG
            session.query(DatasetDagRunQueue).filter(
                DatasetDagRunQueue.target_dag_id == dag.dag_id
            ).delete()

_dataset_schedule_satisfied — это где обрабатываются boolean expressions (DatasetAll, DatasetAny). Для simple list (AND) — просто SQL:

-- Удовлетворены ли все schedule deps для DAG?
WITH required AS (
    SELECT dataset_id FROM dag_schedule_dataset_reference
    WHERE dag_id = 'my_consumer_dag'
),
satisfied AS (
    SELECT dataset_id FROM dataset_dag_run_queue
    WHERE target_dag_id = 'my_consumer_dag'
)
SELECT
    (SELECT COUNT(*) FROM required) AS required_count,
    (SELECT COUNT(*) FROM satisfied) AS satisfied_count,
    (SELECT COUNT(*) FROM required) =
    (SELECT COUNT(*) FROM satisfied) AS is_satisfied;

Lifecycle scheduling — детальная диаграмма

Dataset processing в scheduler main loop
Phase 1: Process new eventsScheduler читает dataset_event rows, созданные с последнего processed timestamp. Для каждого нового event делает fan-out INSERT в dataset_dag_run_queue для всех подписчиков. ON CONFLICT DO NOTHING — повторные events для одного consumer не дублируются.
Phase 2: Check satisfactionДля каждого DAG с dataset schedule: SELECT FROM dag_schedule_dataset_reference vs dataset_dag_run_queue. Если все required datasets имеют queue row — DAG ready to trigger. Поддерживает AND/OR через DatasetAll/DatasetAny expressions.
Phase 3: Create DagRunsДля satisfied DAGs: INSERT в dag_run с run_type='dataset_triggered', external_trigger=false, conf=metadata о triggering events. Учитывает max_active_runs_per_dag — если уже лимит достигнут, queue rows остаются для следующего tick.
Phase 4: Cleanup queueДля каждого созданного DagRun: DELETE FROM dataset_dag_run_queue WHERE target_dag_id = ?. Очистка происходит ВСЕХ rows для DAG-а (не только тех, что удовлетворили условие). Дальнейшие emissions начинают накапливать с нуля.

Debugging cookbook

”Consumer не запустился, хотя producer эмитил"

-- 1. Проверить, что event действительно был
SELECT de.*, d.uri
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
WHERE d.uri = 's3://warehouse/orders/processed/'
ORDER BY de.timestamp DESC LIMIT 5;

-- 2. Проверить subscription consumer-а
SELECT * FROM dag_schedule_dataset_reference
WHERE dag_id = 'orders_aggregator';

-- 3. Проверить queue
SELECT ddrq.*, d.uri
FROM dataset_dag_run_queue ddrq
JOIN dataset d ON ddrq.dataset_id = d.id
WHERE ddrq.target_dag_id = 'orders_aggregator';

-- 4. Проверить, не зависает ли DAG на max_active_runs
SELECT COUNT(*) FROM dag_run
WHERE dag_id = 'orders_aggregator' AND state = 'running';

"Очередь queue растёт, ничего не запускается”

Обычно симптом одного из:

  • Consumer DAG paused (is_paused = TRUE в dag table)
  • max_active_runs_per_dag exceeded
  • DAG имеет DatasetAll(ds_a, ds_b), но эмитится только ds_a (никогда ds_b)
-- Это «зависшая» подписка — DAG paused, но queue копит
SELECT dm.dag_id, dm.is_paused, COUNT(ddrq.*) AS queue_size
FROM dag dm
JOIN dataset_dag_run_queue ddrq ON ddrq.target_dag_id = dm.dag_id
WHERE dm.is_paused = TRUE
GROUP BY dm.dag_id, dm.is_paused;

“Слишком много dataset_event rows”

-- Распределение по dataset (топ-эмитеры)
SELECT
    d.uri,
    COUNT(*) AS event_count,
    pg_size_pretty(SUM(pg_column_size(de.extra))) AS extra_size
FROM dataset_event de
JOIN dataset d ON de.dataset_id = d.id
GROUP BY d.uri
ORDER BY event_count DESC
LIMIT 10;

-- Cleanup старых events
DELETE FROM dataset_event
WHERE timestamp < now() - interval '30 days';
-- или через CLI: airflow db clean --table dataset_event ...

Performance: indexes и table bloat

dataset_event — самая «горячая» таблица в Datasets. Production-deployment рекомендации:

  1. Partition by timestamp (PostgreSQL native partitioning):

    CREATE TABLE dataset_event (...) PARTITION BY RANGE (timestamp);
    CREATE TABLE dataset_event_y2026m05 PARTITION OF dataset_event
        FOR VALUES FROM ('2026-05-01') TO ('2026-06-01');

    Airflow не делает это автоматически, но manual partitioning поддерживается. Cleanup через DROP PARTITION намного быстрее, чем DELETE.

  2. Регулярный VACUUM. После DELETE остаются dead tuples. PostgreSQL autovacuum обычно справляется, но для high-emission instances настройте более агрессивно:

    ALTER TABLE dataset_event SET (
        autovacuum_vacuum_scale_factor = 0.05,  -- default 0.2
        autovacuum_analyze_scale_factor = 0.02
    );
  3. Индексы — стандартные индексы (dataset_id, source_dag_id) обычно достаточны. Если query patterns показывают slow ORDER BY timestamp DESC — добавьте (dataset_id, timestamp DESC).


Production gotchas

  1. is_orphaned flag не auto-cleanup datasets. После удаления всех DAG-ов с outlets/schedule на dataset — он остаётся в table с is_orphaned=TRUE. Это нормально (для исторических references), но не путайтесь при count.

  2. dataset_event никогда не UPDATE, только INSERT/DELETE. Это immutable log. Если хотите correction (например, retroactively изменить extra metadata) — emit новый event с updated extra.

  3. Queue NOT honoring max_active_runs strictly. Если max_active_runs=1 и DagRun running, scheduler удерживает queue rows (не удаляет). Когда running run завершается, все накопленные queue rows triggered ONE DagRun (не серия). Это design choice — не дублировать одинаковые dataset-triggered runs.

  4. session_factory не разделяет transactions для multi-statement updates. SchedulerJob делает all-or-nothing transactions. Если в середине fan-out write случается DB error — никаких partial queue rows не остаётся.

  5. Sentry/alerting on growing queue. Поставьте метрику: count(dataset_dag_run_queue) WHERE created_at &lt; now() - 1h. Если non-zero — есть stuck subscriptions.

  6. DAG rename ломает subscriptions. Если изменить dag_iddag_schedule_dataset_reference записан под старым именем, scheduler перестанет триггерить. Решение: full DAG migration с миграционным скриптом или удалением старых references.

  7. Connection-level isolation. Если используете READ COMMITTED (default Postgres) и делаете SQL inspection — между двумя SELECT может произойти scheduler commit. Для consistent snapshot используйте REPEATABLE READ.


Проверка знанийKnowledge check
В production Airflow vidишь: dataset_dag_run_queue имеет 50 rows для consumer DAG за последние 2 часа. Но в dag_run этот consumer не запускался ни разу. Какие гипотезы и как валидировать?
ОтветAnswer
Возможные причины (с SQL для проверки): (1) Consumer paused — SELECT is_paused FROM dag WHERE dag_id=?. Решение: unpause. (2) max_active_runs_per_dag exceeded — SELECT COUNT(*) FROM dag_run WHERE dag_id=? AND state='running'. Если >=max — scheduler не создаёт новый run пока running не завершится. (3) Не все required datasets имеют queue rows (DatasetAll satisfaction not met) — JOIN dag_schedule_dataset_reference vs dataset_dag_run_queue, посчитать required vs satisfied count. (4) Scheduler health — SELECT latest_heartbeat FROM job WHERE job_type='SchedulerJob'. Если stale — scheduler не processing. (5) DAG не загружается parse-time error — проверить airflow logs DAG processor. (6) В 2.10.x был known bug с queue accumulation при race condition между scheduler restart и event emission — workaround: airflow db clean queue table, restart scheduler.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Producer эмитил dataset_event 50 раз за 2 часа, consumer max_active_runs=1. Что в dataset_dag_run_queue?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6