Learning Platform
Глоссарий Troubleshooting
Урок 09.07 · 12 мин
Продвинутый
SDPDeclarative Pipelinesmaterialized_viewtableFlowSpark 4.1

Spark Declarative Pipelines (SDP) Spark4.1

WARNING

Spark 4.1+ only

Spark Declarative Pipelines (SDP) доступны начиная с Spark 4.1 (декабрь 2025). Код из этого урока не будет работать на Spark 4.0 и ниже.

Что такое SDP?

Spark Declarative Pipelines (SDP) — открытая версия Databricks Delta Live Tables (DLT). Вместо императивного кода (“прочитай -> трансформируй -> запиши”), вы декларативно описываете таблицы и их зависимости, а Spark автоматически:

  • Определяет порядок выполнения (dependency resolution)
  • Управляет инкрементальной обработкой
  • Применяет data quality expectations
  • Оптимизирует выполнение пайплайна
# Императивный подход (до SDP):
raw = spark.readStream.format("kafka").load()
parsed = raw.select(from_json(...))
parsed.writeStream.format("delta").start("/silver/orders")

# Декларативный подход (SDP):
@dp.materialized_view
def silver_orders():
    return spark.table("bronze_raw_events") \
        .filter(col("event_type") == "order") \
        .select("order_id", "customer_id", "amount", "ts")

Import Path

from pyspark import pipelines as dp
WARNING

Anti-pattern: DLT import path

НЕ используйте import dlt — это Databricks-проприетарный import. В open-source Spark используется:

from pyspark import pipelines as dp  # правильно
# import dlt  # НЕПРАВИЛЬНО -- Databricks only

Ключевые концепции

@dp.materialized_view (batch recompute)

Materialized view — таблица, которая полностью пересчитывается при каждом запуске пайплайна. Подходит для batch ETL:

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum as _sum, count

@dp.materialized_view
def silver_orders():
    """Валидированные заказы из bronze."""
    return (
        spark.table("bronze_raw_events")
        .filter(col("event_type") == "order")
        .filter(col("amount") > 0)
        .select(
            col("order_id"),
            col("customer_id"),
            col("amount").cast("double"),
            col("ts").cast("timestamp")
        )
    )

@dp.materialized_view
def gold_daily_revenue():
    """Ежедневная выручка -- агрегат из silver."""
    return (
        spark.table("silver_orders")
        .groupBy(col("ts").cast("date").alias("order_date"))
        .agg(
            _sum("amount").alias("total_revenue"),
            count("*").alias("order_count")
        )
    )

Spark автоматически определяет зависимость: gold_daily_revenue зависит от silver_orders, которая зависит от bronze_raw_events. Порядок выполнения определяется автоматически.

@dp.table (streaming append)

Table — streaming-таблица, куда данные дописываются инкрементально:

@dp.table
def streaming_orders():
    """Streaming append из Kafka."""
    return (
        spark.readStream
        .format("kafka")
        .option("subscribe", "orders")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .load()
        .select(
            from_json(
                col("value").cast("string"),
                order_schema
            ).alias("data")
        )
        .select("data.*")
    )

Разница с materialized_view: @dp.table использует readStream и дописывает новые данные инкрементально, а @dp.materialized_view полностью пересчитывает таблицу.

Проверка знанийKnowledge check
В чём разница между @dp.materialized_view и @dp.table в SDP?
ОтветAnswer
@dp.materialized_view -- batch пересчёт: таблица полностью пересчитывается при каждом запуске пайплайна. Подходит для агрегатов, отчётов, витрин данных. @dp.table -- streaming append: данные дописываются инкрементально через readStream. Подходит для ingestion из Kafka, streaming pipelines. Materialized view гарантирует согласованность всех данных, а table -- минимальную задержку для новых данных.

Flows (custom processing)

Flow — механизм для кастомной обработки между таблицами, когда стандартного materialized_view / table недостаточно:

@dp.table
def raw_events():
    """Target table для custom flow."""
    return spark.readStream.format("kafka").load()

@dp.flow(target="enriched_events", mode="append")
def enrich_events():
    """Custom flow: обогащение событий dimension-данными."""
    events = spark.table("raw_events")
    dimensions = spark.table("dim_customers")
    return events.join(dimensions, "customer_id", "left")

Expectations (Data Quality)

SDP поддерживает expectations — декларативные правила качества данных:

@dp.materialized_view
@dp.expect("amount_positive", "amount > 0")
@dp.expect("customer_not_null", "customer_id IS NOT NULL")
@dp.expect_or_drop("valid_status", "status IN ('new', 'paid', 'shipped')")
def validated_orders():
    return spark.table("bronze_orders")

Типы expectations:

  • @dp.expect — предупреждение (данные проходят, ошибка логгируется)
  • @dp.expect_or_drop — фильтрация (невалидные записи отбрасываются)
  • @dp.expect_or_fail — hard fail (пайплайн останавливается при нарушении)
Проверка знанийKnowledge check
Как SDP решает проблему порядка выполнения в ETL-пайплайне?
ОтветAnswer
SDP использует автоматический dependency resolution. Каждая функция с декоратором @dp.materialized_view или @dp.table читает данные через spark.table('other_table'). SDP анализирует эти зависимости и автоматически строит DAG выполнения: определяет, какие таблицы нужно обработать первыми. Разработчик описывает WHAT (что вычислить), а SDP определяет HOW (в каком порядке). Это исключает ручное управление порядком запуска.

Связь с медальонной архитектурой

SDP идеально ложится на medallion architecture:

from pyspark import pipelines as dp

# Bronze: raw ingestion
@dp.table
def bronze_events():
    return spark.readStream.format("kafka") \
        .option("subscribe", "events").load()

# Silver: validation + dedup
@dp.materialized_view
@dp.expect_or_drop("has_event_id", "event_id IS NOT NULL")
def silver_events():
    return spark.table("bronze_events") \
        .dropDuplicates(["event_id"]) \
        .select("event_id", "user_id", "amount", "event_time")

# Gold: business aggregates
@dp.materialized_view
def gold_user_totals():
    return spark.table("silver_events") \
        .groupBy("user_id") \
        .agg(sum("amount").alias("lifetime_value"))

SDP автоматически: определяет порядок bronze -> silver -> gold, управляет streaming checkpoints для bronze, полностью пересчитывает gold при обновлении silver.

Связь с Delta Live Tables (DLT)

ХарактеристикаDLT (Databricks)SDP (Open Source)
Importimport dltfrom pyspark import pipelines as dp
ПлатформаDatabricks onlyЛюбой Spark 4.1+
ДоступностьС 2020С декабря 2025
ExpectationsПолныеПолные (тот же API)
ДвижокDatabricks RuntimeApache Spark
TIP

Миграция DLT -> SDP

Если у вас есть DLT-пайплайны в Databricks, миграция на SDP требует замены import dlt на from pyspark import pipelines as dp и замены dlt.table() / dlt.view() на @dp.table / @dp.materialized_view. Логика трансформаций остаётся идентичной.

Anti-patterns

WARNING

Частые ошибки при работе с SDP

  1. Использование import dlt — это Databricks-проприетарный import. В open-source Spark 4.1 используйте from pyspark import pipelines as dp.

  2. Императивный pipeline management в SDP — не используйте writeStream.start() внутри @dp.table. SDP сам управляет lifecycle streaming queries.

  3. Циклические зависимостиtable_a читает из table_b, а table_b из table_a. SDP не разрешит циклы в DAG зависимостей.

Что дальше?

В последнем уроке модуля мы соберём лучшие практики Lakehouse: стратегии партиционирования, расписания compaction, политики retention и контроль качества данных на каждом слое медальонной архитектуры.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое Spark Declarative Pipelines (SDP) и какой декоратор создаёт таблицу, которая полностью пересчитывается при каждом запуске?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8