Source freshness как gate в production jobs
В production dbt-jobs самая частая причина «грязных» данных — stale sources. Fivetran завис, Stripe API упал, кто-то отключил cron, и в warehouse продолжают лежать данные за вчера. dbt build проходит зелёным, но dashboard показывает старые цифры.
dbt source freshness решает эту проблему: проверка свежести source-таблиц до запуска моделей. Если данные устарели — фейлим job сразу, не пересчитываем модели на устаревших данных. Это fail-fast gate.
В этом уроке — как настроить freshness, как использовать как gate, и как делать retry-on-failure (через dbt retry).
Зачем freshness gate
Без gate типичный сценарий:
03:00 — Fivetran должен загрузить orders из Stripe. Не загрузил (API timeout).
05:00 — Cron запустил `dbt build`. Sources те же что вчера, но dbt не знает.
05:30 — dbt build завершился [x]. fct_orders, fct_revenue построены на старых данных.
06:00 — Pipeline отправил email "all green".
09:00 — Бизнес смотрит dashboards. Выручка за вчера = 0 (новые orders отсутствуют).
09:15 — Паника. Звонят data engineer.
С freshness gate:
03:00 — Fivetran должен загрузить. Не загрузил.
05:00 — Cron запустил dbt source freshness.
05:01 — freshness fail: stripe.orders 30 hours old (threshold 24 hours).
05:01 — Pipeline сразу fail. dbt build НЕ запустился. Алерт в Slack.
05:02 — On-call инженер видит алерт, проверяет Fivetran, перезапускает.
06:00 — Sources свежие, pipeline retry, все модели построены на свежих данных.
09:00 — Бизнес видит правильные числа.
Принцип — проверяй данные на входе, а не на выходе.
freshness декларация в _sources.yml
# models/_sources.yml
sources:
- name: stripe
database: raw
schema: stripe_raw
loaded_at_field: _loaded_at
freshness:
warn_after:
count: 12
period: hour
error_after:
count: 24
period: hour
tables:
- name: orders
freshness:
warn_after:
count: 6
period: hour
error_after:
count: 12
period: hour
- name: customers
# Использует source-level freshness (24h error)
Разбор:
| Поле | Значение |
|---|---|
loaded_at_field | Колонка с timestamp последнего обновления строки. dbt смотрит на MAX этой колонки. |
freshness.warn_after.count/period | Если данные старше этого — WARN. |
freshness.error_after.count/period | Если старше — ERROR (fail). |
Можно задавать на уровне source (default для всех таблиц) и переопределять на уровне table (более строго для критичных).
В примере: stripe.orders — должны быть свежее 12 часов (12h error). stripe.customers — fallback на source level (24h error).
loaded_at_field
Это критичное поле. dbt вычислит:
SELECT MAX(_loaded_at) FROM stripe_raw.orders
И сравнит с CURRENT_TIMESTAMP - 12 hours.
Если в source нет колонки с временем загрузки (например, immutable dump):
- Можно не указать
loaded_at_field— но тогда freshness не работает. - Можно использовать псевдоколонку (
_dbt_loaded_at = CURRENT_TIMESTAMPчерез Fivetran metadata). - Можно явно отключить:
freshness: null.
Команда dbt source freshness
dbt source freshness
Проверяет все sources с freshness блоком. Результаты:
Sources freshness check:
PASS - stripe.orders (max _loaded_at: 2026-05-19 04:00:00, 4 hours ago)
WARN - salesforce.accounts (max _loaded_at: 2026-05-18 21:00:00, 12 hours ago)
FAIL - shopify.products (max _loaded_at: 2026-05-17 12:00:00, 47 hours ago)
Exit code: 1 (have errors)
Также генерируется target/sources.json — JSON-отчёт со всеми результатами. Полезен для downstream tooling (dashboards, alerts).
Селекторы
Проверить только конкретные:
# Один источник
dbt source freshness --select source:stripe
# Несколько
dbt source freshness --select source:stripe source:salesforce
# Конкретная таблица
dbt source freshness --select source:stripe.orders
В CI/cron используют --select для критичных sources — те которые обязаны быть свежими для production reporting.
Использование как gate в workflow
# .github/workflows/dbt-prod.yml
name: dbt Production
on:
schedule:
- cron: '0 5 * * *'
workflow_dispatch:
jobs:
freshness-gate:
runs-on: ubuntu-latest
outputs:
should-proceed: ${'{{'} steps.freshness.outputs.proceed {'}}'}
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: '3.12'
- run: pip install dbt-snowflake==1.10.0
- run: dbt deps
- name: Check source freshness
id: freshness
run: |
dbt source freshness --target prod || {
echo "Source freshness failed — sending alert and blocking pipeline"
echo "proceed=false" >> $GITHUB_OUTPUT
exit 1
}
echo "proceed=true" >> $GITHUB_OUTPUT
env:
SNOWFLAKE_PASSWORD: ${'{{'} secrets.SNOWFLAKE_PASSWORD {'}}'}
- name: Notify on freshness fail
if: failure()
uses: slackapi/slack-github-action@v1
with:
channel-id: ${'{{'} secrets.SLACK_DATA_CHANNEL {'}}'}
slack-message: "ALERT: Source freshness check failed in prod. dbt build skipped."
env:
SLACK_BOT_TOKEN: ${'{{'} secrets.SLACK_BOT_TOKEN {'}}'}
dbt-build:
needs: freshness-gate
if: needs.freshness-gate.outputs.should-proceed == 'true'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install dbt-snowflake==1.10.0
- run: dbt deps
- run: dbt build --target prod
env:
SNOWFLAKE_PASSWORD: ${'{{'} secrets.SNOWFLAKE_PASSWORD {'}}'}
Ключевая часть — if: needs.freshness-gate.outputs.should-proceed == 'true'. Если freshness не прошёл, dbt-build job не запустится. И всё это раннее. Без compute расходов на бесполезный build.
Severity: warn vs error
freshness:
warn_after: {count: 6, period: hour} # Soft warning
error_after: {count: 24, period: hour} # Hard fail
- warn —
dbt source freshnessпоказывает WARN, но exit code = 0. Pipeline продолжает. - error — exit code = 1, pipeline останавливается.
Реалистичная стратегия:
- warn at typical SLA — например, 1.5x normal load time. Если ETL обычно занимает 4 часа, warn после 6h. Это signal: «что-то медленнее обычного, посмотреть».
- error at hard SLA — где данные становятся непригодны для бизнеса. Например 24h — данные старше суток непригодны для daily reporting.
В _sources.yml:
freshness:
warn_after: {count: 6, period: hour}
error_after: {count: 24, period: hour}
Можно использовать filter если данные с историей:
loaded_at_field: created_at
freshness:
filter: created_at > current_date - interval '7 days'
error_after: {count: 24, period: hour}
filter ограничивает что dbt считает «свежим». Если в источнике 5 лет данных, без фильтра MAX(created_at) — для свежих записей. С фильтром — только последние 7 дней, эффективнее.
Selective freshness — не все sources одинаково критичны
sources:
raw.stripe.* — critical (revenue depends)
raw.salesforce.* — important (sales depends, refresh daily)
raw.archive.* — not critical (historical, можно без freshness)
В YAML:
sources:
- name: stripe
freshness:
error_after: {count: 24, period: hour}
tables:
- name: orders
- name: customers
- name: salesforce
freshness:
warn_after: {count: 24, period: hour}
error_after: {count: 48, period: hour}
tables:
- name: accounts
- name: archive_data
freshness: null # disable freshness check
tables:
- name: legacy_data_2020
freshness: null — отключает проверку. Полезно для immutable historical sources.
dbt retry — retry-on-failure
После dbt build чего-то прошло, что-то нет. Можно перезапустить только упавшие модели:
# Первый запуск
dbt build --target prod
# Падает на fct_revenue с error (warehouse timeout)
# Retry — только то что упало (и все downstream)
dbt retry --target prod
dbt retry смотрит на target/run_results.json от последнего run, находит модели со статусом error, и перезапускает их вместе с downstream.
Это критично для transient errors (warehouse hiccup, network issue) которые повторяются раз в 1000 запусков. Без retry — приходится либо перезапускать весь build (дорого), либо вручную селектить упавшие модели.
Retry в CI
- name: dbt build with retry
run: |
dbt build --target prod || (
echo "First run failed, retrying once"
sleep 60
dbt retry --target prod
)
Один retry с задержкой 60 секунд покрывает большинство transient errors. Если и retry упал — это реальная проблема, нужно человеку посмотреть.
Retry — для transient errors, не для логических. Если SQL в модели неправильный, retry поможет ровно ничему. Бесконечный retry-loop = маскировка реальных проблем. Один retry — норма, больше — анти-паттерн.
Конкретные periods и SLA
| Source | period | error_after | warn_after | Комментарий |
|---|---|---|---|---|
| Real-time orders | minute | 10 minutes | 5 minutes | Strict SLA, alerting в Slack/PagerDuty. |
| Daily customers | hour | 24 hours | 12 hours | Daily batch, должно быть свежее сегодня. |
| CRM accounts | hour | 48 hours | 24 hours | Слабый SLA. |
| Historical events | day | 7 days | 3 days | Не критично. |
| Reference data | day | 30 days | 14 days | Стабильные dimension таблицы. |
Эти SLA — отражение business needs, не технических. Узнать у consumers (BI users, analysts): «что критично иметь свежее?».
Source freshness + dashboards
Результаты dbt source freshness сохраняются в sources.json. Можно прокидывать в:
- dbt docs —
dbt docs generateподхватывает sources.json, в Explorer видна свежесть. - Datadog/Grafana — экспорт через парсинг sources.json.
- dbt Cloud Explorer — нативная интеграция.
- Custom dashboard — Snowflake таблица
_dbt_source_freshness_log, обновляется через on-run-end hook.
Пример макроса для логирования:
-- macros/log_freshness.sql
{% macro log_freshness() %}
{% if execute %}
{% set sources = graph.sources.values() | list %}
-- INSERT INTO snapshots.source_freshness_log ...
{% endif %}
{% endmacro %}
В dbt_project.yml:
on-run-end:
- "{'{{ log_freshness() }}'}"
Каждый run пишется в табличку — historical track.
Best practice: split production jobs
В реальном production не один большой dbt build, а несколько jobs:
01:00 — source_freshness_job
↓ (если pass)
02:00 — staging_job (dbt build --select staging.*)
↓
03:00 — marts_job (dbt build --select marts.*)
↓
04:00 — exposure_refresh_job (notify BI tools)
Каждый job — отдельный workflow. Они зависят последовательно. Если freshness падает — staging/marts не запускаются. Если staging падает — marts не запускается.
Это defense in depth и fail-fast. Не пересчитываем marts на сломанном staging.
Попробуй сам
В вашем dbt-проекте:
- Добавьте freshness в
models/_sources.yml:
sources:
- name: jaffle_shop
loaded_at_field: _loaded_at
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
tables:
- name: orders
- name: customers
- Если у вас в source нет
_loaded_at— добавьте его через seed или mock:
-- В DuckDB через инлайн
CREATE TABLE main.orders AS
SELECT *, CURRENT_TIMESTAMP - INTERVAL '30 hours' AS _loaded_at
FROM main.orders
- Запустите:
dbt source freshness
Получите WARN/ERROR (зависит от вашего mock).
-
Сделайте данные «свежими» (обновите _loaded_at в одной из таблиц), снова запустите. Увидите PASS.
-
Посмотрите
target/sources.json— JSON отчёт со всеми результатами.
Бонус: добавьте filter: created_at > current_date - interval '7 days' в freshness блок. Посмотрите как меняется поведение.
Ключевые выводы
- Source freshness — gate в production pipelines. Проверяет что raw данные не устарели. Останавливает build до моделей, не после.
- Конфиг в _sources.yml:
loaded_at_field(колонка с timestamp),freshness.warn_after/error_after(пороги). - Команда
dbt source freshness: проверяет, выводит PASS/WARN/FAIL, exit code 1 на error. Селекторы для конкретных sources (--select source:stripe). - CI gate: jobs с
needs:иif:— если freshness fail, downstream jobs не запускаются. Экономия compute + раннее обнаружение проблемы. - warn vs error: warn для soft SLA (что-то медленнее обычного), error для hard SLA (данные непригодны). Реалистичные значения 1.5x typical для warn, 24h для error на daily batch.
- freshness: null — отключить проверку (для historical/immutable sources).
- dbt retry: перезапуск только упавших моделей через
run_results.json. Полезен для transient errors. Один retry — норма, бесконечный — анти-паттерн. - Best practice: разделить production на jobs (freshness -> staging -> marts -> exposures). Каждый зависит от предыдущего через
needs:.