Learning Platform
Глоссарий Troubleshooting
Урок 13.05 · 14 мин
Продвинутый
Serving LayerPipeline DAGParquetProduction DeploymentAirflowCI/CDSelf-Assessment

Сервисный слой и обзор проекта

Финальный слой: Serving

В предыдущих уроках мы построили полный pipeline: ingestion -> transformation -> quality validation. Теперь добавим последний элемент — serving layer, который делает gold tables доступными для downstream-потребителей: BI-инструментов, аналитиков и ML-моделей.

Публикация Gold Tables

Выбор формата

Gold tables в нашем pipeline хранятся в Delta Lake. Для serving у нас два варианта:

  • Delta Lake — если потребители поддерживают Delta (Databricks, Spark, Trino с Delta connector)
  • Parquet — универсальный формат, читаемый любым инструментом (Pandas, DuckDB, Presto, Tableau)

Для максимальной совместимости создадим Parquet-копии gold tables:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EcommerceCapstone_Serving") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

gold_tables = ["daily_revenue", "city_revenue", "product_rankings"]

for table in gold_tables:
    df = spark.read.format("delta").load(f"/data/gold/{table}")

    df.coalesce(1).write \
        .mode("overwrite") \
        .parquet(f"/data/serving/{table}")

    print(f"Published {table}: {df.count()} rows")

coalesce(1) создаёт один файл — удобно для небольших агрегированных таблиц, которые будут скачиваться целиком. Для больших таблиц (миллионы строк) используйте партиционирование.

Проверка знанийKnowledge check
ОтветAnswer

Полный Pipeline DAG

Посмотрим на весь pipeline как единый DAG (directed acyclic graph):

Pipeline Architecture (Review)
orders.csv(source)
customers.json
BRONZE LAYERorders_bronze (Delta, part.)customers_bronze (Delta)
GE Bronze Gatenull checks, ranges, valid status
SILVER LAYERenriched_orders (join + dedup)customers_dim (SCD Type 1)
GE Silver Gateunique order_id, ref integrity
GOLD LAYERdaily_revenue · city_revenue (+ running total)product_rankings (+ dense_rank)
SERVING LAYERParquet exports for BI/ML
Side channel
Quarantine tablesinvalid rows
Quality logvalidation metrics

Каждый блок — отдельный этап, который можно перезапустить независимо. Delta Lake time travel позволяет откатить любой слой к предыдущей версии.

Production Deployment

Оркестрация с Airflow

В production pipeline запускается по расписанию через Apache Airflow (М12). Каждый этап — Airflow task:

# Пример Airflow DAG (концептуальный)
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG("ecommerce_pipeline", schedule="0 6 * * *") as dag:
    ingest = SparkSubmitOperator(
        task_id="bronze_ingestion",
        application="/pipeline/01_ingest.py",
    )
    transform = SparkSubmitOperator(
        task_id="silver_transform",
        application="/pipeline/02_transform.py",
    )
    validate = SparkSubmitOperator(
        task_id="quality_validation",
        application="/pipeline/03_validate.py",
    )
    serve = SparkSubmitOperator(
        task_id="gold_serving",
        application="/pipeline/04_serve.py",
    )

    ingest >> transform >> validate >> serve

CI/CD

Как мы обсуждали в модуле М12:

  • Unit-тесты для каждого этапа (pytest + pyspark.testing)
  • Integration-тесты на sample data перед deploy
  • Schema contracts — schema bronze/silver/gold зафиксированы в коде

Самооценка проекта

Используйте этот чеклист для проверки вашего pipeline:

Ingestion (Bronze)

  • CSV orders читаются с явной StructType-схемой (не inferSchema)
  • JSON customers читаются с явной схемой
  • Bronze layer партиционирован по year/month
  • Данные записаны в Delta Lake формат

Transformation (Silver)

  • Orders обогащены через inner join с customers
  • Дубликаты удалены через row_number() + window
  • Вычислен total_amount = quantity * price
  • Customer dimension обновляется через SCD Type 1 (Delta MERGE)

Quality (Gates)

  • Bronze gate: null-проверки, диапазоны price/quantity
  • Silver gate: уникальность order_id
  • Referential integrity проверена (left_anti join)
  • Невалидные строки отправляются в quarantine

Analytics (Gold)

  • daily_revenue агрегирует выручку по дням
  • city_revenue включает running total через window function
  • product_rankings использует dense_rank для ранжирования

Serving

  • Gold tables экспортированы в Parquet
  • Pipeline можно перезапустить без потери данных (idempotent)

Расширения проекта

Pipeline можно развивать в нескольких направлениях:

Streaming источник (М09)

Вместо batch-загрузки CSV подключите Structured Streaming к Kafka topic. Orders приходят в реальном времени, bronze layer обновляется непрерывно. Паттерны из модуля М09 (Structured Streaming) применимы напрямую.

Apache Iceberg (М10)

Замените Delta Lake на Apache Iceberg для vendor-neutral lakehouse. Iceberg поддерживает те же ACID-гарантии, time travel и schema evolution, но работает с любым query engine (Trino, Flink, Dremio).

Мониторинг и alerting (М08)

Добавьте Spark UI метрики и Grafana-дашборд для мониторинга pipeline: время выполнения каждого этапа, объём обработанных данных, количество quarantine-строк. Паттерны из модуля М08 (мониторинг и отладка) применимы здесь.

ML Feature Store

Gold tables можно использовать как feature store для ML-моделей: customer_lifetime_value, product_affinity_scores, churn_probability_features. Spark MLlib (М14) интегрируется с Delta Lake для versioned features.

Итоги модуля

Вы построили полный end-to-end data pipeline:

  1. Ingestion — CSV/JSON -> Delta Lake bronze с schema enforcement
  2. Transformation — Joins, aggregations, window functions -> silver/gold layers
  3. Quality — Great Expectations gates + quarantine pattern
  4. Serving — Parquet exports для BI и analytics

Pipeline интегрирует навыки из всех модулей курса: DataFrame API (М03), performance tuning (М04), lakehouse formats (М10), data quality (М13), CI/CD patterns (М12) и production operations (М14).

Этот проект демонстрирует паттерн, который используется в production data engineering: medallion architecture с quality gates на каждом переходе. Масштаб может быть от тысяч строк (как в нашем примере) до петабайтов — PySpark API остаётся тем же.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5