Learning Platform
Глоссарий Troubleshooting
Урок 18.03 · 25 мин
Начальный
observabilitymetricslogslineageopenlineagemonitoring

Зачем мониторить data pipeline

В классической backend-разработке observability фокусируется на сервисе: latency запросов, error rate, CPU, memory. В data engineering нужна та же база, плюс ещё один пласт — состояние данных. Pipeline может технически работать (все задачи зелёные), но данные при этом протухли (последняя загрузка — три дня назад) или пришли неполные (вчера было 1M строк, сегодня 50K).

В DE без observability ты не узнаёшь о проблемах до того, как менеджер пишет “почему дашборд пустой”. С нормальным мониторингом — проблема видна на pipeline-уровне за секунды до того, как дойдёт до бизнеса.

Observability в DE стоит на трёх китах:

  1. Логи — что произошло в задаче (события, ошибки, контекст).
  2. Метрики — численные характеристики (сколько строк, сколько секунд, сколько ошибок).
  3. Lineage — какой dataset из чего получился, что от него зависит.

Логи

Логи — это записи о событиях в задаче. В DE логи обычно живут в оркестраторе (Airflow Task Logs), в облаке (CloudWatch, Stackdriver) или в централизованных системах (ELK, Loki, Datadog).

Что писать в логи DE-задач:

import logging
log = logging.getLogger(__name__)

def load_orders(ds):
    log.info("start_load", extra={"event_date": ds, "source": "api.partner.com"})

    rows = extract(ds)
    log.info("extracted", extra={"event_date": ds, "row_count": len(rows)})

    if len(rows) == 0:
        log.warning("zero_rows", extra={"event_date": ds})

    inserted = load_to_dwh(rows)
    log.info("loaded", extra={"event_date": ds, "inserted": inserted, "skipped": len(rows) - inserted})

Ключевые принципы:

  • Структурированные логи (JSON) — машинно-парсимые, можно фильтровать и агрегировать.
  • Контекст — всегда добавляй event_date, task_id, pipeline_name. Без контекста лог бесполезен.
  • Уровниinfo для нормальных событий, warning для подозрительных, error для сбоев. Не злоупотребляй debug в production.

Антипаттерн — print и logger.info("ok"). Без контекста и структуры лог в проде превращается в шум.

Метрики

Метрики — численные показатели, которые можно агрегировать и нарисовать на дашборде. Бывают двух типов: метрики pipeline (как работает задача) и метрики данных (что в данных).

Метрики pipeline

МетрикаЧто измеряетПример SLO
LatencyВремя выполнения задачиp95 меньше 30 минут
ThroughputСколько строк/событий в секундуболее 10k rows/s
Error rateДоля упавших запусковменьше 1% за неделю
Success rateДоля успешных запусковболее 99% за месяц

Эти метрики собираются автоматически в Airflow (через метаданные), Prefect, Dagster. Дополнительно — экспортируются в Prometheus / Datadog / CloudWatch.

Метрики данных

Это уникальный для DE класс. Они описывают что в данных:

МетрикаЧто измеряетПример SLO
FreshnessСколько времени прошло с последнего обновленияменее 1h для critical tables
VolumeКоличество строк/байт за периодболее 0.8 × moving average
Schema driftИзменения в колонках/типах0 unexpected changes
Null rateДоля NULL в required-поляхменее 1%
Distinct countКардинальность ключевых колонокболее 0.95 × expected
Три кита observability в DE

Логи рассказывают что было, метрики показывают сколько, lineage показывает связи

Logs
События, ошибки
JSON + contextЛоги: события, ошибки, контекст. Хранятся в Airflow / CloudWatch / Loki / ELK. Структурированный JSON-формат + контекст task_id/event_date — обязательны
Metrics
Численные показатели
Freshness, latency, volumeМетрики: численные показатели pipeline и данных. Freshness, latency, throughput, error rate, row count. Графики и алерты на превышение SLO
Lineage
Граф зависимостей
OpenLineage стандартLineage: граф зависимостей dataset-ов. Какой stg_X из чего получился, на чём базируется mart_Y. OpenLineage — открытый стандарт, native в Airflow 2.7+, dbt, Spark

Как считать метрики данных

Простейший вариант — на каждом запуске SQL-запросом в DWH:

-- freshness
SELECT
  table_name,
  MAX(updated_at) AS last_update,
  EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) AS seconds_since_update
FROM stats.table_metrics
GROUP BY table_name;

-- volume drop detection
SELECT
  event_date,
  COUNT(*) AS row_count,
  COUNT(*) - LAG(COUNT(*)) OVER (ORDER BY event_date) AS delta
FROM orders
WHERE event_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY event_date
ORDER BY event_date;

Эти запросы регулярно (раз в час/день) выполняются мониторинговой задачей, результат пишется в metrics-table или экспортируется в Prometheus. Алерт срабатывает, если значение выходит за пределы.

Профессиональные инструменты для метрик данных: Monte Carlo, Great Expectations, Soda, Anomalo, dbt tests (для пайплайнов на dbt). Они автоматизируют сбор и анализ, добавляют ML-detection аномалий.

Lineage

Lineage (data lineage) — это граф связей между датасетами. Какие таблицы являются источниками для какой производной таблицы. Зачем это нужно:

  1. Impact analysis. “Если я изменю схему stg_orders, какие отчёты сломаются?” — без lineage это поиск по всему коду.
  2. Root cause analysis. “В отчёте за вчера выручка занижена. Где сломалось?” — lineage показывает upstream-таблицы и их freshness.
  3. Compliance. GDPR/HIPAA требуют знать, где живут персональные данные после трансформаций.
  4. Onboarding. Новый аналитик видит граф зависимостей вместо тысячи SQL-файлов.
dbt lineage: автоматический граф зависимостей моделей через dbt docs

OpenLineage — открытый стандарт

OpenLineage — это spec для описания событий lineage. Каждый job (task) при запуске и завершении эмитит событие с входами/выходами и meta:

{
  "eventType": "COMPLETE",
  "eventTime": "2026-05-17T03:14:22Z",
  "run": {"runId": "abc-123", "facets": {...}},
  "job": {"namespace": "etl", "name": "load_orders"},
  "inputs": [
    {"namespace": "api", "name": "partner.orders"}
  ],
  "outputs": [
    {"namespace": "dwh", "name": "stg_orders", "facets": {
      "schema": {"fields": [{"name": "order_id", "type": "int64"}, ...]},
      "outputStatistics": {"rowCount": 12345, "size": 5678901}
    }}
  ]
}

События отправляются в бэкенд — типичные реализации: Marquez (open-source референс), Datahub, Atlan, OpenMetadata. Они собирают граф и показывают UI.

Поддержка OpenLineage встроена в:

  • Airflow 2.7+ — native, через provider apache-airflow-providers-openlineage.
  • dbt — через dbt-ol wrapper, события эмитятся из каждой модели.
  • Spark — через openlineage-spark Java agent.
  • Flink — через listener.
TIP

Lineage — это страховка на годы вперёд. Когда pipeline-у будет 100+ таблиц и команда вырастет, без lineage любое изменение схемы превратится в страшный квест. Начинать стоит рано — даже простой dbt-граф уже даёт 80% пользы.

Alerting

Метрики бесполезны, если о проблеме не узнают вовремя. Alerting — настроенные правила: если метрика вышла за SLO, кому и как сообщать.

Принципы хороших алертов:

  1. Actionable — алерт должен говорить, что сломано и что делать. “Pipeline failed” — плохой алерт. “DAG daily_orders failed at task load_to_dwh: ConnectionTimeoutError to api.partner.com — likely API outage” — хороший.
  2. Right severity — page (звонок) только для critical, Slack для non-critical, email для FYI. Перегиб с pager -> alert fatigue.
  3. Routing — алерт идёт владельцу pipeline-а, не всему data-team@.
  4. Throttling — если задача упала 100 раз, не присылать 100 алертов. Group + dedup.

В Airflow стандартный путь — email_on_failure, on_failure_callback, SlackAPIPostOperator. Внешние системы — PagerDuty, OpsGenie, Datadog Monitors.

Airflow production: SLA callbacks, on_failure_callback и alerting-паттерны в деплое

Полный observability-стек

Production-pipeline в 2026 году обычно выглядит так:

Observability-стек production DE

Сбор -> хранение -> визуализация -> alerting

Источники сигналов: Airflow tasks (логи + метрики из метаданных), dbt models (через events.jsonl), Spark applications, OpenLineage agents
Сбор + хранение: Prometheus (метрики), Loki/ELK (логи), Marquez/Datahub (lineage). Cloud-варианты: CloudWatch, Stackdriver, Datadog
Визуализация: Grafana дашборды по метрикам, Kibana по логам, Marquez UI по lineage. Custom-портал поверх всего — для команды и менеджмента
Alerting: Alertmanager / Datadog Monitors / OpsGenie. Маршруты: critical -> PagerDuty звонок, warning -> Slack channel, info -> email digest
WARNING

Observability — это не “поставить инструмент”. Это дисциплина. Без культуры (писать логи с контекстом, ревьюить алерты раз в квартал, проверять SLO) даже самый дорогой стек бесполезен. На джуна-DE никто не ждёт, что ты построишь стек. Но ждут, что ты в своём коде пишешь правильные логи и видишь метрики своей задачи.

Попробуй сам

  1. Открой Airflow в любом open-source-репо. Найди задачу. Какие логи она пишет? Есть ли контекст (event_date, task_id)? Есть ли структура?
  2. Напиши простую функцию, которая в конце задачи вызывает SQL SELECT COUNT(*) FROM target и сравнивает с moving average за неделю. Если падение >20%, raise — это volume drop alert.
  3. Посмотри документацию OpenLineage. Какие 3-5 facet-ов кажутся самыми полезными? Чем Marquez отличается от DataHub?

Recovery: что делать, когда pipeline упал

Observability без плана восстановления — только полдела. Когда алерт сработал, у команды должен быть готовый набор реакций. Базовый recovery-набор: (1) alert на владельца pipeline с runbook-ссылкой (что за задача, какая ошибка, куда смотреть); (2) automated retry policy — для transient ошибок (timeout, 429, 503) с exp backoff, чтобы не дёргать человека на каждый сетевой моргнул; (3) manual rerun для bugfix-сценариев — починил код, перезапустил задачу за нужные даты (тут опять выручает идемпотентность); (4) blast radius — нужно понимать, какие downstream-таблицы и дашборды задеты упавшей задачей (через lineage), чтобы коммуницировать stakeholders до того, как они увидят пустой график.

Полный набор pipeline-паттернов (backfill стратегии, incremental processing patterns, recovery playbook) разбираем в будущем advanced-de-patterns deep-dive.

Проверка знанийKnowledge check
Pipeline ежедневно грузит таблицу orders в Snowflake. Все задачи Airflow зелёные. Однако аналитик утром говорит, что выручка в дашборде "не та" — что-то сломано в данных. Какие 4-5 метрик ты бы настроил, чтобы такая ситуация ловилась автоматически до того, как заметит человек?
ОтветAnswer
Метрики данных (а не pipeline): 1) Freshness — sec since last load (alert if > expected interval × 1.5); 2) Volume — row count vs moving average последних 7 дней (alert if < 0.8 × avg); 3) Null rate в ключевых required-полях, например amount (alert if > 1%); 4) Distinct count в orders.customer_id (alert if изменился > 30% — может быть partition issue); 5) Schema drift — список колонок vs expected (alert if появилась/исчезла колонка). Эти метрики ловят проблемы, которые pipeline сам не замечает: упавший партнёр прислал пустой файл, схема изменилась, фильтр пропустил часть данных. Это data quality observability — отдельный слой от pipeline observability.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. В Airflow все задачи зелёные, но аналитик утром жалуется что выручка в дашборде 'не та'. Какие метрики позволили бы поймать проблему до жалобы?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3