Batch Pipeline Design Patterns
Pattern 1: Idempotent Pipeline
Problem: The pipeline failed midway, and a rerun processes some data twice.
Solution: Idempotent writes, so that a rerun produces the same result:
# [NO] Non-idempotent (append)
df.write.mode("append").parquet("output/")
# Rerun → duplicates!
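# [OK] Idempotent (overwrite partition)
# Without "dynamic" mode, overwrite replaces every partition under output/
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write.mode("overwrite") \
    .partitionBy("date") \
    .parquet("output/")
# Rerun → same result (only the partitions present in df are overwritten)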
Partition Overwrite Strategy
Target: output/date=2026-03-23/
First run: write 1000 records → output/date=2026-03-23/part-00000.parquet
Failed run: write 500 records → partial data
Rerun: overwrite entire partition → 1000 records again [OK]
Key: granularity of overwrite = partition (date, hour)
Too coarse (monthly): reprocessing 30 days for 1 day fix
Too fine (hourly): 24 partitions per day, many small files
Sweet spot: daily partitions for most batch jobs
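The difference between append and partition overwrite can be illustrated without Spark. The following is a minimal sketch that models the output directory as an in-memory dict keyed by partition; `append_write` and `overwrite_partition` are hypothetical helpers for illustration, not Spark APIs.

```python
from collections import defaultdict

def append_write(store, rows):
    """Non-idempotent: every call adds rows to whatever is already there."""
    for row in rows:
        store[row["date"]].append(row)

def overwrite_partition(store, rows):
    """Idempotent: each partition present in the batch is replaced wholesale."""
    batch = defaultdict(list)
    for row in rows:
        batch[row["date"]].append(row)
    for date, partition_rows in batch.items():
        store[date] = partition_rows  # other partitions are left untouched

rows = [{"date": "2026-03-23", "id": i} for i in range(1000)]

appended = defaultdict(list)
append_write(appended, rows[:500])   # failed run left partial data
append_write(appended, rows)         # rerun adds all 1000 rows on top

overwritten = defaultdict(list)
overwritten["2026-03-22"] = [{"date": "2026-03-22", "id": -1}]
overwrite_partition(overwritten, rows[:500])  # failed run left partial data
overwrite_partition(overwritten, rows)        # rerun replaces the partition

print(len(appended["2026-03-23"]))     # 1500
print(len(overwritten["2026-03-23"]))  # 1000
```

The neighbouring `2026-03-22` partition survives the rerun, which is the behaviour a partition-level overwrite is meant to give.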
Partitioning strategies in Spark
AQE: dynamic coalescing of partitions
Pattern 2: Incremental Processing
Problem: Reprocessing all data every day is expensive for 100TB+ datasets.
Solution: Process only new or changed data:
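# Incremental pattern
from pyspark.sql import functions as F

last_processed = read_watermark("pipeline_state")  # e.g., "2026-03-22"
new_data = spark.read.parquet("bronze/") \
    .filter(F.col("ingestion_date") > last_processed)
processed = transform(new_data)
processed.write.mode("append").parquet("silver/")
# Advance the watermark to the newest date actually read, not to "now",
# so rows that arrive later for the current date are not skipped next run
new_watermark = new_data.agg(F.max("ingestion_date")).first()[0]
if new_watermark is not None:
    update_watermark("pipeline_state", new_watermark)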
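The watermark helpers used above are not defined in the text. The following is a minimal sketch of one way they could work, assuming the state lives in a small JSON file and that dates are ISO strings; the file layout, the `run_incremental` wrapper, and the path-based signatures are assumptions for illustration.

```python
import json
import os
import tempfile

def read_watermark(path, default="1970-01-01"):
    """Return the last processed ingestion date, or a default on first run."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return json.load(f)["last_processed"]

def update_watermark(path, value):
    """Write the new watermark atomically: temp file first, then rename."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump({"last_processed": value}, f)
    os.replace(tmp, path)

def run_incremental(bronze_rows, state_path):
    """Return only rows newer than the watermark and advance the watermark."""
    last = read_watermark(state_path)
    # ISO dates (YYYY-MM-DD) compare correctly as strings
    new_rows = [r for r in bronze_rows if r["ingestion_date"] > last]
    if new_rows:
        update_watermark(state_path, max(r["ingestion_date"] for r in new_rows))
    return new_rows

state_path = os.path.join(tempfile.mkdtemp(), "pipeline_state.json")
bronze = [
    {"id": 1, "ingestion_date": "2026-03-21"},
    {"id": 2, "ingestion_date": "2026-03-22"},
]
first = run_incremental(bronze, state_path)    # both rows are new
bronze.append({"id": 3, "ingestion_date": "2026-03-23"})
second = run_incremental(bronze, state_path)   # only id 3 is new
third = run_incremental(bronze, state_path)    # nothing new
```

Updating the watermark only after the batch has been handled means a crash in between leads to reprocessing rather than data loss, which is why the downstream write should itself be idempotent.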
Pattern 3: Backfill
Problem: The logic changed, so historical data must be reprocessed.
Backfill scenarios:
1. Bug fix: cleaning logic was wrong for 30 days
2. New column: add "customer_segment" to Silver
3. Schema change: split "name" into "first_name" + "last_name"
4. New source: integrate new data source retroactively
Backfill approach:
for date in date_range("2026-02-01", "2026-03-01"):
    run_pipeline(date)  # idempotent partition overwrite
# Each date overwrites its partition → clean state after backfill
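The loop above relies on a `date_range` helper that the text does not define. A minimal sketch follows, assuming the end date is exclusive (the original does not say) and using a stub `run_pipeline` that writes into a dict instead of overwriting a real partition.

```python
from datetime import date, timedelta

def date_range(start, end):
    """Yield each day from start (inclusive) to end (exclusive) as a date."""
    current = date.fromisoformat(start)
    stop = date.fromisoformat(end)
    while current < stop:
        yield current
        current += timedelta(days=1)

def run_pipeline(day, output):
    """Placeholder: a real job would recompute and overwrite the partition."""
    output[day.isoformat()] = f"recomputed for {day.isoformat()}"

output = {"2026-02-01": "stale"}  # partition written by the buggy logic
for day in date_range("2026-02-01", "2026-03-01"):
    run_pipeline(day, output)

print(len(output))  # 28, one partition per day of February 2026
```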
Backfill Design Considerations
| Concern | Approach |
|---|---|
| Resource contention | Run backfill on separate cluster / off-peak hours |
| Data ordering | Process dates sequentially (dependencies) |
| Progress tracking | Log completed dates, resume from last |
| Validation | Compare row counts before/after backfill |
| Notification | Alert downstream consumers about reprocessing |
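The "Progress tracking" and "Data ordering" rows can be sketched as a resumable loop. This is an illustration under stated assumptions: `backfill` and `flaky_pipeline` are hypothetical names, and the completed-dates log is an in-memory set here, whereas a real backfill would persist it to a table or file.

```python
def backfill(dates, run_pipeline, completed):
    """Run dates in order, skipping ones already logged as completed."""
    for day in sorted(dates):
        if day in completed:
            continue
        run_pipeline(day)      # may raise; the date is then not logged
        completed.add(day)     # record progress only after success

calls = []
failed_once = set()

def flaky_pipeline(day):
    # Fails once on 2026-02-03 to simulate a mid-backfill crash
    if day == "2026-02-03" and day not in failed_once:
        failed_once.add(day)
        raise RuntimeError("simulated failure")
    calls.append(day)

completed = set()
dates = ["2026-02-01", "2026-02-02", "2026-02-03", "2026-02-04"]

try:
    backfill(dates, flaky_pipeline, completed)
except RuntimeError:
    pass  # first attempt stops at 2026-02-03

backfill(dates, flaky_pipeline, completed)  # resumes; finished dates are skipped
```

Because each date is logged only after its run succeeds, the second call picks up at the failed date and never repeats the first two.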
Pattern 4: SCD Type 2 (Slowly Changing Dimensions)
SCD Type 2: track history of dimension changes
dim_customers:
| customer_id | name | city | valid_from | valid_to | is_current |
|-------------|---------|-----------|------------|------------|------------|
| 1 | Alice | Moscow | 2024-01-01 | 2025-06-15 | false |
| 1 | Alice | SPb | 2025-06-15 | 9999-12-31 | true |
| 2 | Bob | Kazan | 2024-03-01 | 9999-12-31 | true |
When city changes:
1. Close current record (valid_to = today, is_current = false)
2. Insert new record (valid_from = today, is_current = true)
Query current: WHERE is_current = true
Query historical: WHERE valid_from <= @date AND valid_to > @date
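The close-and-insert steps and the historical query can be sketched in plain Python over a list of dicts. This is a simplified illustration, not a warehouse implementation: `apply_change` and `as_of` are hypothetical helpers, and a production system would typically run the equivalent logic as a MERGE or a pair of SQL statements.

```python
from datetime import date

OPEN_END = date(9999, 12, 31)  # sentinel for "still valid", as in the table above

def apply_change(dim, customer_id, changes, today):
    """Close the current row for customer_id and append a new current row."""
    for row in dim:
        if row["customer_id"] == customer_id and row["is_current"]:
            new_row = {**row, **changes, "valid_from": today,
                       "valid_to": OPEN_END, "is_current": True}
            row["valid_to"] = today      # step 1: close the current record
            row["is_current"] = False
            dim.append(new_row)          # step 2: insert the new record
            return new_row
    raise KeyError(f"no current row for customer {customer_id}")

def as_of(dim, customer_id, when):
    """Return the row valid on `when`: valid_from <= when < valid_to."""
    for row in dim:
        if (row["customer_id"] == customer_id
                and row["valid_from"] <= when < row["valid_to"]):
            return row
    return None

dim_customers = [
    {"customer_id": 1, "name": "Alice", "city": "Moscow",
     "valid_from": date(2024, 1, 1), "valid_to": OPEN_END, "is_current": True},
]
apply_change(dim_customers, 1, {"city": "SPb"}, today=date(2025, 6, 15))

print(as_of(dim_customers, 1, date(2025, 1, 1))["city"])   # Moscow
print(as_of(dim_customers, 1, date(2025, 6, 15))["city"])  # SPb
```

The half-open interval (`valid_to` exclusive) ensures that exactly one row matches on the day of the change.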
Pattern 5: Data Quality Gates
# Quality gate between Bronze → Silver
from pyspark.sql.functions import col

bronze_df = spark.read.parquet("bronze/date=2026-03-23/")
# Quality checks (count once; an empty partition fails the gate outright)
total = bronze_df.count()
if total == 0:
    raise ValueError("Bronze partition is empty")
null_rate = bronze_df.filter(col("user_id").isNull()).count() / total
duplicate_rate = 1 - bronze_df.dropDuplicates(["event_id"]).count() / total
# Gate: fail if quality is below threshold
# (raise instead of assert, because assert is skipped under python -O)
if null_rate >= 0.01:
    raise ValueError(f"Null rate {null_rate} exceeds 1% threshold")
if duplicate_rate >= 0.005:
    raise ValueError(f"Duplicate rate {duplicate_rate} exceeds 0.5%")
# Only proceed if quality passed
silver_df = transform(bronze_df)
silver_df.write.mode("overwrite").parquet("silver/date=2026-03-23/")
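The same checks can be wrapped in a reusable function and exercised without a cluster. The sketch below works on a list of dicts; `quality_gate` and `QualityGateError` are hypothetical names, and the default thresholds simply mirror the 1% and 0.5% values used above.

```python
class QualityGateError(Exception):
    """Raised when a batch fails a quality threshold."""

def quality_gate(rows, max_null_rate=0.01, max_duplicate_rate=0.005):
    """Return (null_rate, duplicate_rate) or raise if a threshold is reached."""
    total = len(rows)
    if total == 0:
        raise QualityGateError("empty batch")
    null_rate = sum(1 for r in rows if r.get("user_id") is None) / total
    duplicate_rate = 1 - len({r["event_id"] for r in rows}) / total
    if null_rate >= max_null_rate:
        raise QualityGateError(
            f"Null rate {null_rate:.4f} exceeds {max_null_rate}")
    if duplicate_rate >= max_duplicate_rate:
        raise QualityGateError(
            f"Duplicate rate {duplicate_rate:.4f} exceeds {max_duplicate_rate}")
    return null_rate, duplicate_rate

good = [{"event_id": i, "user_id": i % 50} for i in range(1000)]
# 20 extra rows that repeat event_id 0 and have no user_id
bad = good + [{"event_id": 0, "user_id": None}] * 20

print(quality_gate(good))  # (0.0, 0.0)
try:
    quality_gate(bad)
except QualityGateError as exc:
    print(f"gate failed: {exc}")
```

Raising a dedicated exception lets an orchestrator distinguish a quality failure from an infrastructure failure and skip the Silver write accordingly.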