Learning Platform
Глоссарий Troubleshooting
Урок 04.02 · 20 мин
Продвинутый
Batch PipelineIdempotencyBackfillIncrementalPartition Overwrite

Batch Pipeline Design Patterns

Pattern 1: Idempotent Pipeline

Проблема: Pipeline failed midway. Rerun processes some data twice.

Решение: Idempotent writes — повторный запуск даёт тот же результат:

# [NO] Non-idempotent (append)
df.write.mode("append").parquet("output/")
# Rerun → duplicates!

# [OK] Idempotent (overwrite partition)
df.write.mode("overwrite") \
  .partitionBy("date") \
  .parquet("output/")
# Rerun → same result (partition overwritten)

Partition Overwrite Strategy

Target: output/date=2026-03-23/
  
First run:  write 1000 records → output/date=2026-03-23/part-00000.parquet
Failed run: write 500 records → partial data
Rerun:      overwrite entire partition → 1000 records again [OK]

Key: granularity of overwrite = partition (date, hour)
  Too coarse (monthly): reprocessing 30 days for 1 day fix
  Too fine (hourly): 24 partitions per day, many small files
  Sweet spot: daily partitions for most batch jobs
Partitioning стратегии в Spark AQE: динамическое coalesce partitions

Pattern 2: Incremental Processing

Проблема: Reprocessing all data every day is expensive for 100TB+ datasets.

Решение: Process only new/changed data:

# Incremental pattern
last_processed = read_watermark("pipeline_state")  # e.g., 2026-03-22

new_data = spark.read.parquet("bronze/") \
  .filter(col("ingestion_date") > last_processed)

processed = transform(new_data)
processed.write.mode("append").parquet("silver/")

update_watermark("pipeline_state", current_date())
Full Refresh vs Incremental
Full Refresh
Incremental

Pattern 3: Backfill

Проблема: Logic changed → need to reprocess historical data.

Backfill scenarios:
  1. Bug fix: cleaning logic was wrong for 30 days
  2. New column: add "customer_segment" to Silver
  3. Schema change: split "name" into "first_name" + "last_name"
  4. New source: integrate new data source retroactively

Backfill approach:
  for date in date_range("2026-02-01", "2026-03-01"):
    run_pipeline(date)  # idempotent partition overwrite
    
  # Each date overwrites its partition → clean state after backfill

Backfill Design Considerations

ConcernApproach
Resource contentionRun backfill on separate cluster / off-peak hours
Data orderingProcess dates sequentially (dependencies)
Progress trackingLog completed dates, resume from last
ValidationCompare row counts before/after backfill
NotificationAlert downstream consumers about reprocessing

Pattern 4: SCD Type 2 (Slowly Changing Dimensions)

SCD Type 2: track history of dimension changes

dim_customers:
| customer_id | name    | city      | valid_from | valid_to   | is_current |
|-------------|---------|-----------|------------|------------|------------|
| 1           | Alice   | Moscow    | 2024-01-01 | 2025-06-15 | false      |
| 1           | Alice   | SPb       | 2025-06-15 | 9999-12-31 | true       |
| 2           | Bob     | Kazan     | 2024-03-01 | 9999-12-31 | true       |

When city changes:
  1. Close current record (valid_to = today, is_current = false)
  2. Insert new record (valid_from = today, is_current = true)

Query current: WHERE is_current = true
Query historical: WHERE valid_from <= @date AND valid_to > @date

Pattern 5: Data Quality Gates

# Quality gate between Bronze → Silver
bronze_df = spark.read.parquet("bronze/date=2026-03-23/")

# Quality checks
null_rate = bronze_df.filter(col("user_id").isNull()).count() / bronze_df.count()
duplicate_rate = 1 - bronze_df.dropDuplicates(["event_id"]).count() / bronze_df.count()

# Gate: fail if quality below threshold
assert null_rate < 0.01, f"Null rate {null_rate} exceeds 1% threshold"
assert duplicate_rate < 0.005, f"Duplicate rate {duplicate_rate} exceeds 0.5%"

# Only proceed if quality passed
silver_df = transform(bronze_df)
silver_df.write.mode("overwrite").parquet("silver/date=2026-03-23/")
Проверка знанийKnowledge check
ОтветAnswer

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 2. Pipeline пишет результаты через df.write.mode('append'). Pipeline упал посередине и был перезапущен. Что произойдёт?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 2