Best practices: Spark in production
Overview
This lesson summarizes the production recommendations from the entire module. Each section is a checklist that a data engineer should go through before launching a Spark application in production.
Configuration Management
spark-defaults.conf
Centralized cluster configuration:
# spark-defaults.conf -- production baseline
# === Performance ===
spark.sql.adaptive.enabled true
spark.sql.adaptive.coalescePartitions.enabled true
spark.sql.adaptive.skewJoin.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions 200
# === Stability ===
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 2
spark.dynamicAllocation.maxExecutors 50
spark.task.maxFailures 4
spark.speculation true
# === Security ===
spark.authenticate true
spark.ssl.enabled true
spark.io.encryption.enabled true
spark.acls.enable true
# === Monitoring ===
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///spark-events
spark.metrics.conf /etc/spark/metrics.properties
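A baseline like the one above is easy to break with a stray edit, so it can help to check it automatically before deployment. The following is a minimal sketch that uses only the Python standard library. The `REQUIRED_SETTINGS` subset and both helper functions are hypothetical names chosen for illustration, and the parser handles only the whitespace-separated `key value` form used in the baseline.

```python
# Hypothetical subset of settings that every production config must contain.
REQUIRED_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.authenticate": "true",
    "spark.io.encryption.enabled": "true",
    "spark.eventLog.enabled": "true",
}


def parse_spark_defaults(text):
    """Parse spark-defaults.conf text (whitespace-separated pairs) into a dict."""
    conf = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        parts = line.split(None, 1)
        conf[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return conf


def find_violations(conf, required=REQUIRED_SETTINGS):
    """Return {key: (expected, actual)} for settings that are missing or differ."""
    return {
        key: (expected, conf.get(key))
        for key, expected in required.items()
        if conf.get(key) != expected
    }


sample = """
# === Security ===
spark.authenticate true
spark.io.encryption.enabled false
spark.eventLog.enabled true
"""
violations = find_violations(parse_spark_defaults(sample))
for key, (expected, actual) in violations.items():
    print(f"{key}: expected {expected!r}, found {actual!r}")
```

A check like this could run as a CI step and fail the build when `violations` is not empty.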
Per-Job Overrides
from pyspark.sql import SparkSession

# Overrides for a specific job
spark = SparkSession.builder \
    .appName("heavy-join-job") \
    .config("spark.sql.shuffle.partitions", 400) \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
    .getOrCreate()
Environment-Specific Configs
# Configuration layout
# configs/
# ├── spark-defaults-dev.conf # local[4], 2g memory
# ├── spark-defaults-staging.conf # small cluster, sample data
# └── spark-defaults-prod.conf # full cluster, all configs
# spark-submit with an environment-specific config
# spark-submit \
# --properties-file configs/spark-defaults-prod.conf \
# --conf spark.app.name="etl-{{ date }}" \
# app.py
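The commented `spark-submit` call above can also be assembled in code, for example inside a deployment script. This is a sketch under assumptions: the `build_submit_command` helper and the `KNOWN_ENVS` tuple are hypothetical, and the file naming follows the `configs/` layout shown above.

```python
import shlex

# Environments that have a matching configs/spark-defaults-<env>.conf file.
KNOWN_ENVS = ("dev", "staging", "prod")


def build_submit_command(env, run_date, app="app.py"):
    """Build the spark-submit argument list for one environment and run date."""
    if env not in KNOWN_ENVS:
        raise ValueError(f"unknown environment: {env!r}")
    return [
        "spark-submit",
        "--properties-file", f"configs/spark-defaults-{env}.conf",
        "--conf", f"spark.app.name=etl-{run_date}",
        app,
    ]


cmd = build_submit_command("prod", "2024-01-15")
print(shlex.join(cmd))  # shell-safe rendering of the argument list
```

Returning a list rather than a string lets the caller pass it straight to `subprocess.run(cmd, check=True)` without shell quoting issues.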
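Configuration precedence: values set in code (SparkConf, SparkSession.builder.config()) > spark-submit --conf > spark-defaults.conf > built-in defaults. Some deploy-time settings, such as spark.driver.memory in client mode, have no effect when set in code, because the driver JVM is already running by then. For production, keep baseline settings in spark-defaults.conf and pass job-specific overrides via --conf; avoid hard-coding values in the application, since they take precedence over whatever is passed on the command line.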
Logging
log4j2 Configuration
# log4j2.properties -- production logging
# Root logger
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = console
# Application logger: INFO for your own code
logger.app.name = com.company.spark
logger.app.level = INFO
logger.app.appenderRef.file.ref = file
# Suppress noisy Spark logs
logger.spark.name = org.apache.spark
logger.spark.level = WARN
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN
logger.jetty.name = org.eclipse.jetty
logger.jetty.level = ERROR
# Console appender
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}: %m%n
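# File appender with rotation
# ${sys:...} reads a JVM system property; make sure spark.app.name is set as one,
# e.g. with -Dspark.app.name=... in the driver/executor Java options
appender.file.type = RollingFile
appender.file.name = file
appender.file.fileName = /var/log/spark/${sys:spark.app.name}.log
appender.file.filePattern = /var/log/spark/${sys:spark.app.name}-%d{yyyy-MM-dd}.log.gz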
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{ISO8601} %-5p [%t] %c{2}: %m%n
appender.file.policies.type = Policies
appender.file.policies.time.type = TimeBasedTriggeringPolicy
appender.file.policies.time.interval = 1
Structured Logging in Code
import logging
import json

logger = logging.getLogger("spark_etl")

def process_batch(df, batch_date):
    input_count = df.count()
    logger.info(json.dumps({
        "event": "batch_start",
        "date": batch_date,
        "input_rows": input_count,
    }))

    result = df.filter("amount > 0").dropDuplicates(["order_id"])
    output_count = result.count()
    dropped = input_count - output_count
    logger.info(json.dumps({
        "event": "batch_complete",
        "date": batch_date,
        "input_rows": input_count,
        "output_rows": output_count,
        "dropped_rows": dropped,
        "drop_rate": f"{dropped / input_count:.2%}" if input_count > 0 else "0%",
    }))
    return result
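Calling `json.dumps` at every log site works, but the JSON rendering can also live in one place. The sketch below moves it into a `logging.Formatter` from the standard library. The `JsonFormatter` class, the `get_json_logger` helper, and the `fields` key passed through `extra` are illustrative names, not part of any Spark or logging API.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record):
        payload = {
            "ts": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "event": record.getMessage(),
        }
        # Anything passed as extra={"fields": {...}} is merged into the payload.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload, default=str)


def get_json_logger(name, stream=None):
    """Return a logger that writes JSON lines to `stream` (stderr by default)."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]  # replace handlers so repeated calls do not duplicate output
    logger.propagate = False
    return logger


log = get_json_logger("spark_etl_json")
log.info("batch_start", extra={"fields": {"date": "2024-01-15", "input_rows": 1000}})
```

With this in place, call sites pass plain dictionaries, and every record gets the same timestamp, level, and logger fields, which keeps the log easy to query in an aggregation system.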
Error Handling
Retry Strategies
from tenacity import retry, stop_after_attempt, wait_exponential
from pyspark.sql import SparkSession

# Up to 3 attempts; the wait between attempts is kept within 30-300 seconds
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=30, max=300),
)
def run_spark_job(date: str):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(f"s3://data/events/{date}/")
    result = df.groupBy("event_type").count()
    # mode("overwrite") keeps the job idempotent, so a retry is safe
    result.write.mode("overwrite").parquet(f"s3://output/metrics/{date}/")
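Retrying every exception also retries deterministic failures such as a schema mismatch, which only delays the alert. The following standard-library sketch shows the same idea with an explicit list of retryable errors. It is not tenacity's implementation; `TransientError` and `retry_with_backoff` are hypothetical names, and the `sleep` parameter exists only so the example runs without waiting.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for failures worth retrying (timeouts, throttling)."""


def retry_with_backoff(func, max_attempts=3, base_delay=30.0, max_delay=300.0,
                       sleep=time.sleep):
    """Call func(); on TransientError wait base_delay * 2^(attempt-1), capped at max_delay."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except TransientError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the scheduler
            sleep(min(base_delay * 2 ** (attempt - 1), max_delay))
        # Any other exception type propagates immediately without a retry.


calls = {"n": 0}


def flaky_job():
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("temporary storage error")
    return "ok"


delays = []
result = retry_with_backoff(flaky_job, sleep=delays.append)  # record delays instead of sleeping
print(result, delays)
```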
Dead-Letter Pattern
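from pyspark.sql import functions as F

def process_with_dead_letter(df, date):
    """Write invalid rows to a dead-letter location and return the valid ones."""
    # coalesce() turns a NULL comparison result (e.g. NULL amount) into False
    is_valid = F.coalesce(
        F.col("order_id").isNotNull()
        & (F.col("amount") > 0)
        & F.col("customer_id").isNotNull(),
        F.lit(False),
    )
    # Split on a flag instead of df.subtract(valid): subtract() is a set
    # difference, so it would collapse duplicate invalid rows
    flagged = df.withColumn("_is_valid", is_valid)
    valid = flagged.filter("_is_valid").drop("_is_valid")
    invalid = flagged.filter("NOT _is_valid").drop("_is_valid")

    # Keep invalid rows for manual review
    invalid_count = invalid.count()
    if invalid_count > 0:
        invalid.write \
            .mode("append") \
            .parquet(f"s3://dead-letter/orders/{date}/")
        logger.warning(f"Dead-lettered {invalid_count} rows")
    return valid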
Graceful Degradation
def read_with_fallback(spark, primary_path, fallback_path):
    """Read from the primary source, falling back to an alternative one."""
    try:
        df = spark.read.parquet(primary_path)
        # head(1) only checks for at least one row; count() would scan everything
        if not df.head(1):
            raise ValueError("Empty dataset")
        return df
    except Exception as e:
        logger.warning(f"Primary source failed: {e}. Using fallback.")
        return spark.read.parquet(fallback_path)
Monitoring
Spark ships with built-in monitoring tools, covered in detail in M08 (Monitoring & Observability):
- Spark UI (port 4040): stages, tasks, shuffle, storage
- Event Log: execution history for the Spark History Server
- Metrics: Prometheus/Graphite export via spark.metrics.conf
- Structured Streaming: query.lastProgress for streaming metrics
# metrics.properties -- Prometheus export
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
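For the streaming item in the list above, `query.lastProgress` in PySpark returns the most recent progress report as a dictionary, or `None` before the first micro-batch completes. The sketch below pulls a few indicators out of such a dictionary. The `summarize_progress` helper is a hypothetical name, the sample values are made up, and comparing input rate to processing rate is only a rough heuristic for lag.

```python
def summarize_progress(progress):
    """Extract a few health indicators from a streaming progress dictionary."""
    if progress is None:  # no micro-batch has completed yet
        return None
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    return {
        "batch_id": progress.get("batchId"),
        "num_input_rows": progress.get("numInputRows", 0),
        "input_rate": input_rate,
        "processed_rate": processed_rate,
        # Rough heuristic: data arrives faster than it is processed.
        "falling_behind": input_rate > processed_rate,
    }


# Made-up sample shaped like a progress report, for illustration only.
sample_progress = {
    "batchId": 42,
    "numInputRows": 1200,
    "inputRowsPerSecond": 400.0,
    "processedRowsPerSecond": 300.0,
}
summary = summarize_progress(sample_progress)
print(summary)
```

In a running job, the input would be `query.lastProgress`, and the summary could be logged or pushed to the metrics system on a timer.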
Capacity Planning
Cluster Sizing Formulas
Sizing Guidelines:
1. Data size → cluster memory
Rule: cluster_memory >= 3x input_data_size (for joins/aggregations)
Example: 100GB input → 300GB total executor memory
2. Parallelism → executor count
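Rule: num_partitions = data_size_GB * 1024 / partition_size_MB; total_cores = num_partitions / tasks_per_core; num_executors = total_cores / cores_per_executor
Example: 100GB * 1024 / 128MB = 800 partitions; 800 / 4 tasks per core = 200 executor cores → 50 executors (4 cores each)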
3. Shuffle → network/disk
Rule: shuffle_write ≈ 1-3x input for joins
Ensure: local SSD for shuffle, sufficient network bandwidth
4. Growth projection
Rule: plan for 2x current volume (6-12 month horizon)
Use dynamic allocation to handle variance
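The sizing rules above can be turned into a small calculator. This is a sketch under assumptions: it reads the "/ 4" in the parallelism example as roughly four tasks per core, uses 1024 MB per GB, and the `plan_cluster` function and `ClusterPlan` type are hypothetical names. The numbers are starting points for tuning, not guarantees.

```python
import math
from dataclasses import dataclass


@dataclass
class ClusterPlan:
    total_memory_gb: float
    num_partitions: int
    total_cores: int
    num_executors: int


def plan_cluster(data_size_gb, partition_size_mb=128, tasks_per_core=4,
                 cores_per_executor=4, memory_factor=3.0, growth_factor=1.0):
    """Estimate cluster size from input volume using the rules of thumb above."""
    projected_gb = data_size_gb * growth_factor  # rule 4: growth projection
    num_partitions = math.ceil(projected_gb * 1024 / partition_size_mb)
    total_cores = math.ceil(num_partitions / tasks_per_core)  # rule 2: parallelism
    num_executors = math.ceil(total_cores / cores_per_executor)
    return ClusterPlan(
        total_memory_gb=projected_gb * memory_factor,  # rule 1: memory
        num_partitions=num_partitions,
        total_cores=total_cores,
        num_executors=num_executors,
    )


today = plan_cluster(100)
with_growth = plan_cluster(100, growth_factor=2.0)
print(today)
print(with_growth)
```

For the 100 GB example this reproduces 300 GB of executor memory and 50 executors with 4 cores each; with the 2x growth factor both figures double.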
Resource Quotas
# Kubernetes ResourceQuota for the Spark namespace
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-quota
  namespace: spark-jobs
spec:
  hard:
    requests.cpu: "200"
    requests.memory: 800Gi
    limits.cpu: "400"
    limits.memory: 1600Gi
    pods: "500"
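Before submitting a large job, it is worth checking that its executors fit into the quota. The sketch below does the arithmetic for the `requests.*` and `pods` limits from the manifest above. It assumes each executor pod requests its configured cores and its memory plus memoryOverhead, and it ignores the driver pod; the `executor_requests` and `exceeded` helpers are hypothetical names.

```python
# Quota values copied from the ResourceQuota manifest (memory in GiB).
QUOTA = {"requests.cpu": 200, "requests.memory_gib": 800, "pods": 500}


def executor_requests(num_executors, cores, memory_gib, overhead_gib):
    """Total resources requested by all executor pods of one job."""
    return {
        "requests.cpu": num_executors * cores,
        "requests.memory_gib": num_executors * (memory_gib + overhead_gib),
        "pods": num_executors,
    }


def exceeded(requests, quota=QUOTA):
    """Return {resource: (requested, limit)} for every resource over quota."""
    return {
        name: (value, quota[name])
        for name, value in requests.items()
        if value > quota[name]
    }


# 50 executors with 4 cores, 16g memory and 4g overhead each.
over = exceeded(executor_requests(50, cores=4, memory_gib=16, overhead_gib=4))
print(over)
```

In this example the CPU request exactly meets the quota, while memory (50 x 20 GiB) exceeds the 800 GiB limit, so Kubernetes would refuse to create some of the executor pods.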
Disaster Recovery
Checkpoint Recovery
# Streaming pipeline with checkpoint recovery
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/pipeline-v2/") \
    .outputMode("append") \
    .start("s3://output/events/")

# On restart, Spark automatically:
# 1. Reads the offset log to find the last committed offset
# 2. Restores the state store
# 3. Resumes from where it stopped
Data Replication
# S3 cross-region replication for DR
# aws s3api put-bucket-replication \
#   --bucket production-data \
#   --replication-configuration '{
#     "Role": "arn:aws:iam::role/replication",
#     "Rules": [{
#       "Status": "Enabled",
#       "Destination": {
#         "Bucket": "arn:aws:s3:::production-data-dr"
#       }
#     }]
#   }'
Backup Strategy
| Data | Strategy | RPO | RTO |
|---|---|---|---|
| Delta tables | Time travel (30 days) | 0 | Minutes |
| Hive metastore | Daily backup | 24h | Hours |
| Spark configs | Git versioned | 0 | Minutes |
| Checkpoints | S3 cross-region | Seconds | Minutes |
| Job artifacts | CI/CD re-deploy | 0 | Minutes |
Production Checklist
Summary of recommendations from the entire module:
| Category | Recommendation | Lesson |
|---|---|---|
| Security | Kerberos + TLS + ACLs enabled | 01 |
| Security | spark.io.encryption.enabled for shuffle | 01 |
| Security | Fixed ports for firewall rules | 01 |
| Cluster | Kubernetes for new cloud projects | 02 |
| Cluster | cluster deploy-mode for production | 02 |
| Cloud | Minimize vendor lock-in (standard API) | 03 |
| Cloud | Spark Connect for remote development | 03 |
| Cost | Right-size: cost per job, not per hour | 04 |
| Cost | Spot instances for executors (not the driver) | 04 |
| Cost | Dynamic allocation enabled | 04 |
| CI/CD | Dependencies in the Docker image, not --packages | 05 |
| CI/CD | lint -> test -> build -> deploy pipeline | 05 |
| Orchestration | Airflow DAG: sensor -> transform -> validate -> notify | 06 |
| Orchestration | SLA monitoring + retry with exponential backoff | 06 |
| dbt | Incremental materialization for large tables | 07 |
| dbt | ref() for dependency management | 07 |
| Logging | log4j2: WARN for Spark, INFO for app code | 08 |
| Errors | Dead-letter pattern for invalid data | 08 |
| DR | Checkpoints on reliable storage (S3/HDFS) | 08 |
Module Summary
The Production Operations module covered the full cycle from security configuration to disaster recovery. Key takeaways:
- Security: four layers (auth, encryption in-transit, at-rest, authorization)
- Cluster manager: Kubernetes for cloud, YARN for Hadoop, Standalone for dev
- Cloud: managed platforms reduce ops overhead, but watch for vendor lock-in
- Cost: measure cost per job, use spot + dynamic allocation
- CI/CD: automate everything from lint to deploy
- Orchestration: Airflow for scheduling, dbt for SQL-first transforms
- Reliability: logging, error handling, capacity planning, DR