Трансформации и хранилище
От Bronze к Silver и Gold
В предыдущем уроке мы загрузили сырые данные в bronze layer. Теперь построим два следующих слоя:
- Silver — обогащённые данные: join orders + customers, deduplication, SCD merge
- Gold — агрегированные бизнес-таблицы: revenue по дням, городам, ранжирование товаров
Эти трансформации используют все ключевые возможности PySpark, которые мы изучали в модулях М03 (DataFrames, joins, window functions) и М04 (performance tuning для joins).
Silver Layer: обогащение данных
Чтение из Bronze
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("EcommerceCapstone_Silver") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
orders = spark.read.format("delta").load("/data/bronze/orders")
customers = spark.read.format("delta").load("/data/bronze/customers")
Join orders + customers
Inner join по customer_id обогащает каждый заказ информацией о клиенте (имя, город, дата регистрации):
enriched = orders.join(
customers,
on="customer_id",
how="inner"
).select(
orders["order_id"],
orders["customer_id"],
customers["name"].alias("customer_name"),
customers["city"],
orders["product_id"],
orders["quantity"],
orders["price"],
orders["order_date"],
orders["status"],
)
Мы используем inner join, потому что заказы без клиента — аномалия, которую перехватит quality gate на уроке 04. В М03 мы разбирали стратегии join (broadcast, sort-merge, shuffle hash). Для нашего набора данных Spark автоматически выберет broadcast join для таблицы customers (она значительно меньше orders).
Deduplication
Удаляем дубликаты orders по order_id, оставляя последнюю запись:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
dedup_window = Window.partitionBy("order_id").orderBy(col("order_date").desc())
silver_orders = enriched \
.withColumn("rn", row_number().over(dedup_window)) \
.filter(col("rn") == 1) \
.drop("rn")
Этот паттерн через row_number() + window — стандартный способ дедупликации в PySpark. Альтернатива dropDuplicates(["order_id"]) не гарантирует, какую строку оставит.
Вычисляемые колонки
from pyspark.sql.functions import round as spark_round
silver_orders = silver_orders.withColumn(
"total_amount",
spark_round(col("quantity") * col("price"), 2)
)
Запись Silver Layer
silver_orders.write \
.format("delta") \
.mode("overwrite") \
.save("/data/silver/enriched_orders")
SCD Type 1: обновление customer dimension
Когда клиент меняет город или email, мы обновляем его запись без сохранения истории — это SCD Type 1. Delta Lake MERGE делает это атомарно.
from delta.tables import DeltaTable
# Предполагаем, что silver customer dimension уже существует
if DeltaTable.isDeltaTable(spark, "/data/silver/customers"):
silver_customers = DeltaTable.forPath(spark, "/data/silver/customers")
silver_customers.alias("target").merge(
customers.alias("source"),
"target.customer_id = source.customer_id"
).whenMatchedUpdate(set={
"name": "source.name",
"email": "source.email",
"city": "source.city",
}).whenNotMatchedInsertAll().execute()
else:
# Первый запуск: создаём таблицу
customers.write \
.format("delta") \
.mode("overwrite") \
.save("/data/silver/customers")
SCD Type 1 — простейший вариант: перезаписываем изменённые поля. Для сценариев, где нужна история изменений (например, отслеживание переезда клиента между городами), используют SCD Type 2 с колонками valid_from / valid_to. Подробнее — в М10 (Delta Lake merge patterns).
Gold Layer: бизнес-агрегаты
Gold layer содержит готовые для потребления таблицы. Мы создадим три:
1. Daily Revenue
from pyspark.sql.functions import sum as spark_sum, count
daily_revenue = silver_orders \
.filter(col("status") == "completed") \
.groupBy("order_date") \
.agg(
spark_sum("total_amount").alias("revenue"),
count("order_id").alias("order_count"),
) \
.orderBy("order_date")
daily_revenue.write \
.format("delta") \
.mode("overwrite") \
.save("/data/gold/daily_revenue")
2. City Revenue с Running Total
Window function для running total — кумулятивная выручка по каждому городу:
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.window import Window
city_window = Window \
.partitionBy("city") \
.orderBy("order_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
city_revenue = silver_orders \
.filter(col("status") == "completed") \
.groupBy("city", "order_date") \
.agg(spark_sum("total_amount").alias("daily_revenue")) \
.withColumn(
"running_total",
spark_sum("daily_revenue").over(city_window)
) \
.orderBy("city", "order_date")
city_revenue.write \
.format("delta") \
.mode("overwrite") \
.save("/data/gold/city_revenue")
3. Product Rankings
Ранжирование товаров по суммарной выручке с использованием dense_rank:
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window
product_stats = silver_orders \
.filter(col("status") == "completed") \
.groupBy("product_id") \
.agg(
spark_sum("total_amount").alias("total_revenue"),
spark_sum("quantity").alias("total_quantity"),
count("order_id").alias("order_count"),
)
rank_window = Window.orderBy(col("total_revenue").desc())
product_rankings = product_stats.withColumn(
"rank", dense_rank().over(rank_window)
)
product_rankings.write \
.format("delta") \
.mode("overwrite") \
.save("/data/gold/product_rankings")
Проверка результатов
# Silver layer
silver = spark.read.format("delta").load("/data/silver/enriched_orders")
print(f"Silver orders: {silver.count()}")
silver.show(5)
# Gold tables
for table in ["daily_revenue", "city_revenue", "product_rankings"]:
gold = spark.read.format("delta").load(f"/data/gold/{table}")
print(f"\n{table}: {gold.count()} rows")
gold.show(5)
Оптимизация
Для production-нагрузок рекомендуем запустить OPTIMIZE на Delta-таблицах после записи:
spark.sql("OPTIMIZE delta.`/data/silver/enriched_orders`")
spark.sql("OPTIMIZE delta.`/data/gold/daily_revenue`")
OPTIMIZE объединяет мелкие файлы в оптимальные по размеру (target size ~1GB). Подробнее о OPTIMIZE и Z-ORDER — в модуле М10.
Итоги
Silver и gold layers готовы:
- Silver: enriched orders (join + dedup + total_amount), customer dimension с SCD Type 1
- Gold: daily_revenue, city_revenue (с running total), product_rankings (с dense_rank)
- Все таблицы в Delta Lake с transaction log и time travel
В следующем уроке мы добавим quality gates на каждом переходе между слоями через Great Expectations.