Сервисный слой и обзор проекта
Финальный слой: Serving
В предыдущих уроках мы построили полный pipeline: ingestion -> transformation -> quality validation. Теперь добавим последний элемент — serving layer, который делает gold tables доступными для downstream-потребителей: BI-инструментов, аналитиков и ML-моделей.
Публикация Gold Tables
Выбор формата
Gold tables в нашем pipeline хранятся в Delta Lake. Для serving у нас два варианта:
- Delta Lake — если потребители поддерживают Delta (Databricks, Spark, Trino с Delta connector)
- Parquet — универсальный формат, читаемый любым инструментом (Pandas, DuckDB, Presto, Tableau)
Для максимальной совместимости создадим Parquet-копии gold tables:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("EcommerceCapstone_Serving") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
gold_tables = ["daily_revenue", "city_revenue", "product_rankings"]
for table in gold_tables:
df = spark.read.format("delta").load(f"/data/gold/{table}")
df.coalesce(1).write \
.mode("overwrite") \
.parquet(f"/data/serving/{table}")
print(f"Published {table}: {df.count()} rows")
coalesce(1) создаёт один файл — удобно для небольших агрегированных таблиц, которые будут скачиваться целиком. Для больших таблиц (миллионы строк) используйте партиционирование.
Полный Pipeline DAG
Посмотрим на весь pipeline как единый DAG (directed acyclic graph):
Каждый блок — отдельный этап, который можно перезапустить независимо. Delta Lake time travel позволяет откатить любой слой к предыдущей версии.
Production Deployment
Оркестрация с Airflow
В production pipeline запускается по расписанию через Apache Airflow (М12). Каждый этап — Airflow task:
# Пример Airflow DAG (концептуальный)
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
with DAG("ecommerce_pipeline", schedule="0 6 * * *") as dag:
ingest = SparkSubmitOperator(
task_id="bronze_ingestion",
application="/pipeline/01_ingest.py",
)
transform = SparkSubmitOperator(
task_id="silver_transform",
application="/pipeline/02_transform.py",
)
validate = SparkSubmitOperator(
task_id="quality_validation",
application="/pipeline/03_validate.py",
)
serve = SparkSubmitOperator(
task_id="gold_serving",
application="/pipeline/04_serve.py",
)
ingest >> transform >> validate >> serve
CI/CD
Как мы обсуждали в модуле М12:
- Unit-тесты для каждого этапа (pytest + pyspark.testing)
- Integration-тесты на sample data перед deploy
- Schema contracts — schema bronze/silver/gold зафиксированы в коде
Самооценка проекта
Используйте этот чеклист для проверки вашего pipeline:
Ingestion (Bronze)
- CSV orders читаются с явной StructType-схемой (не inferSchema)
- JSON customers читаются с явной схемой
- Bronze layer партиционирован по year/month
- Данные записаны в Delta Lake формат
Transformation (Silver)
- Orders обогащены через inner join с customers
- Дубликаты удалены через row_number() + window
- Вычислен total_amount = quantity * price
- Customer dimension обновляется через SCD Type 1 (Delta MERGE)
Quality (Gates)
- Bronze gate: null-проверки, диапазоны price/quantity
- Silver gate: уникальность order_id
- Referential integrity проверена (left_anti join)
- Невалидные строки отправляются в quarantine
Analytics (Gold)
- daily_revenue агрегирует выручку по дням
- city_revenue включает running total через window function
- product_rankings использует dense_rank для ранжирования
Serving
- Gold tables экспортированы в Parquet
- Pipeline можно перезапустить без потери данных (idempotent)
Расширения проекта
Pipeline можно развивать в нескольких направлениях:
Streaming источник (М09)
Вместо batch-загрузки CSV подключите Structured Streaming к Kafka topic. Orders приходят в реальном времени, bronze layer обновляется непрерывно. Паттерны из модуля М09 (Structured Streaming) применимы напрямую.
Apache Iceberg (М10)
Замените Delta Lake на Apache Iceberg для vendor-neutral lakehouse. Iceberg поддерживает те же ACID-гарантии, time travel и schema evolution, но работает с любым query engine (Trino, Flink, Dremio).
Мониторинг и alerting (М08)
Добавьте Spark UI метрики и Grafana-дашборд для мониторинга pipeline: время выполнения каждого этапа, объём обработанных данных, количество quarantine-строк. Паттерны из модуля М08 (мониторинг и отладка) применимы здесь.
ML Feature Store
Gold tables можно использовать как feature store для ML-моделей: customer_lifetime_value, product_affinity_scores, churn_probability_features. Spark MLlib (М14) интегрируется с Delta Lake для versioned features.
Итоги модуля
Вы построили полный end-to-end data pipeline:
- Ingestion — CSV/JSON -> Delta Lake bronze с schema enforcement
- Transformation — Joins, aggregations, window functions -> silver/gold layers
- Quality — Great Expectations gates + quarantine pattern
- Serving — Parquet exports для BI и analytics
Pipeline интегрирует навыки из всех модулей курса: DataFrame API (М03), performance tuning (М04), lakehouse formats (М10), data quality (М13), CI/CD patterns (М12) и production operations (М14).
Этот проект демонстрирует паттерн, который используется в production data engineering: medallion architecture с quality gates на каждом переходе. Масштаб может быть от тысяч строк (как в нашем примере) до петабайтов — PySpark API остаётся тем же.