dbt + Spark: модели и инкрементальная обработка
Что такое dbt и зачем он для Spark?
dbt (data build tool) — фреймворк для SQL-first трансформаций данных. Вместо написания PySpark-кода вы определяете модели как SELECT-запросы, а dbt управляет:
- Зависимостями между моделями (через
ref()macro) - Материализацией (table, view, incremental)
- Тестированием данных (not_null, unique, custom)
- Документированием (auto-generated lineage graph)
Установка и настройка
dbt-spark adapter
# Установка
# pip install dbt-spark[PyHive] # для Spark Thrift Server
# pip install dbt-spark[ODBC] # для ODBC-подключения
# pip install dbt-databricks # для Databricks (отдельный adapter)
profiles.yml
# ~/.dbt/profiles.yml
spark_project:
target: dev
outputs:
dev:
type: spark
method: thrift
host: spark-thrift-server
port: 10001
user: dbt_user
schema: analytics_dev
threads: 4
production:
type: spark
method: thrift
host: spark-thrift-prod
port: 10001
user: dbt_service
schema: analytics
threads: 8
# Databricks вариант
databricks_prod:
type: databricks
host: adb-1234567890.azuredatabricks.net
http_path: /sql/1.0/warehouses/abc123
token: "{{ env_var('DBT_DATABRICKS_TOKEN') }}"
schema: analytics
threads: 4
Структура dbt-проекта
# spark-analytics/
# ├── dbt_project.yml
# ├── profiles.yml
# ├── models/
# │ ├── staging/
# │ │ ├── stg_orders.sql
# │ │ └── stg_customers.sql
# │ ├── intermediate/
# │ │ └── int_order_metrics.sql
# │ └── marts/
# │ └── fct_daily_revenue.sql
# ├── seeds/
# │ └── country_codes.csv
# ├── tests/
# │ └── assert_revenue_positive.sql
# └── macros/
# └── date_utils.sql
# dbt_project.yml
name: spark_analytics
version: "1.0.0"
profile: spark_project
model-paths: ["models"]
seed-paths: ["seeds"]
test-paths: ["tests"]
models:
spark_analytics:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: table
marts:
+materialized: incremental
+schema: analytics
SQL-модели с ref()
Каждая модель — это SELECT-запрос. ref() macro создаёт зависимости:
-- models/staging/stg_orders.sql
-- Staging: минимальная очистка, приведение типов
SELECT
order_id,
customer_id,
CAST(order_date AS DATE) AS order_date,
CAST(amount AS DECIMAL(10, 2)) AS amount,
status,
CAST(updated_at AS TIMESTAMP) AS updated_at
FROM {{ source('raw', 'orders') }}
WHERE status != 'CANCELLED'
-- models/staging/stg_customers.sql
SELECT
customer_id,
customer_name,
region,
CAST(created_at AS DATE) AS signup_date
FROM {{ source('raw', 'customers') }}
-- models/intermediate/int_order_metrics.sql
-- Intermediate: бизнес-логика, joins
SELECT
o.order_id,
o.customer_id,
c.customer_name,
c.region,
o.order_date,
o.amount,
SUM(o.amount) OVER (
PARTITION BY o.customer_id
ORDER BY o.order_date
) AS cumulative_spend
FROM {{ ref('stg_orders') }} o
JOIN {{ ref('stg_customers') }} c
ON o.customer_id = c.customer_id
-- models/marts/fct_daily_revenue.sql
-- Mart: business-ready aggregation
SELECT
order_date,
region,
COUNT(DISTINCT customer_id) AS unique_customers,
COUNT(order_id) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM {{ ref('int_order_metrics') }}
GROUP BY order_date, region
ref() автоматически строит DAG зависимостей:
stg_orders ──┐
├──→ int_order_metrics ──→ fct_daily_revenue
stg_customers┘
Incremental Materialization
Incremental — ключевая стратегия для больших объёмов. Вместо пересоздания всей таблицы dbt обрабатывает только новые данные:
-- models/marts/fct_daily_revenue.sql
{{ config(
materialized='incremental',
unique_key='order_date || region',
incremental_strategy='merge',
on_schema_change='append_new_columns',
file_format='delta'
) }}
SELECT
order_date,
region,
COUNT(DISTINCT customer_id) AS unique_customers,
COUNT(order_id) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value
FROM {{ ref('int_order_metrics') }}
{% if is_incremental() %}
-- При incremental run: только новые данные
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
GROUP BY order_date, region
Как работает is_incremental():
Первый запуск (full refresh):
is_incremental() = false
→ SELECT без WHERE → создаётся полная таблица
Последующие запуски:
is_incremental() = true
→ SELECT с WHERE updated_at > MAX(updated_at)
→ MERGE новых строк в существующую таблицу
Incremental strategies для Spark:
| Strategy | Описание | File Format |
|---|---|---|
append | INSERT новых строк | Любой |
merge | MERGE по unique_key (upsert) | Delta Lake |
insert_overwrite | Перезапись partition | Hive/Parquet |
# Запуск dbt
# dbt run # все модели
# dbt run --select fct_daily_revenue # одна модель
# dbt run --full-refresh # полная перезагрузка incremental
# dbt run --select tag:daily # по тегу
Тестирование в dbt
Generic tests (встроенные)
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: amount
tests:
- not_null
- name: status
tests:
- accepted_values:
values: ['PENDING', 'COMPLETED', 'REFUNDED']
- name: customer_id
tests:
- relationships:
to: ref('stg_customers')
field: customer_id
Custom Spark SQL tests
-- tests/assert_revenue_positive.sql
-- Тест проходит, если запрос возвращает 0 строк
SELECT order_date, region, total_revenue
FROM {{ ref('fct_daily_revenue') }}
WHERE total_revenue < 0
# Запуск тестов
# dbt test # все тесты
# dbt test --select stg_orders # тесты одной модели
# dbt build # run + test в правильном порядке
dbt + Spark vs SDP (Spark 4.1)
Spark4.1Spark Declarative Pipelines (SDP) в Spark 4.1 — встроенная альтернатива dbt для Spark:
| Критерий | dbt + Spark | SDP (Spark 4.1) |
|---|---|---|
| Язык | SQL (Jinja macros) | Python + SQL |
| Инфраструктура | Thrift Server / Databricks | Встроен в Spark |
| Incremental | is_incremental() + strategies | @dlt.table(is_streaming=True) |
| Тестирование | generic + custom SQL tests | Expectations framework |
| Lineage | dbt docs generate | Встроен в Spark UI |
| Ecosystem | dbt Hub (packages), community | Spark-native |
| Когда использовать | SQL-first команды, multi-engine | Spark-only, Python-first |
# SDP пример (для сравнения)
from pyspark import pipelines as dp
@dp.table
def daily_revenue():
return (
dp.read("int_order_metrics")
.groupBy("order_date", "region")
.agg(
count("order_id").alias("total_orders"),
sum("amount").alias("total_revenue"),
)
)
Когда dbt, когда SDP? Если ваша команда думает в SQL и использует несколько движков (Spark + Snowflake + BigQuery) — dbt. Если вы Spark-only и предпочитаете Python — SDP. Оба инструмента решают одну задачу: декларативные трансформации с dependency management.
Что дальше?
В заключительном уроке модуля мы соберём все production-рекомендации в единый Best Practices checklist.