Learning Platform
Глоссарий Troubleshooting
Урок 10.07 · 14 мин
Продвинутый
dbtdbt-sparkIncremental ModelsSQL ModelsData TransformationMaterialization

dbt + Spark: модели и инкрементальная обработка

Что такое dbt и зачем он для Spark?

dbt (data build tool) — фреймворк для SQL-first трансформаций данных. Вместо написания PySpark-кода вы определяете модели как SELECT-запросы, а dbt управляет:

  • Зависимостями между моделями (через ref() macro)
  • Материализацией (table, view, incremental)
  • Тестированием данных (not_null, unique, custom)
  • Документированием (auto-generated lineage graph)
dbt Architecture с Spark
dbt CLI / Cloud
Models (SELECT SQL)Tests (data assertions)Docs (auto-lineage)
SQL через dbt-spark adapter
Spark Thrift Server(или Databricks SQL)
Spark Engine(execution)

Установка и настройка

dbt-spark adapter

# Установка
# pip install dbt-spark[PyHive]     # для Spark Thrift Server
# pip install dbt-spark[ODBC]       # для ODBC-подключения
# pip install dbt-databricks        # для Databricks (отдельный adapter)

profiles.yml

# ~/.dbt/profiles.yml
spark_project:
  target: dev
  outputs:
    dev:
      type: spark
      method: thrift
      host: spark-thrift-server
      port: 10001
      user: dbt_user
      schema: analytics_dev
      threads: 4

    production:
      type: spark
      method: thrift
      host: spark-thrift-prod
      port: 10001
      user: dbt_service
      schema: analytics
      threads: 8

    # Databricks вариант
    databricks_prod:
      type: databricks
      host: adb-1234567890.azuredatabricks.net
      http_path: /sql/1.0/warehouses/abc123
      token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}"
      schema: analytics
      threads: 4

Структура dbt-проекта

# spark-analytics/
# ├── dbt_project.yml
# ├── profiles.yml
# ├── models/
# │   ├── staging/
# │   │   ├── stg_orders.sql
# │   │   └── stg_customers.sql
# │   ├── intermediate/
# │   │   └── int_order_metrics.sql
# │   └── marts/
# │       └── fct_daily_revenue.sql
# ├── seeds/
# │   └── country_codes.csv
# ├── tests/
# │   └── assert_revenue_positive.sql
# └── macros/
#     └── date_utils.sql
# dbt_project.yml
name: spark_analytics
version: "1.0.0"
profile: spark_project

model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]

models:
  spark_analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: table
    marts:
      +materialized: incremental
      +schema: analytics

SQL-модели с ref()

Каждая модель — это SELECT-запрос. ref() macro создаёт зависимости:

-- models/staging/stg_orders.sql
-- Staging: минимальная очистка, приведение типов

SELECT
    order_id,
    customer_id,
    CAST(order_date AS DATE) AS order_date,
    CAST(amount AS DECIMAL(10, 2)) AS amount,
    status,
    CAST(updated_at AS TIMESTAMP) AS updated_at
FROM {{ source('raw', 'orders') }}
WHERE status != 'CANCELLED'
-- models/staging/stg_customers.sql

SELECT
    customer_id,
    customer_name,
    region,
    CAST(created_at AS DATE) AS signup_date
FROM {{ source('raw', 'customers') }}
-- models/intermediate/int_order_metrics.sql
-- Intermediate: бизнес-логика, joins

SELECT
    o.order_id,
    o.customer_id,
    c.customer_name,
    c.region,
    o.order_date,
    o.amount,
    SUM(o.amount) OVER (
        PARTITION BY o.customer_id
        ORDER BY o.order_date
    ) AS cumulative_spend
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_customers') }} c
    ON o.customer_id = c.customer_id
-- models/marts/fct_daily_revenue.sql
-- Mart: business-ready aggregation

SELECT
    order_date,
    region,
    COUNT(DISTINCT customer_id) AS unique_customers,
    COUNT(order_id) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value
FROM {{ ref('int_order_metrics') }}
GROUP BY order_date, region

ref() автоматически строит DAG зависимостей:

stg_orders ──┐
             ├──→ int_order_metrics ──→ fct_daily_revenue
stg_customers┘

Incremental Materialization

Incremental — ключевая стратегия для больших объёмов. Вместо пересоздания всей таблицы dbt обрабатывает только новые данные:

-- models/marts/fct_daily_revenue.sql
{{ config(
    materialized='incremental',
    unique_key='order_date || region',
    incremental_strategy='merge',
    on_schema_change='append_new_columns',
    file_format='delta'
) }}

SELECT
    order_date,
    region,
    COUNT(DISTINCT customer_id) AS unique_customers,
    COUNT(order_id) AS total_orders,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value
FROM {{ ref('int_order_metrics') }}

{% if is_incremental() %}
    -- При incremental run: только новые данные
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

GROUP BY order_date, region

Как работает is_incremental():

Первый запуск (full refresh):
  is_incremental() = false
  → SELECT без WHERE → создаётся полная таблица

Последующие запуски:
  is_incremental() = true
  → SELECT с WHERE updated_at > MAX(updated_at)
  → MERGE новых строк в существующую таблицу

Incremental strategies для Spark:

StrategyОписаниеFile Format
appendINSERT новых строкЛюбой
mergeMERGE по unique_key (upsert)Delta Lake
insert_overwriteПерезапись partitionHive/Parquet
# Запуск dbt
# dbt run                           # все модели
# dbt run --select fct_daily_revenue # одна модель
# dbt run --full-refresh            # полная перезагрузка incremental
# dbt run --select tag:daily        # по тегу

Тестирование в dbt

Generic tests (встроенные)

# models/staging/schema.yml
version: 2

models:
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: amount
        tests:
          - not_null
      - name: status
        tests:
          - accepted_values:
              values: ['PENDING', 'COMPLETED', 'REFUNDED']
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id

Custom Spark SQL tests

-- tests/assert_revenue_positive.sql
-- Тест проходит, если запрос возвращает 0 строк

SELECT order_date, region, total_revenue
FROM {{ ref('fct_daily_revenue') }}
WHERE total_revenue < 0
# Запуск тестов
# dbt test                          # все тесты
# dbt test --select stg_orders      # тесты одной модели
# dbt build                         # run + test в правильном порядке

dbt + Spark vs SDP (Spark 4.1)

Spark4.1

Spark Declarative Pipelines (SDP) в Spark 4.1 — встроенная альтернатива dbt для Spark:

Критерийdbt + SparkSDP (Spark 4.1)
ЯзыкSQL (Jinja macros)Python + SQL
ИнфраструктураThrift Server / DatabricksВстроен в Spark
Incrementalis_incremental() + strategies@dlt.table(is_streaming=True)
Тестированиеgeneric + custom SQL testsExpectations framework
Lineagedbt docs generateВстроен в Spark UI
Ecosystemdbt Hub (packages), communitySpark-native
Когда использоватьSQL-first команды, multi-engineSpark-only, Python-first
# SDP пример (для сравнения)
from pyspark import pipelines as dp

@dp.table
def daily_revenue():
    return (
        dp.read("int_order_metrics")
        .groupBy("order_date", "region")
        .agg(
            count("order_id").alias("total_orders"),
            sum("amount").alias("total_revenue"),
        )
    )
TIP

Когда dbt, когда SDP? Если ваша команда думает в SQL и использует несколько движков (Spark + Snowflake + BigQuery) — dbt. Если вы Spark-only и предпочитаете Python — SDP. Оба инструмента решают одну задачу: декларативные трансформации с dependency management.

Проверка знанийKnowledge check
Как работает incremental materialization в dbt-spark? Что делает is_incremental() и зачем нужен unique_key?
ОтветAnswer
Incremental materialization обрабатывает только новые данные вместо пересоздания таблицы. is_incremental() возвращает true при повторных запусках (таблица уже существует и не full-refresh) -- позволяет добавить WHERE-фильтр для отбора только новых строк (обычно по updated_at > MAX(updated_at) из текущей таблицы). unique_key определяет ключ для MERGE (upsert): если строка с таким ключом уже существует -- UPDATE, иначе INSERT. Без unique_key dbt делает append (может создать дубликаты). MERGE strategy требует Delta Lake file format.
Проверка знанийKnowledge check
Зачем в dbt используется ref() macro вместо прямых имён таблиц? Какие проблемы это решает?
ОтветAnswer
ref() macro решает три проблемы: (1) Автоматический dependency graph -- dbt знает порядок выполнения моделей и запускает их в правильной последовательности; (2) Environment isolation -- ref('stg_orders') в dev разрешается в analytics_dev.stg_orders, в prod -- в analytics.stg_orders (через schema prefix из profiles.yml); (3) Refactoring safety -- при переименовании модели dbt покажет ошибку во всех downstream-моделях, использующих ref() (вместо silent broken query).

Что дальше?

В заключительном уроке модуля мы соберём все production-рекомендации в единый Best Practices checklist.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Зачем в dbt используется ref() macro вместо прямых имён таблиц в SQL-моделях?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8