Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 20 мин
Начальный
freshnessloaded_at_fieldwarn_aftererror_afterCI gate

В прошлых уроках мы научились декларировать sources и обращаться к ним через source(). Третья большая фича source-системы — freshness checks. Это способ ответить на вопрос: “когда последний раз обновлялись raw-данные, и не пора ли паниковать?”.

Без freshness-проверок типичная история: dbt-pipeline отрабатывает зелёным, дашборд показывает данные, но они на самом деле устарели на сутки — потому что Fivetran-коннектор тихо сломался ночью. Никто не заметил, потому что dbt тесты не падают (данные внутренне консистентны, просто старые).

Как dbt понимает, что таблица “свежая”

dbt не может сам определить, когда таблица обновлялась. Он не следит за warehouse-метаданными — это специфично для каждой СУБД, не у всех есть надёжный last_modified_time. Поэтому dbt полагается на поле в самой таблице, в котором loader записывает timestamp своей операции.

Это поле называется loaded_at_field, и его нужно указать в YAML. Логика freshness — это просто:

SELECT MAX(<loaded_at_field>) FROM <source_table>

Если разница между этим max и текущим временем больше порога — freshness fail.

Что делает freshness-проверка

dbt подключается к warehouse, выполняет SELECT MAX(loaded_at) на каждой задекларированной source-таблице, сравнивает с порогами warn_after/error_after, и выдаёт статус.

dbt source freshnessкоманда
SELECT MAX(loaded_at) FROM sourceна warehouse
now() - max(loaded_at)разница
< warn_after: PASSwarn_after — мягкий порог. dbt подсветит warning, но не упадёт. Подходит для рутинного мониторинга.
warn_after - error_after: WARNМежду warn_after и error_after — дбт скажет WARN. В CI это можно сделать blocking или passing, как настроишь.
> error_after: ERRORerror_after — твёрдый порог. dbt вернёт exit code != 0, downstream pipeline можно блокировать.

Анатомия freshness-блока в YAML

Полный пример с двумя таблицами и разными настройками:

version: 2

sources:
  - name: jaffle_shop
    database: raw
    schema: jaffle_shop
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: { count: 6, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: orders
        # наследует loaded_at_field и freshness из source-уровня

      - name: payments
        # переопределяет порог для критичной таблицы
        freshness:
          warn_after: { count: 1, period: hour }
          error_after: { count: 4, period: hour }

      - name: legacy_customers
        # вообще выключает freshness для архивной таблицы
        freshness: null

Разберём по полям:

  • loaded_at_field: _fivetran_synced — колонка с timestamp загрузки. Может наследоваться от source к таблицам или переопределяться на уровне таблицы. Если loader не пишет timestamp — нужно сделать его самому в процессе загрузки.
  • warn_after: { count: 6, period: hour } — мягкий порог. Через 6 часов после max(loaded_at_field) freshness статус “WARN”.
  • error_after: { count: 24, period: hour } — твёрдый порог. Через 24 часа — “ERROR” и exit code 1.
  • freshness: null — отключает freshness для конкретной таблицы. Используется для архивных/исторических данных, которые обновляются редко.

period принимает: minute, hour, day. Этого хватает для большинства задач.

Команда dbt source freshness

Запускается отдельно от dbt run:

dbt source freshness

Output:

14:23:01  Running dbt source freshness
14:23:01  Concurrency: 4 threads
14:23:02  1 of 3 START freshness of jaffle_shop.orders ........... [RUN]
14:23:02  2 of 3 START freshness of jaffle_shop.payments .......... [RUN]
14:23:02  3 of 3 START freshness of jaffle_shop.customers ......... [RUN]
14:23:03  1 of 3 PASS freshness of jaffle_shop.orders ............. [PASS in 0.42s]
14:23:03  2 of 3 WARN freshness of jaffle_shop.payments ........... [WARN in 0.51s]
14:23:03  3 of 3 ERROR freshness of jaffle_shop.customers ......... [ERROR in 0.38s]

Done. PASS=1 WARN=1 ERROR=1 SKIP=0 TOTAL=3

Каждая таблица — отдельная задача, работают параллельно (как обычные модели). Если хотя бы одна вернула ERROR — exit code будет 1. CLI-output показывает время до устаревания для каждой таблицы.

Можно запустить только для отдельного source:

dbt source freshness --select source:jaffle_shop

Или только для одной таблицы:

dbt source freshness --select source:jaffle_shop.orders

Filter для уточнения максимума

Иногда таблица содержит исторические данные, которые загружали один раз и больше не трогают, плюс активные данные, которые обновляются ежедневно. Если просто взять MAX(loaded_at), старые записи могут забить выборку, и реальная свежесть не определится.

Для таких случаев есть filter:

sources:
  - name: jaffle_shop
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: { count: 6, period: hour }
      error_after: { count: 24, period: hour }
      filter: created_at > '2024-01-01'
    tables:
      - name: events

Скомпилируется примерно в:

SELECT MAX(_fivetran_synced) FROM raw.jaffle_shop.events WHERE created_at > '2024-01-01'

Реже встречается, но полезно знать.

Freshness как gate в CI

Самый ценный паттерн использования: запускать dbt source freshness перед dbt run. Если raw-данные устарели — нет смысла пересчитывать staging/marts, всё равно получим вчерашние цифры в дашбордах.

Базовый bash в CI-job:

#!/bin/bash
set -e

dbt source freshness || {
    echo "[ERROR] Source freshness failed, aborting downstream run"
    exit 1
}

dbt run
dbt test

В GitHub Actions:

- name: Check freshness
  run: dbt source freshness

- name: Run models
  if: success()
  run: dbt run

- name: Run tests
  if: success()
  run: dbt test

Когда что-то ломается — pipeline останавливается на freshness, ошибка обзорно показывает, какая таблица не обновлялась, дальше можно искать причину в loader.

TIP

В небольших проектах достаточно сделать freshness не блокирующим (continue-on-error: true в GH Actions), но логировать результат в Slack. В production-проектах с критичными дашбордами — обязательно блокировать downstream, иначе users увидят stale данные.

Результат: source.json и parse

dbt source freshness пишет результат в target/sources.json. Структура:

{
  "metadata": { ... },
  "results": [
    {
      "unique_id": "source.jaffle_shop.jaffle_shop.orders",
      "status": "pass",
      "max_loaded_at": "2026-05-19T13:42:17Z",
      "snapshotted_at": "2026-05-19T14:23:02Z",
      "criteria": {
        "warn_after": { "count": 6, "period": "hour" },
        "error_after": { "count": 24, "period": "hour" }
      }
    }
  ]
}

Можно парсить в Python/jq, заводить алерты, складывать в monitoring-систему. На junior-уровне это уже выходит за рамки, но полезно знать, что машиночитаемый артефакт существует.

Что НЕ умеет freshness

  • Не проверяет содержимое таблицы. Если loader записал 0 строк с timestamp = now() — freshness пройдёт, но данные пустые. Для row counts нужны обычные тесты (например, dbt_utils.expression_is_true на COUNT(*) > 0).
  • Не понимает расписание загрузок. dbt считает время от max(loaded_at) до now(). Если у тебя loader запускается раз в день в 02:00, а freshness check ты делаешь в 01:30 — формально таблица “устарела на 23 часа”, хотя завтра обновится через 30 минут. Решение — синхронизировать расписания или использовать фильтры.
  • Не работает на non-incremental external sources. Если source — это статичный CSV-файл, у него нет loaded_at. Для таких отключай freshness через freshness: null.

DuckDB-специфика для freshness

В DuckDB freshness работает так же, как на любом другом warehouse. Только пара нюансов:

  1. Локальный файл — нет latency. dbt source freshness на DuckDB отрабатывает за миллисекунды, потому что SELECT MAX по файлу мгновенный.
  2. Время в файле — UTC. DuckDB по умолчанию хранит timestamp без таймзоны (TIMESTAMP). Если loader пишет local time, а сравнение делается с UTC now(), можно получить ложно-устаревший статус. Решение — использовать TIMESTAMPTZ или явно конвертировать в UTC.
  3. Нет multi-session. Поскольку DuckDB single-writer per file, freshness check во время dbt run может встать в очередь. На практике для junior’а — невидимо, потому что обе операции быстрые.
NOTE

В реальной работе на Snowflake/BigQuery freshness check обращается к warehouse через сеть, занимает 0.5-2 секунды на таблицу, может стоить денег (если warehouse биллится по compute). На DuckDB это бесплатно и мгновенно — поэтому freshness можно гонять часто. Не привыкай к этому, если планируешь работать с cloud warehouses.

Попробуй сам

В своём Jaffle Shop добавь поле loaded_at_field и freshness в _sources.yml:

version: 2

sources:
  - name: jaffle_shop
    database: jaffle_shop
    schema: main
    loaded_at_field: created_at
    freshness:
      warn_after: { count: 1, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: raw_orders

Запусти:

dbt source freshness

Если created_at есть в таблице — увидишь PASS/WARN/ERROR. Если max(created_at) старше суток — ERROR. Попробуй изменить пороги (warn_after = 1 minute) и снова запустить, чтобы увидеть WARN.

Затем поэкспериментируй с отключением:

tables:
  - name: legacy_table
    freshness: null

dbt source freshness скипнёт эту таблицу.

Что мы поняли

Freshness — это механизм проверки свежести raw-данных через SELECT MAX(loaded_at_field). Указывается на уровне source или отдельной таблицы. Команда dbt source freshness запускает проверки параллельно, возвращает PASS/WARN/ERROR с exit code, и идеально вписывается как gate в CI-pipeline. На junior-проекте достаточно настроить разумные пороги (warn = период загрузки * 1.5, error = период * 3-5).

В следующем уроке разберём, как переопределять конфигурацию source — например, когда source приходит из package, и нужно подменить database/schema под свой проект.

Source freshness как gate в production CI/CD jobs Шесть измерений data quality — freshness как одно из них
Проверка знанийKnowledge check
Ты настроил freshness на таблицу events с loaded_at_field=created_at, warn_after=1 hour. dbt source freshness возвращает PASS, но в реальности данные не обновлялись неделю. В чём может быть проблема?
ОтветAnswer
Скорее всего created_at в таблице — это не "когда loader записал строку", а "когда событие произошло в источнике" (бизнес-time). Loader мог переписывать одни и те же старые записи. Решение: указать loaded_at_field на колонку, которую пишет именно loader (например _fivetran_synced, _airbyte_loaded_at, или специальная dbt_inserted_at). Или добавить filter, чтобы исключать исторические перезаписи.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 6. Что физически выполняет dbt source freshness в warehouse для одной таблицы?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4