Learning Platform
Глоссарий Troubleshooting
Урок 14.05 · 25 мин
Средний
Source freshnessProduction jobsdbt retryFail-fast

Source freshness как gate в production jobs

В production dbt-jobs самая частая причина «грязных» данных — stale sources. Fivetran завис, Stripe API упал, кто-то отключил cron, и в warehouse продолжают лежать данные за вчера. dbt build проходит зелёным, но dashboard показывает старые цифры.

dbt source freshness решает эту проблему: проверка свежести source-таблиц до запуска моделей. Если данные устарели — фейлим job сразу, не пересчитываем модели на устаревших данных. Это fail-fast gate.

Airflow: Dataset freshness — координация между pipeline и consumers

В этом уроке — как настроить freshness, как использовать как gate, и как делать retry-on-failure (через dbt retry).


Зачем freshness gate

Без gate типичный сценарий:

03:00 — Fivetran должен загрузить orders из Stripe. Не загрузил (API timeout).
05:00 — Cron запустил `dbt build`. Sources те же что вчера, но dbt не знает.
05:30 — dbt build завершился [x]. fct_orders, fct_revenue построены на старых данных.
06:00 — Pipeline отправил email "all green".
09:00 — Бизнес смотрит dashboards. Выручка за вчера = 0 (новые orders отсутствуют).
09:15 — Паника. Звонят data engineer.

С freshness gate:

03:00 — Fivetran должен загрузить. Не загрузил.
05:00 — Cron запустил dbt source freshness.
05:01 — freshness fail: stripe.orders 30 hours old (threshold 24 hours).
05:01 — Pipeline сразу fail. dbt build НЕ запустился. Алерт в Slack.
05:02 — On-call инженер видит алерт, проверяет Fivetran, перезапускает.
06:00 — Sources свежие, pipeline retry, все модели построены на свежих данных.
09:00 — Бизнес видит правильные числа.

Принцип — проверяй данные на входе, а не на выходе.


freshness декларация в _sources.yml

# models/_sources.yml
sources:
  - name: stripe
    database: raw
    schema: stripe_raw
    loaded_at_field: _loaded_at
    freshness:
      warn_after:
        count: 12
        period: hour
      error_after:
        count: 24
        period: hour
    tables:
      - name: orders
        freshness:
          warn_after:
            count: 6
            period: hour
          error_after:
            count: 12
            period: hour
      - name: customers
        # Использует source-level freshness (24h error)

Разбор:

ПолеЗначение
loaded_at_fieldКолонка с timestamp последнего обновления строки. dbt смотрит на MAX этой колонки.
freshness.warn_after.count/periodЕсли данные старше этого — WARN.
freshness.error_after.count/periodЕсли старше — ERROR (fail).

Можно задавать на уровне source (default для всех таблиц) и переопределять на уровне table (более строго для критичных).

В примере: stripe.orders — должны быть свежее 12 часов (12h error). stripe.customers — fallback на source level (24h error).

loaded_at_field

Это критичное поле. dbt вычислит:

SELECT MAX(_loaded_at) FROM stripe_raw.orders

И сравнит с CURRENT_TIMESTAMP - 12 hours.

Если в source нет колонки с временем загрузки (например, immutable dump):

  • Можно не указать loaded_at_field — но тогда freshness не работает.
  • Можно использовать псевдоколонку (_dbt_loaded_at = CURRENT_TIMESTAMP через Fivetran metadata).
  • Можно явно отключить: freshness: null.

Команда dbt source freshness

dbt source freshness

Проверяет все sources с freshness блоком. Результаты:

Sources freshness check:
  PASS - stripe.orders (max _loaded_at: 2026-05-19 04:00:00, 4 hours ago)
  WARN - salesforce.accounts (max _loaded_at: 2026-05-18 21:00:00, 12 hours ago)
  FAIL - shopify.products (max _loaded_at: 2026-05-17 12:00:00, 47 hours ago)

Exit code: 1 (have errors)

Также генерируется target/sources.json — JSON-отчёт со всеми результатами. Полезен для downstream tooling (dashboards, alerts).

Селекторы

Проверить только конкретные:

# Один источник
dbt source freshness --select source:stripe

# Несколько
dbt source freshness --select source:stripe source:salesforce

# Конкретная таблица
dbt source freshness --select source:stripe.orders

В CI/cron используют --select для критичных sources — те которые обязаны быть свежими для production reporting.


Использование как gate в workflow

# .github/workflows/dbt-prod.yml
name: dbt Production

on:
  schedule:
    - cron: '0 5 * * *'
  workflow_dispatch:

jobs:
  freshness-gate:
    runs-on: ubuntu-latest
    outputs:
      should-proceed: ${'{{'} steps.freshness.outputs.proceed {'}}'}

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install dbt-snowflake==1.10.0
      - run: dbt deps

      - name: Check source freshness
        id: freshness
        run: |
          dbt source freshness --target prod || {
            echo "Source freshness failed — sending alert and blocking pipeline"
            echo "proceed=false" >> $GITHUB_OUTPUT
            exit 1
          }
          echo "proceed=true" >> $GITHUB_OUTPUT
        env:
          SNOWFLAKE_PASSWORD: ${'{{'} secrets.SNOWFLAKE_PASSWORD {'}}'}

      - name: Notify on freshness fail
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          channel-id: ${'{{'} secrets.SLACK_DATA_CHANNEL {'}}'}
          slack-message: "ALERT: Source freshness check failed in prod. dbt build skipped."
        env:
          SLACK_BOT_TOKEN: ${'{{'} secrets.SLACK_BOT_TOKEN {'}}'}

  dbt-build:
    needs: freshness-gate
    if: needs.freshness-gate.outputs.should-proceed == 'true'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-snowflake==1.10.0
      - run: dbt deps
      - run: dbt build --target prod
        env:
          SNOWFLAKE_PASSWORD: ${'{{'} secrets.SNOWFLAKE_PASSWORD {'}}'}

Ключевая часть — if: needs.freshness-gate.outputs.should-proceed == 'true'. Если freshness не прошёл, dbt-build job не запустится. И всё это раннее. Без compute расходов на бесполезный build.


Severity: warn vs error

freshness:
  warn_after: {count: 6, period: hour}    # Soft warning
  error_after: {count: 24, period: hour}  # Hard fail
  • warndbt source freshness показывает WARN, но exit code = 0. Pipeline продолжает.
  • error — exit code = 1, pipeline останавливается.

Реалистичная стратегия:

  • warn at typical SLA — например, 1.5x normal load time. Если ETL обычно занимает 4 часа, warn после 6h. Это signal: «что-то медленнее обычного, посмотреть».
  • error at hard SLA — где данные становятся непригодны для бизнеса. Например 24h — данные старше суток непригодны для daily reporting.

В _sources.yml:

freshness:
  warn_after: {count: 6, period: hour}
  error_after: {count: 24, period: hour}

Можно использовать filter если данные с историей:

loaded_at_field: created_at
freshness:
  filter: created_at > current_date - interval '7 days'
  error_after: {count: 24, period: hour}

filter ограничивает что dbt считает «свежим». Если в источнике 5 лет данных, без фильтра MAX(created_at) — для свежих записей. С фильтром — только последние 7 дней, эффективнее.


Selective freshness — не все sources одинаково критичны

sources:
  raw.stripe.* — critical (revenue depends)
  raw.salesforce.* — important (sales depends, refresh daily)
  raw.archive.* — not critical (historical, можно без freshness)

В YAML:

sources:
  - name: stripe
    freshness:
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
      - name: customers

  - name: salesforce
    freshness:
      warn_after: {count: 24, period: hour}
      error_after: {count: 48, period: hour}
    tables:
      - name: accounts

  - name: archive_data
    freshness: null              # disable freshness check
    tables:
      - name: legacy_data_2020

freshness: null — отключает проверку. Полезно для immutable historical sources.


dbt retry — retry-on-failure

После dbt build чего-то прошло, что-то нет. Можно перезапустить только упавшие модели:

# Первый запуск
dbt build --target prod
# Падает на fct_revenue с error (warehouse timeout)

# Retry — только то что упало (и все downstream)
dbt retry --target prod

dbt retry смотрит на target/run_results.json от последнего run, находит модели со статусом error, и перезапускает их вместе с downstream.

Это критично для transient errors (warehouse hiccup, network issue) которые повторяются раз в 1000 запусков. Без retry — приходится либо перезапускать весь build (дорого), либо вручную селектить упавшие модели.

Retry в CI

- name: dbt build with retry
  run: |
    dbt build --target prod || (
      echo "First run failed, retrying once"
      sleep 60
      dbt retry --target prod
    )

Один retry с задержкой 60 секунд покрывает большинство transient errors. Если и retry упал — это реальная проблема, нужно человеку посмотреть.

WARNING

Retry — для transient errors, не для логических. Если SQL в модели неправильный, retry поможет ровно ничему. Бесконечный retry-loop = маскировка реальных проблем. Один retry — норма, больше — анти-паттерн.


Конкретные periods и SLA

Sourceperioderror_afterwarn_afterКомментарий
Real-time ordersminute10 minutes5 minutesStrict SLA, alerting в Slack/PagerDuty.
Daily customershour24 hours12 hoursDaily batch, должно быть свежее сегодня.
CRM accountshour48 hours24 hoursСлабый SLA.
Historical eventsday7 days3 daysНе критично.
Reference dataday30 days14 daysСтабильные dimension таблицы.

Эти SLA — отражение business needs, не технических. Узнать у consumers (BI users, analysts): «что критично иметь свежее?».


Source freshness + dashboards

Результаты dbt source freshness сохраняются в sources.json. Можно прокидывать в:

  • dbt docsdbt docs generate подхватывает sources.json, в Explorer видна свежесть.
  • Datadog/Grafana — экспорт через парсинг sources.json.
  • dbt Cloud Explorer — нативная интеграция.
  • Custom dashboard — Snowflake таблица _dbt_source_freshness_log, обновляется через on-run-end hook.

Пример макроса для логирования:

-- macros/log_freshness.sql
{% macro log_freshness() %}
  {% if execute %}
    {% set sources = graph.sources.values() | list %}
    -- INSERT INTO snapshots.source_freshness_log ...
  {% endif %}
{% endmacro %}

В dbt_project.yml:

on-run-end:
  - "{'{{ log_freshness() }}'}"

Каждый run пишется в табличку — historical track.


Best practice: split production jobs

В реальном production не один большой dbt build, а несколько jobs:

01:00 — source_freshness_job
   ↓ (если pass)
02:00 — staging_job (dbt build --select staging.*)

03:00 — marts_job (dbt build --select marts.*)

04:00 — exposure_refresh_job (notify BI tools)

Каждый job — отдельный workflow. Они зависят последовательно. Если freshness падает — staging/marts не запускаются. Если staging падает — marts не запускается.

Это defense in depth и fail-fast. Не пересчитываем marts на сломанном staging.


Попробуй сам

В вашем dbt-проекте:

  1. Добавьте freshness в models/_sources.yml:
sources:
  - name: jaffle_shop
    loaded_at_field: _loaded_at
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: orders
      - name: customers
  1. Если у вас в source нет _loaded_at — добавьте его через seed или mock:
-- В DuckDB через инлайн
CREATE TABLE main.orders AS
SELECT *, CURRENT_TIMESTAMP - INTERVAL '30 hours' AS _loaded_at
FROM main.orders
  1. Запустите:
dbt source freshness

Получите WARN/ERROR (зависит от вашего mock).

  1. Сделайте данные «свежими» (обновите _loaded_at в одной из таблиц), снова запустите. Увидите PASS.

  2. Посмотрите target/sources.json — JSON отчёт со всеми результатами.

Бонус: добавьте filter: created_at > current_date - interval '7 days' в freshness блок. Посмотрите как меняется поведение.


Ключевые выводы

  1. Source freshness — gate в production pipelines. Проверяет что raw данные не устарели. Останавливает build до моделей, не после.
  2. Конфиг в _sources.yml: loaded_at_field (колонка с timestamp), freshness.warn_after/error_after (пороги).
  3. Команда dbt source freshness: проверяет, выводит PASS/WARN/FAIL, exit code 1 на error. Селекторы для конкретных sources (--select source:stripe).
  4. CI gate: jobs с needs: и if: — если freshness fail, downstream jobs не запускаются. Экономия compute + раннее обнаружение проблемы.
  5. warn vs error: warn для soft SLA (что-то медленнее обычного), error для hard SLA (данные непригодны). Реалистичные значения 1.5x typical для warn, 24h для error на daily batch.
  6. freshness: null — отключить проверку (для historical/immutable sources).
  7. dbt retry: перезапуск только упавших моделей через run_results.json. Полезен для transient errors. Один retry — норма, бесконечный — анти-паттерн.
  8. Best practice: разделить production на jobs (freshness -> staging -> marts -> exposures). Каждый зависит от предыдущего через needs:.
Проверка знанийKnowledge check
Команда настроила freshness на 24h error для stripe.orders. Каждый день в 6:00 запускается прод-pipeline. В понедельник freshness fail в 6:00 — данные за выходные не загрузились. В команде паника, потому что 20 моделей не обновились. Как структурно решить проблему?
ОтветAnswer
Реакция на freshness failure требует продуманного процесса, не паники. **Структурные решения:**\n\n**1. Alerting в Slack/PagerDuty сразу при fail.**\n\n```yaml\n- name: Source freshness\n id: freshness\n run: dbt source freshness --target prod\n\n- name: Alert on fail\n if: failure()\n uses: slackapi/slack-github-action@v1\n with:\n slack-message: |\n ALERT: Source freshness FAIL\n Build: ${{ github.run_id }}\n Details: <link to logs>\n Action: On-call инженер проверь Fivetran/CDC.\n```\n\nКоманда узнаёт о проблеме за 1-2 минуты, не утром через бизнес.\n\n**2. Runbook для on-call.**\n\nДокумент 'что делать когда падает freshness':\n- Проверить Fivetran status (UI или API).\n- Если Fivetran upalivaetся — manually trigger Fivetran sync.\n- Когда данные приехали — manual rerun pipeline.\n- Если Fivetran ОК — проверить source-системы (Stripe API status page).\n\nЭто SOP, не каждый раз думать заново.\n\n**3. Retry-on-failure через cron.**\n\nЕсли freshness fail транзиентный (Fivetran запоздал на час) — retry через час часто решает:\n\n```yaml\non:\n schedule:\n - cron: '0 6 * * *' # primary 6am\n - cron: '0 7 * * *' # retry 7am\n - cron: '0 8 * * *' # final retry 8am\n\njobs:\n build:\n runs-on: ubuntu-latest\n steps:\n - run: |\n dbt source freshness --target prod || exit 1\n dbt build --target prod\n```\n\n3 попытки. Если все три fail — реальная проблема, остаётся вмешательство.\n\n**4. Graceful fallback: partial build.**\n\nЕсли застрял ОДИН источник из 10 — не блочить всё. Селективно сборка только моделей которые НЕ зависят от broken source:\n\n```yaml\n- name: Identify broken sources\n id: broken\n run: |\n dbt source freshness --target prod > freshness.log 2>&1 || true\n BROKEN=$(parse_broken_sources freshness.log)\n echo "broken=$BROKEN" >> $GITHUB_OUTPUT\n\n- name: Partial build (exclude broken)\n run: dbt build --target prod --exclude source:${{ steps.broken.outputs.broken }}+\n```\n\nЭто **продвинуто** — частичный build чтобы 90% моделей обновились. Но требует кастомного scripting.\n\n**5. Time-window awareness.**\n\nFreshness threshold должен учитывать **business hours**:\n\n- 24h не учитывает выходные. После пятницы 18:00 до понедельника 9:00 — 63 часа.\n- Если бизнес работает только Mon-Fri — увеличить threshold для выходных, или disable freshness на weekends.\n\nВ dbt 1.10+ можно через variables и custom freshness.\n\n**6. SLA-based escalation.**\n\n- Freshness fail менее 2h: low priority, авто-retry.\n- Freshness fail 2-6h: medium, ping в Slack.\n- Freshness fail > 6h: high, PagerDuty + manager escalation.\n\nЭто **operational maturity** — реакция пропорциональна severity, не паника на любой fail.\n\n**Главный урок**: freshness gate — это не страшилка для команды, а контролируемый сигнал. Хорошо настроенный гейт + alerting + runbook + retry = команда не паникует, реагирует системно.
Проверка знанийKnowledge check
Senior настроил freshness gate ПЕРЕД dbt build в production cron. Junior спрашивает: 'А почему бы не делать dbt build сразу — если данные не свежие, тесты упадут на not_null и т.д., узнаем тогда же'. Что объяснить junior?
ОтветAnswer
Аргументы за **freshness gate ДО** build (а не после):\n\n**1. Fail-fast principle.**\n\nЕсли source устарел, нет смысла пересчитывать 200 моделей. Это compute waste:\n- На Snowflake — кредиты warehouse.\n- На BigQuery — TB scanned.\n- Время — 30 минут впустую.\n\nFreshness check занимает 5 секунд. Если fail — экономим 30 минут компьюта.\n\n**2. Чище error.**\n\nЕсли запустим build на устаревших данных, мы получим caskaded ошибки:\n- not_null fail на колонке которая обычно non-null (потому что новые строки не приехали).\n- relationships fail (orders ссылаются на customers которых ещё нет).\n- unique fail (incremental переписал старую версию строки).\n\nЭти ошибки **симптомы**, а не корневая причина. Команда тратит час на разбор почему 5 разных тестов упали, а реально — просто Fivetran завис.\n\nС freshness gate — один понятный error 'shopify.orders is 30h old, expected менее 24h'. Сразу понятно куда смотреть.\n\n**3. Защита downstream consumers.**\n\nЕсли build технически прошёл (тесты warn-level, не error-level), но данные устарели — таблицы обновлены, но в них старые числа. Dashboards показывают вчерашние числа как сегодняшние. Бизнес делает решения на устаревших данных.\n\nFreshness gate **блокирует обновление таблиц** при stale sources. Старые таблицы лежат — лучше старые-но-старые числа (с timestamp 2 дня назад), чем 'кажется свежие' (build prošёл сегодня, но данные внутри вчерашние).\n\n**4. Тесты не покрывают stale.**\n\nTests like `not_null`, `unique`, `relationships` проверяют **внутреннюю консистентность** данных, не **актуальность**. Если данные просто старые — внутренне они валидны. Тесты пройдут зелёно. Но данные устарели.\n\nFreshness — **специально про актуальность**. Это другой класс проверки, дополняет tests.\n\n**5. SLA как контракт.**\n\nFreshness threshold = **SLA с consumers**. 'Если данные свежее 24h — данные пригодны для решений. Иначе нет'. Это договор:\n- Data team гарантирует 24h SLA.\n- Consumers полагаются на это.\n- Freshness gate — enforcement этого SLA.\n\nБез gate нет contractual freshness — кто-то построит dashboard, увидит данные, не задумается о возрасте.\n\n**Когда build БЕЗ freshness gate валиден:**\n\n- В CI на PR — там source freshness не релевантен (testing logic, not data freshness).\n- В dev среде — пишем код, не тестируем production data.\n- При full-rebuild от seeds (sources не используются) — нет смысла checking sources.\n\nВ **production scheduled jobs** — freshness gate обязателен.\n\n**Главный урок**: tests проверяют валидность данных, freshness проверяет актуальность. Это разные класcа. Freshness gate ДО build — defense-in-depth pattern: ловим проблему на самом раннем уровне, экономим compute, чистый error message, защита SLA.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 6. Почему source freshness check ДО dbt build лучше чем полагаться на тесты внутри моделей?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5