Learning Platform
Глоссарий Troubleshooting
Урок 13.03 · 18 мин
Продвинутый
JoinsAggregationsWindow FunctionsSCD Type 1Delta Lake MergeSilver LayerGold Layer

Трансформации и хранилище

От Bronze к Silver и Gold

В предыдущем уроке мы загрузили сырые данные в bronze layer. Теперь построим два следующих слоя:

  • Silver — обогащённые данные: join orders + customers, deduplication, SCD merge
  • Gold — агрегированные бизнес-таблицы: revenue по дням, городам, ранжирование товаров

Эти трансформации используют все ключевые возможности PySpark, которые мы изучали в модулях М03 (DataFrames, joins, window functions) и М04 (performance tuning для joins).

Silver Layer: обогащение данных

Чтение из Bronze

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EcommerceCapstone_Silver") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

orders = spark.read.format("delta").load("/data/bronze/orders")
customers = spark.read.format("delta").load("/data/bronze/customers")

Join orders + customers

Inner join по customer_id обогащает каждый заказ информацией о клиенте (имя, город, дата регистрации):

enriched = orders.join(
    customers,
    on="customer_id",
    how="inner"
).select(
    orders["order_id"],
    orders["customer_id"],
    customers["name"].alias("customer_name"),
    customers["city"],
    orders["product_id"],
    orders["quantity"],
    orders["price"],
    orders["order_date"],
    orders["status"],
)

Мы используем inner join, потому что заказы без клиента — аномалия, которую перехватит quality gate на уроке 04. В М03 мы разбирали стратегии join (broadcast, sort-merge, shuffle hash). Для нашего набора данных Spark автоматически выберет broadcast join для таблицы customers (она значительно меньше orders).

Deduplication

Удаляем дубликаты orders по order_id, оставляя последнюю запись:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

dedup_window = Window.partitionBy("order_id").orderBy(col("order_date").desc())

silver_orders = enriched \
    .withColumn("rn", row_number().over(dedup_window)) \
    .filter(col("rn") == 1) \
    .drop("rn")

Этот паттерн через row_number() + window — стандартный способ дедупликации в PySpark. Альтернатива dropDuplicates(["order_id"]) не гарантирует, какую строку оставит.

Вычисляемые колонки

from pyspark.sql.functions import round as spark_round

silver_orders = silver_orders.withColumn(
    "total_amount",
    spark_round(col("quantity") * col("price"), 2)
)

Запись Silver Layer

silver_orders.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/silver/enriched_orders")
Проверка знанийKnowledge check
ОтветAnswer

SCD Type 1: обновление customer dimension

Когда клиент меняет город или email, мы обновляем его запись без сохранения истории — это SCD Type 1. Delta Lake MERGE делает это атомарно.

from delta.tables import DeltaTable

# Предполагаем, что silver customer dimension уже существует
if DeltaTable.isDeltaTable(spark, "/data/silver/customers"):
    silver_customers = DeltaTable.forPath(spark, "/data/silver/customers")

    silver_customers.alias("target").merge(
        customers.alias("source"),
        "target.customer_id = source.customer_id"
    ).whenMatchedUpdate(set={
        "name": "source.name",
        "email": "source.email",
        "city": "source.city",
    }).whenNotMatchedInsertAll().execute()
else:
    # Первый запуск: создаём таблицу
    customers.write \
        .format("delta") \
        .mode("overwrite") \
        .save("/data/silver/customers")

SCD Type 1 — простейший вариант: перезаписываем изменённые поля. Для сценариев, где нужна история изменений (например, отслеживание переезда клиента между городами), используют SCD Type 2 с колонками valid_from / valid_to. Подробнее — в М10 (Delta Lake merge patterns).

Проверка знанийKnowledge check
ОтветAnswer

Gold Layer: бизнес-агрегаты

Gold layer содержит готовые для потребления таблицы. Мы создадим три:

1. Daily Revenue

from pyspark.sql.functions import sum as spark_sum, count

daily_revenue = silver_orders \
    .filter(col("status") == "completed") \
    .groupBy("order_date") \
    .agg(
        spark_sum("total_amount").alias("revenue"),
        count("order_id").alias("order_count"),
    ) \
    .orderBy("order_date")

daily_revenue.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/gold/daily_revenue")

2. City Revenue с Running Total

Window function для running total — кумулятивная выручка по каждому городу:

from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.window import Window

city_window = Window \
    .partitionBy("city") \
    .orderBy("order_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

city_revenue = silver_orders \
    .filter(col("status") == "completed") \
    .groupBy("city", "order_date") \
    .agg(spark_sum("total_amount").alias("daily_revenue")) \
    .withColumn(
        "running_total",
        spark_sum("daily_revenue").over(city_window)
    ) \
    .orderBy("city", "order_date")

city_revenue.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/gold/city_revenue")

3. Product Rankings

Ранжирование товаров по суммарной выручке с использованием dense_rank:

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

product_stats = silver_orders \
    .filter(col("status") == "completed") \
    .groupBy("product_id") \
    .agg(
        spark_sum("total_amount").alias("total_revenue"),
        spark_sum("quantity").alias("total_quantity"),
        count("order_id").alias("order_count"),
    )

rank_window = Window.orderBy(col("total_revenue").desc())

product_rankings = product_stats.withColumn(
    "rank", dense_rank().over(rank_window)
)

product_rankings.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/gold/product_rankings")

Проверка результатов

# Silver layer
silver = spark.read.format("delta").load("/data/silver/enriched_orders")
print(f"Silver orders: {silver.count()}")
silver.show(5)

# Gold tables
for table in ["daily_revenue", "city_revenue", "product_rankings"]:
    gold = spark.read.format("delta").load(f"/data/gold/{table}")
    print(f"\n{table}: {gold.count()} rows")
    gold.show(5)

Оптимизация

Для production-нагрузок рекомендуем запустить OPTIMIZE на Delta-таблицах после записи:

spark.sql("OPTIMIZE delta.`/data/silver/enriched_orders`")
spark.sql("OPTIMIZE delta.`/data/gold/daily_revenue`")

OPTIMIZE объединяет мелкие файлы в оптимальные по размеру (target size ~1GB). Подробнее о OPTIMIZE и Z-ORDER — в модуле М10.

Итоги

Silver и gold layers готовы:

  • Silver: enriched orders (join + dedup + total_amount), customer dimension с SCD Type 1
  • Gold: daily_revenue, city_revenue (с running total), product_rankings (с dense_rank)
  • Все таблицы в Delta Lake с transaction log и time travel

В следующем уроке мы добавим quality gates на каждом переходе между слоями через Great Expectations.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5