Spark Declarative Pipelines (SDP) Spark4.1
Spark 4.1+ only
Spark Declarative Pipelines (SDP) доступны начиная с Spark 4.1 (декабрь 2025). Код из этого урока не будет работать на Spark 4.0 и ниже.
Что такое SDP?
Spark Declarative Pipelines (SDP) — открытая версия Databricks Delta Live Tables (DLT). Вместо императивного кода (“прочитай -> трансформируй -> запиши”), вы декларативно описываете таблицы и их зависимости, а Spark автоматически:
- Определяет порядок выполнения (dependency resolution)
- Управляет инкрементальной обработкой
- Применяет data quality expectations
- Оптимизирует выполнение пайплайна
# Императивный подход (до SDP):
raw = spark.readStream.format("kafka").load()
parsed = raw.select(from_json(...))
parsed.writeStream.format("delta").start("/silver/orders")
# Декларативный подход (SDP):
@dp.materialized_view
def silver_orders():
return spark.table("bronze_raw_events") \
.filter(col("event_type") == "order") \
.select("order_id", "customer_id", "amount", "ts")
Import Path
from pyspark import pipelines as dp
Anti-pattern: DLT import path
НЕ используйте import dlt — это Databricks-проприетарный import. В open-source Spark используется:
from pyspark import pipelines as dp # правильно
# import dlt # НЕПРАВИЛЬНО -- Databricks onlyКлючевые концепции
@dp.materialized_view (batch recompute)
Materialized view — таблица, которая полностью пересчитывается при каждом запуске пайплайна. Подходит для batch ETL:
from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum as _sum, count
@dp.materialized_view
def silver_orders():
"""Валидированные заказы из bronze."""
return (
spark.table("bronze_raw_events")
.filter(col("event_type") == "order")
.filter(col("amount") > 0)
.select(
col("order_id"),
col("customer_id"),
col("amount").cast("double"),
col("ts").cast("timestamp")
)
)
@dp.materialized_view
def gold_daily_revenue():
"""Ежедневная выручка -- агрегат из silver."""
return (
spark.table("silver_orders")
.groupBy(col("ts").cast("date").alias("order_date"))
.agg(
_sum("amount").alias("total_revenue"),
count("*").alias("order_count")
)
)
Spark автоматически определяет зависимость: gold_daily_revenue зависит от silver_orders, которая зависит от bronze_raw_events. Порядок выполнения определяется автоматически.
@dp.table (streaming append)
Table — streaming-таблица, куда данные дописываются инкрементально:
@dp.table
def streaming_orders():
"""Streaming append из Kafka."""
return (
spark.readStream
.format("kafka")
.option("subscribe", "orders")
.option("kafka.bootstrap.servers", "kafka:9092")
.load()
.select(
from_json(
col("value").cast("string"),
order_schema
).alias("data")
)
.select("data.*")
)
Разница с materialized_view: @dp.table использует readStream и дописывает новые данные инкрементально, а @dp.materialized_view полностью пересчитывает таблицу.
Flows (custom processing)
Flow — механизм для кастомной обработки между таблицами, когда стандартного materialized_view / table недостаточно:
@dp.table
def raw_events():
"""Target table для custom flow."""
return spark.readStream.format("kafka").load()
@dp.flow(target="enriched_events", mode="append")
def enrich_events():
"""Custom flow: обогащение событий dimension-данными."""
events = spark.table("raw_events")
dimensions = spark.table("dim_customers")
return events.join(dimensions, "customer_id", "left")
Expectations (Data Quality)
SDP поддерживает expectations — декларативные правила качества данных:
@dp.materialized_view
@dp.expect("amount_positive", "amount > 0")
@dp.expect("customer_not_null", "customer_id IS NOT NULL")
@dp.expect_or_drop("valid_status", "status IN ('new', 'paid', 'shipped')")
def validated_orders():
return spark.table("bronze_orders")
Типы expectations:
@dp.expect— предупреждение (данные проходят, ошибка логгируется)@dp.expect_or_drop— фильтрация (невалидные записи отбрасываются)@dp.expect_or_fail— hard fail (пайплайн останавливается при нарушении)
Связь с медальонной архитектурой
SDP идеально ложится на medallion architecture:
from pyspark import pipelines as dp
# Bronze: raw ingestion
@dp.table
def bronze_events():
return spark.readStream.format("kafka") \
.option("subscribe", "events").load()
# Silver: validation + dedup
@dp.materialized_view
@dp.expect_or_drop("has_event_id", "event_id IS NOT NULL")
def silver_events():
return spark.table("bronze_events") \
.dropDuplicates(["event_id"]) \
.select("event_id", "user_id", "amount", "event_time")
# Gold: business aggregates
@dp.materialized_view
def gold_user_totals():
return spark.table("silver_events") \
.groupBy("user_id") \
.agg(sum("amount").alias("lifetime_value"))
SDP автоматически: определяет порядок bronze -> silver -> gold, управляет streaming checkpoints для bronze, полностью пересчитывает gold при обновлении silver.
Связь с Delta Live Tables (DLT)
| Характеристика | DLT (Databricks) | SDP (Open Source) |
|---|---|---|
| Import | import dlt | from pyspark import pipelines as dp |
| Платформа | Databricks only | Любой Spark 4.1+ |
| Доступность | С 2020 | С декабря 2025 |
| Expectations | Полные | Полные (тот же API) |
| Движок | Databricks Runtime | Apache Spark |
Миграция DLT -> SDP
Если у вас есть DLT-пайплайны в Databricks, миграция на SDP требует замены import dlt на from pyspark import pipelines as dp и замены dlt.table() / dlt.view() на @dp.table / @dp.materialized_view. Логика трансформаций остаётся идентичной.
Anti-patterns
Частые ошибки при работе с SDP
-
Использование
import dlt— это Databricks-проприетарный import. В open-source Spark 4.1 используйтеfrom pyspark import pipelines as dp. -
Императивный pipeline management в SDP — не используйте
writeStream.start()внутри@dp.table. SDP сам управляет lifecycle streaming queries. -
Циклические зависимости —
table_aчитает изtable_b, аtable_bизtable_a. SDP не разрешит циклы в DAG зависимостей.
Что дальше?
В последнем уроке модуля мы соберём лучшие практики Lakehouse: стратегии партиционирования, расписания compaction, политики retention и контроль качества данных на каждом слое медальонной архитектуры.