Загрузка данных
Первый слой pipeline: Bronze
В предыдущем уроке мы определили архитектуру pipeline и схему данных. Теперь начнём строить — с bronze layer, который загружает сырые данные из CSV и JSON в Delta Lake.
Ключевой принцип bronze: никаких трансформаций. Мы только читаем, валидируем схему и сохраняем. Это гарантирует, что исходные данные всегда доступны для перезапуска pipeline.
Настройка SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("EcommerceCapstone") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
Delta Lake требует два конфига: DeltaSparkSessionExtension для SQL-команд (MERGE, OPTIMIZE) и DeltaCatalog для регистрации таблиц в каталоге. Мы подробно разбирали эту настройку в модуле М10 (Delta Lake).
Чтение orders из CSV
Определение схемы
Мы используем явную схему через StructType вместо inferSchema=True. Как мы обсуждали в модуле М03 (DataFrames), inferSchema:
- Требует дополнительный проход по всем данным для определения типов
- Может неправильно определить типы (например,
priceкак INTEGER вместо DECIMAL) - Не гарантирует стабильность схемы при изменении данных
from pyspark.sql.types import (
StructType, StructField,
IntegerType, DecimalType, DateType, StringType
)
orders_schema = StructType([
StructField("order_id", IntegerType(), nullable=False),
StructField("customer_id", IntegerType(), nullable=False),
StructField("product_id", IntegerType(), nullable=False),
StructField("quantity", IntegerType(), nullable=False),
StructField("price", DecimalType(10, 2), nullable=False),
StructField("order_date", DateType(), nullable=False),
StructField("status", StringType(), nullable=False),
])
Обратите внимание на nullable=False для всех полей. При schema enforcement строки с null в NOT NULL-полях попадут в corrupt records — мы обработаем их через quality gate в уроке 04.
Чтение CSV
orders_raw = spark.read \
.schema(orders_schema) \
.option("header", True) \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.csv("/data/raw/orders.csv")
Режим PERMISSIVE сохраняет все строки, добавляя колонку _corrupt_record для строк, которые не соответствуют схеме. В production pipeline это лучше, чем FAILFAST, потому что мы не теряем валидные строки из-за единичных ошибок.
Чтение customers из JSON
customers_schema = StructType([
StructField("customer_id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=False),
StructField("email", StringType(), nullable=False),
StructField("city", StringType(), nullable=False),
StructField("registration_date", DateType(), nullable=False),
])
customers_raw = spark.read \
.schema(customers_schema) \
.json("/data/raw/customers.json")
JSON-файлы PySpark читает в формате JSON Lines (один JSON-объект на строку). Это стандартный формат для экспорта из NoSQL-баз и REST API.
Запись в Delta Lake Bronze
Партиционирование orders
Orders партиционируем по year и month из order_date. Это оптимизирует запросы вида «все заказы за январь 2024» — Spark читает только нужные партиции (partition pruning, М04).
from pyspark.sql.functions import year, month
orders_bronze = orders_raw \
.withColumn("year", year("order_date")) \
.withColumn("month", month("order_date"))
orders_bronze.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save("/data/bronze/orders")
Запись customers
Customers — справочная таблица (dimension), которая обновляется через SCD-паттерн. На bronze-слое сохраняем как есть:
customers_raw.write \
.format("delta") \
.mode("overwrite") \
.save("/data/bronze/customers")
Проверка результата
После записи проверяем, что данные доступны и схема корректна:
# Проверяем orders
bronze_orders = spark.read.format("delta").load("/data/bronze/orders")
print(f"Orders count: {bronze_orders.count()}")
bronze_orders.printSchema()
# Проверяем customers
bronze_customers = spark.read.format("delta").load("/data/bronze/customers")
print(f"Customers count: {bronze_customers.count()}")
bronze_customers.printSchema()
Вывод printSchema() должен точно соответствовать определённым StructType-схемам (плюс year и month для orders).
Инкрементальная загрузка
В production pipeline начальная загрузка (full load) выполняется один раз. Последующие загрузки — инкрементальные: только новые файлы.
# Инкрементальный append для новых файлов
new_orders = spark.read \
.schema(orders_schema) \
.csv("/data/raw/orders_2024_02.csv")
new_orders \
.withColumn("year", year("order_date")) \
.withColumn("month", month("order_date")) \
.write \
.format("delta") \
.mode("append") \
.partitionBy("year", "month") \
.save("/data/bronze/orders")
Delta Lake автоматически добавляет новые файлы к существующей таблице, обновляя transaction log. Time travel (М10) позволяет вернуться к любой предыдущей версии.
Итоги
Bronze layer готов. У нас есть:
- orders в Delta Lake, партиционированные по year/month
- customers в Delta Lake как справочная таблица
- Явные схемы, гарантирующие стабильность формата данных
- PERMISSIVE-режим для обработки corrupt records
В следующем уроке мы построим silver layer: join orders + customers, deduplication, SCD Type 1 merge и вычисляемые колонки.