Зачем мониторить data pipeline
В классической backend-разработке observability фокусируется на сервисе: latency запросов, error rate, CPU, memory. В data engineering нужна та же база, плюс ещё один пласт — состояние данных. Pipeline может технически работать (все задачи зелёные), но данные при этом протухли (последняя загрузка — три дня назад) или пришли неполные (вчера было 1M строк, сегодня 50K).
В DE без observability ты не узнаёшь о проблемах до того, как менеджер пишет “почему дашборд пустой”. С нормальным мониторингом — проблема видна на pipeline-уровне за секунды до того, как дойдёт до бизнеса.
Observability в DE стоит на трёх китах:
- Логи — что произошло в задаче (события, ошибки, контекст).
- Метрики — численные характеристики (сколько строк, сколько секунд, сколько ошибок).
- Lineage — какой dataset из чего получился, что от него зависит.
Логи
Логи — это записи о событиях в задаче. В DE логи обычно живут в оркестраторе (Airflow Task Logs), в облаке (CloudWatch, Stackdriver) или в централизованных системах (ELK, Loki, Datadog).
Что писать в логи DE-задач:
import logging
log = logging.getLogger(__name__)
def load_orders(ds):
log.info("start_load", extra={"event_date": ds, "source": "api.partner.com"})
rows = extract(ds)
log.info("extracted", extra={"event_date": ds, "row_count": len(rows)})
if len(rows) == 0:
log.warning("zero_rows", extra={"event_date": ds})
inserted = load_to_dwh(rows)
log.info("loaded", extra={"event_date": ds, "inserted": inserted, "skipped": len(rows) - inserted})
Ключевые принципы:
- Структурированные логи (JSON) — машинно-парсимые, можно фильтровать и агрегировать.
- Контекст — всегда добавляй
event_date,task_id,pipeline_name. Без контекста лог бесполезен. - Уровни —
infoдля нормальных событий,warningдля подозрительных,errorдля сбоев. Не злоупотребляйdebugв production.
Антипаттерн — print и logger.info("ok"). Без контекста и структуры лог в проде превращается в шум.
Метрики
Метрики — численные показатели, которые можно агрегировать и нарисовать на дашборде. Бывают двух типов: метрики pipeline (как работает задача) и метрики данных (что в данных).
Метрики pipeline
| Метрика | Что измеряет | Пример SLO |
|---|---|---|
| Latency | Время выполнения задачи | p95 меньше 30 минут |
| Throughput | Сколько строк/событий в секунду | более 10k rows/s |
| Error rate | Доля упавших запусков | меньше 1% за неделю |
| Success rate | Доля успешных запусков | более 99% за месяц |
Эти метрики собираются автоматически в Airflow (через метаданные), Prefect, Dagster. Дополнительно — экспортируются в Prometheus / Datadog / CloudWatch.
Метрики данных
Это уникальный для DE класс. Они описывают что в данных:
| Метрика | Что измеряет | Пример SLO |
|---|---|---|
| Freshness | Сколько времени прошло с последнего обновления | менее 1h для critical tables |
| Volume | Количество строк/байт за период | более 0.8 × moving average |
| Schema drift | Изменения в колонках/типах | 0 unexpected changes |
| Null rate | Доля NULL в required-полях | менее 1% |
| Distinct count | Кардинальность ключевых колонок | более 0.95 × expected |
Логи рассказывают что было, метрики показывают сколько, lineage показывает связи
События, ошибки
JSON + contextЛоги: события, ошибки, контекст. Хранятся в Airflow / CloudWatch / Loki / ELK. Структурированный JSON-формат + контекст task_id/event_date — обязательны
Численные показатели
Freshness, latency, volumeМетрики: численные показатели pipeline и данных. Freshness, latency, throughput, error rate, row count. Графики и алерты на превышение SLO
Граф зависимостей
OpenLineage стандартLineage: граф зависимостей dataset-ов. Какой stg_X из чего получился, на чём базируется mart_Y. OpenLineage — открытый стандарт, native в Airflow 2.7+, dbt, Spark
Как считать метрики данных
Простейший вариант — на каждом запуске SQL-запросом в DWH:
-- freshness
SELECT
table_name,
MAX(updated_at) AS last_update,
EXTRACT(EPOCH FROM (NOW() - MAX(updated_at))) AS seconds_since_update
FROM stats.table_metrics
GROUP BY table_name;
-- volume drop detection
SELECT
event_date,
COUNT(*) AS row_count,
COUNT(*) - LAG(COUNT(*)) OVER (ORDER BY event_date) AS delta
FROM orders
WHERE event_date >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY event_date
ORDER BY event_date;
Эти запросы регулярно (раз в час/день) выполняются мониторинговой задачей, результат пишется в metrics-table или экспортируется в Prometheus. Алерт срабатывает, если значение выходит за пределы.
Профессиональные инструменты для метрик данных: Monte Carlo, Great Expectations, Soda, Anomalo, dbt tests (для пайплайнов на dbt). Они автоматизируют сбор и анализ, добавляют ML-detection аномалий.
Lineage
Lineage (data lineage) — это граф связей между датасетами. Какие таблицы являются источниками для какой производной таблицы. Зачем это нужно:
- Impact analysis. “Если я изменю схему
stg_orders, какие отчёты сломаются?” — без lineage это поиск по всему коду. - Root cause analysis. “В отчёте за вчера выручка занижена. Где сломалось?” — lineage показывает upstream-таблицы и их freshness.
- Compliance. GDPR/HIPAA требуют знать, где живут персональные данные после трансформаций.
- Onboarding. Новый аналитик видит граф зависимостей вместо тысячи SQL-файлов.
OpenLineage — открытый стандарт
OpenLineage — это spec для описания событий lineage. Каждый job (task) при запуске и завершении эмитит событие с входами/выходами и meta:
{
"eventType": "COMPLETE",
"eventTime": "2026-05-17T03:14:22Z",
"run": {"runId": "abc-123", "facets": {...}},
"job": {"namespace": "etl", "name": "load_orders"},
"inputs": [
{"namespace": "api", "name": "partner.orders"}
],
"outputs": [
{"namespace": "dwh", "name": "stg_orders", "facets": {
"schema": {"fields": [{"name": "order_id", "type": "int64"}, ...]},
"outputStatistics": {"rowCount": 12345, "size": 5678901}
}}
]
}
События отправляются в бэкенд — типичные реализации: Marquez (open-source референс), Datahub, Atlan, OpenMetadata. Они собирают граф и показывают UI.
Поддержка OpenLineage встроена в:
- Airflow 2.7+ — native, через provider
apache-airflow-providers-openlineage. - dbt — через
dbt-olwrapper, события эмитятся из каждой модели. - Spark — через
openlineage-sparkJava agent. - Flink — через listener.
Lineage — это страховка на годы вперёд. Когда pipeline-у будет 100+ таблиц и команда вырастет, без lineage любое изменение схемы превратится в страшный квест. Начинать стоит рано — даже простой dbt-граф уже даёт 80% пользы.
Alerting
Метрики бесполезны, если о проблеме не узнают вовремя. Alerting — настроенные правила: если метрика вышла за SLO, кому и как сообщать.
Принципы хороших алертов:
- Actionable — алерт должен говорить, что сломано и что делать. “Pipeline failed” — плохой алерт. “DAG
daily_ordersfailed at taskload_to_dwh: ConnectionTimeoutError to api.partner.com — likely API outage” — хороший. - Right severity — page (звонок) только для critical, Slack для non-critical, email для FYI. Перегиб с pager -> alert fatigue.
- Routing — алерт идёт владельцу pipeline-а, не всему
data-team@. - Throttling — если задача упала 100 раз, не присылать 100 алертов. Group + dedup.
В Airflow стандартный путь — email_on_failure, on_failure_callback, SlackAPIPostOperator. Внешние системы — PagerDuty, OpsGenie, Datadog Monitors.
Полный observability-стек
Production-pipeline в 2026 году обычно выглядит так:
Сбор -> хранение -> визуализация -> alerting
Observability — это не “поставить инструмент”. Это дисциплина. Без культуры (писать логи с контекстом, ревьюить алерты раз в квартал, проверять SLO) даже самый дорогой стек бесполезен. На джуна-DE никто не ждёт, что ты построишь стек. Но ждут, что ты в своём коде пишешь правильные логи и видишь метрики своей задачи.
Попробуй сам
- Открой Airflow в любом open-source-репо. Найди задачу. Какие логи она пишет? Есть ли контекст (event_date, task_id)? Есть ли структура?
- Напиши простую функцию, которая в конце задачи вызывает SQL
SELECT COUNT(*) FROM targetи сравнивает с moving average за неделю. Если падение >20%, raise — это volume drop alert. - Посмотри документацию OpenLineage. Какие 3-5 facet-ов кажутся самыми полезными? Чем Marquez отличается от DataHub?
Recovery: что делать, когда pipeline упал
Observability без плана восстановления — только полдела. Когда алерт сработал, у команды должен быть готовый набор реакций. Базовый recovery-набор: (1) alert на владельца pipeline с runbook-ссылкой (что за задача, какая ошибка, куда смотреть); (2) automated retry policy — для transient ошибок (timeout, 429, 503) с exp backoff, чтобы не дёргать человека на каждый сетевой моргнул; (3) manual rerun для bugfix-сценариев — починил код, перезапустил задачу за нужные даты (тут опять выручает идемпотентность); (4) blast radius — нужно понимать, какие downstream-таблицы и дашборды задеты упавшей задачей (через lineage), чтобы коммуницировать stakeholders до того, как они увидят пустой график.
Полный набор pipeline-паттернов (backfill стратегии, incremental processing patterns, recovery playbook) разбираем в будущем advanced-de-patterns deep-dive.