Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 15 мин
Продвинутый
Data IngestionStructTypeSchema EnforcementDelta LakeBronze LayerCSVJSON

Загрузка данных

Первый слой pipeline: Bronze

В предыдущем уроке мы определили архитектуру pipeline и схему данных. Теперь начнём строить — с bronze layer, который загружает сырые данные из CSV и JSON в Delta Lake.

Ключевой принцип bronze: никаких трансформаций. Мы только читаем, валидируем схему и сохраняем. Это гарантирует, что исходные данные всегда доступны для перезапуска pipeline.

Настройка SparkSession

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EcommerceCapstone") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

Delta Lake требует два конфига: DeltaSparkSessionExtension для SQL-команд (MERGE, OPTIMIZE) и DeltaCatalog для регистрации таблиц в каталоге. Мы подробно разбирали эту настройку в модуле М10 (Delta Lake).

Чтение orders из CSV

Определение схемы

Мы используем явную схему через StructType вместо inferSchema=True. Как мы обсуждали в модуле М03 (DataFrames), inferSchema:

  • Требует дополнительный проход по всем данным для определения типов
  • Может неправильно определить типы (например, price как INTEGER вместо DECIMAL)
  • Не гарантирует стабильность схемы при изменении данных
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, DecimalType, DateType, StringType
)

orders_schema = StructType([
    StructField("order_id", IntegerType(), nullable=False),
    StructField("customer_id", IntegerType(), nullable=False),
    StructField("product_id", IntegerType(), nullable=False),
    StructField("quantity", IntegerType(), nullable=False),
    StructField("price", DecimalType(10, 2), nullable=False),
    StructField("order_date", DateType(), nullable=False),
    StructField("status", StringType(), nullable=False),
])

Обратите внимание на nullable=False для всех полей. При schema enforcement строки с null в NOT NULL-полях попадут в corrupt records — мы обработаем их через quality gate в уроке 04.

Чтение CSV

orders_raw = spark.read \
    .schema(orders_schema) \
    .option("header", True) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .csv("/data/raw/orders.csv")

Режим PERMISSIVE сохраняет все строки, добавляя колонку _corrupt_record для строк, которые не соответствуют схеме. В production pipeline это лучше, чем FAILFAST, потому что мы не теряем валидные строки из-за единичных ошибок.

Проверка знанийKnowledge check
ОтветAnswer

Чтение customers из JSON

customers_schema = StructType([
    StructField("customer_id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=False),
    StructField("email", StringType(), nullable=False),
    StructField("city", StringType(), nullable=False),
    StructField("registration_date", DateType(), nullable=False),
])

customers_raw = spark.read \
    .schema(customers_schema) \
    .json("/data/raw/customers.json")

JSON-файлы PySpark читает в формате JSON Lines (один JSON-объект на строку). Это стандартный формат для экспорта из NoSQL-баз и REST API.

Запись в Delta Lake Bronze

Партиционирование orders

Orders партиционируем по year и month из order_date. Это оптимизирует запросы вида «все заказы за январь 2024» — Spark читает только нужные партиции (partition pruning, М04).

from pyspark.sql.functions import year, month

orders_bronze = orders_raw \
    .withColumn("year", year("order_date")) \
    .withColumn("month", month("order_date"))

orders_bronze.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .save("/data/bronze/orders")

Запись customers

Customers — справочная таблица (dimension), которая обновляется через SCD-паттерн. На bronze-слое сохраняем как есть:

customers_raw.write \
    .format("delta") \
    .mode("overwrite") \
    .save("/data/bronze/customers")

Проверка результата

После записи проверяем, что данные доступны и схема корректна:

# Проверяем orders
bronze_orders = spark.read.format("delta").load("/data/bronze/orders")
print(f"Orders count: {bronze_orders.count()}")
bronze_orders.printSchema()

# Проверяем customers
bronze_customers = spark.read.format("delta").load("/data/bronze/customers")
print(f"Customers count: {bronze_customers.count()}")
bronze_customers.printSchema()

Вывод printSchema() должен точно соответствовать определённым StructType-схемам (плюс year и month для orders).

Инкрементальная загрузка

В production pipeline начальная загрузка (full load) выполняется один раз. Последующие загрузки — инкрементальные: только новые файлы.

# Инкрементальный append для новых файлов
new_orders = spark.read \
    .schema(orders_schema) \
    .csv("/data/raw/orders_2024_02.csv")

new_orders \
    .withColumn("year", year("order_date")) \
    .withColumn("month", month("order_date")) \
    .write \
    .format("delta") \
    .mode("append") \
    .partitionBy("year", "month") \
    .save("/data/bronze/orders")

Delta Lake автоматически добавляет новые файлы к существующей таблице, обновляя transaction log. Time travel (М10) позволяет вернуться к любой предыдущей версии.

Итоги

Bronze layer готов. У нас есть:

  • orders в Delta Lake, партиционированные по year/month
  • customers в Delta Lake как справочная таблица
  • Явные схемы, гарантирующие стабильность формата данных
  • PERMISSIVE-режим для обработки corrupt records

В следующем уроке мы построим silver layer: join orders + customers, deduplication, SCD Type 1 merge и вычисляемые колонки.

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5