Learning Platform
Глоссарий Troubleshooting
Урок 10.08 · 12 мин
Средний
Best PracticesConfigurationLoggingError HandlingCapacity PlanningDisaster Recovery

Best practices: Spark в production

Обзор

Этот урок — сводка production-рекомендаций из всего модуля. Каждый раздел — checklist, который data-инженер должен проверить перед запуском Spark-приложения в production.

Configuration Management

spark-defaults.conf

Централизованная конфигурация кластера:

# spark-defaults.conf -- production baseline

# === Performance ===
spark.sql.adaptive.enabled                     true
spark.sql.adaptive.coalescePartitions.enabled   true
spark.sql.adaptive.skewJoin.enabled            true
spark.serializer                               org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions                   200

# === Stability ===
spark.dynamicAllocation.enabled                true
spark.dynamicAllocation.minExecutors           2
spark.dynamicAllocation.maxExecutors           50
spark.task.maxFailures                         4
spark.speculation                              true

# === Security ===
spark.authenticate                             true
spark.ssl.enabled                              true
spark.io.encryption.enabled                    true
spark.acls.enable                              true

# === Monitoring ===
spark.eventLog.enabled                         true
spark.eventLog.dir                             hdfs:///spark-events
spark.metrics.conf                             /etc/spark/metrics.properties

Per-Job Overrides

# Overrides для конкретного job
spark = SparkSession.builder \
    .appName("heavy-join-job") \
    .config("spark.sql.shuffle.partitions", 400) \
    .config("spark.executor.memory", "16g") \
    .config("spark.executor.memoryOverhead", "4g") \
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m") \
    .getOrCreate()

Environment-Specific Configs

# Структура конфигурации
# configs/
# ├── spark-defaults-dev.conf       # local[4], 2g memory
# ├── spark-defaults-staging.conf   # small cluster, sample data
# └── spark-defaults-prod.conf      # full cluster, all configs

# spark-submit с environment-specific config
# spark-submit \
#   --properties-file configs/spark-defaults-prod.conf \
#   --conf spark.app.name="etl-{{ date }}" \
#   app.py
TIP

Приоритет конфигурации: spark-submit --conf > SparkSession.config() > spark-defaults.conf > built-in defaults. Для production: базовые настройки в spark-defaults.conf, job-specific overrides через --conf.

Logging

log4j2 Configuration

# log4j2.properties -- production logging

# Root logger
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = console

# Application logger -- INFO для вашего кода
logger.app.name = com.company.spark
logger.app.level = INFO
logger.app.appenderRef.file.ref = file

# Подавление шумных Spark-логов
logger.spark.name = org.apache.spark
logger.spark.level = WARN

logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = WARN

logger.jetty.name = org.eclipse.jetty
logger.jetty.level = ERROR

# Console appender
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}: %m%n

# File appender с ротацией
appender.file.type = RollingFile
appender.file.name = file
appender.file.fileName = /var/log/spark/${spark.app.name}.log
appender.file.filePattern = /var/log/spark/${spark.app.name}-%d{yyyy-MM-dd}.log.gz
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{ISO8601} %-5p [%t] %c{2}: %m%n
appender.file.policies.type = Policies
appender.file.policies.time.type = TimeBasedTriggeringPolicy
appender.file.policies.time.interval = 1

Structured Logging в коде

import logging
import json

logger = logging.getLogger("spark_etl")

def process_batch(df, batch_date):
    input_count = df.count()
    logger.info(json.dumps({
        "event": "batch_start",
        "date": batch_date,
        "input_rows": input_count,
    }))

    result = df.filter("amount > 0").dropDuplicates(["order_id"])
    output_count = result.count()
    dropped = input_count - output_count

    logger.info(json.dumps({
        "event": "batch_complete",
        "date": batch_date,
        "input_rows": input_count,
        "output_rows": output_count,
        "dropped_rows": dropped,
        "drop_rate": f"{dropped / input_count:.2%}" if input_count > 0 else "0%",
    }))

    return result

Error Handling

Retry Strategies

from tenacity import retry, stop_after_attempt, wait_exponential
from pyspark.sql import SparkSession

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=30, max=300),
)
def run_spark_job(date: str):
    spark = SparkSession.builder.getOrCreate()
    df = spark.read.parquet(f"s3://data/events/{date}/")
    result = df.groupBy("event_type").count()
    result.write.mode("overwrite").parquet(f"s3://output/metrics/{date}/")

Dead-Letter Pattern

def process_with_dead_letter(df):
    """Невалидные строки записываются в dead-letter queue."""
    valid = df.filter(
        (df.order_id.isNotNull()) &
        (df.amount > 0) &
        (df.customer_id.isNotNull())
    )

    invalid = df.subtract(valid)

    # Записать невалидные строки для ручного разбора
    if invalid.count() > 0:
        invalid.write \
            .mode("append") \
            .parquet(f"s3://dead-letter/orders/{date}/")
        logger.warning(f"Dead-lettered {invalid.count()} rows")

    return valid

Graceful Degradation

def read_with_fallback(spark, primary_path, fallback_path):
    """Чтение с fallback на альтернативный источник."""
    try:
        df = spark.read.parquet(primary_path)
        if df.count() == 0:
            raise ValueError("Empty dataset")
        return df
    except Exception as e:
        logger.warning(f"Primary source failed: {e}. Using fallback.")
        return spark.read.parquet(fallback_path)

Мониторинг

Spark предоставляет встроенные инструменты мониторинга, которые мы подробно разбирали в M08 (Monitoring & Observability):

  • Spark UI (порт 4040) — stages, tasks, shuffle, storage
  • Event Log — история выполнения для Spark History Server
  • Metrics — Prometheus/Graphite export через spark.metrics.conf
  • Structured Streamingquery.lastProgress для streaming метрик
# metrics.properties -- Prometheus export
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Capacity Planning

Cluster Sizing Formulas

Sizing Guidelines:

1. Data size → cluster memory
   Rule: cluster_memory >= 3x input_data_size (for joins/aggregations)
   Example: 100GB input → 300GB total executor memory

2. Parallelism → executor count
   Rule: num_executors = data_size_GB / partition_size_MB * 1000 / cores_per_executor
   Example: 100GB / 128MB * 1000 / 4 ≈ 200 executor cores → 50 executors (4 cores each)

3. Shuffle → network/disk
   Rule: shuffle_write ≈ 1-3x input for joins
   Ensure: local SSD for shuffle, sufficient network bandwidth

4. Growth projection
   Rule: plan for 2x current volume (6-12 month horizon)
   Use dynamic allocation to handle variance

Resource Quotas

# Kubernetes ResourceQuota для Spark namespace
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-quota
  namespace: spark-jobs
spec:
  hard:
    requests.cpu: "200"
    requests.memory: 800Gi
    limits.cpu: "400"
    limits.memory: 1600Gi
    pods: "500"

Disaster Recovery

Checkpoint Recovery

# Streaming pipeline с checkpoint recovery
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "s3://checkpoints/pipeline-v2/") \
    .outputMode("append") \
    .start("s3://output/events/")

# При перезапуске Spark автоматически:
# 1. Читает offset log → определяет последний committed offset
# 2. Восстанавливает state store
# 3. Продолжает с точки останова

Data Replication

# S3 cross-region replication для DR
# aws s3api put-bucket-replication \
#   --bucket production-data \
#   --replication-configuration '{
#     "Role": "arn:aws:iam::role/replication",
#     "Rules": [{
#       "Status": "Enabled",
#       "Destination": {
#         "Bucket": "arn:aws:s3:::production-data-dr"
#       }
#     }]
#   }'

Backup Strategy

ДанныеСтратегияRPORTO
Delta tablesTime travel (30 days)0Минуты
Hive metastoreDaily backup24hЧасы
Spark configsGit versioned0Минуты
CheckpointsS3 cross-regionСекундыМинуты
Job artifactsCI/CD re-deploy0Минуты

Production Checklist

Сводная таблица рекомендаций из всего модуля:

КатегорияРекомендацияУрок
SecurityKerberos + TLS + ACLs enabled01
Securityspark.io.encryption.enabled для shuffle01
SecurityФиксированные порты для firewall01
ClusterKubernetes для новых cloud-проектов02
Clustercluster deploy-mode для production02
CloudMinimize vendor lock-in (standard API)03
CloudSpark Connect для remote development03
CostRight-size: cost per job, not per hour04
CostSpot instances для executor (not driver)04
CostDynamic allocation enabled04
CI/CDDependencies в Docker image, не —packages05
CI/CDlint -> test -> build -> deploy pipeline05
OrchestrationAirflow DAG: sensor -> transform -> validate -> notify06
OrchestrationSLA monitoring + retry с exponential backoff06
dbtIncremental materialization для больших таблиц07
dbtref() для dependency management07
Logginglog4j2: WARN для Spark, INFO для app code08
ErrorsDead-letter pattern для невалидных данных08
DRCheckpoint на надёжном storage (S3/HDFS)08
Проверка знанийKnowledge check
Какой приоритет имеют разные способы задания Spark-конфигурации? Что перезаписывает что?
ОтветAnswer
Приоритет от высшего к низшему: (1) spark-submit --conf (командная строка) -- наивысший приоритет; (2) SparkSession.builder.config() в коде; (3) spark-defaults.conf на кластере; (4) built-in defaults Spark. В production: базовые настройки (security, monitoring, AQE) в spark-defaults.conf, а job-specific overrides (memory, partitions) через --conf или SparkSession.config(). Это позволяет менять параметры без изменения кода.
Проверка знанийKnowledge check
Что такое dead-letter pattern и зачем он нужен в production Spark pipeline?
ОтветAnswer
Dead-letter pattern -- стратегия обработки невалидных данных без остановки pipeline. Вместо падения job при встрече невалидной строки (null order_id, отрицательный amount): (1) фильтруем данные на valid и invalid; (2) invalid строки записываем в отдельное хранилище (dead-letter queue/table); (3) valid строки продолжают обработку; (4) логируем количество dead-lettered строк. Это обеспечивает graceful degradation -- pipeline не падает из-за нескольких битых строк, но проблемы не теряются и доступны для ручного разбора.

Итоги модуля

Модуль Production Operations покрыл полный цикл от настройки безопасности до disaster recovery. Ключевые takeaways:

  1. Безопасность — четыре уровня (auth, encryption in-transit, at-rest, authorization)
  2. Cluster manager — Kubernetes для cloud, YARN для Hadoop, Standalone для dev
  3. Cloud — managed платформы снижают ops overhead, но следите за vendor lock-in
  4. Cost — считайте cost per job, используйте spot + dynamic allocation
  5. CI/CD — автоматизируйте всё от lint до deploy
  6. Orchestration — Airflow для scheduling, dbt для SQL-first transforms
  7. Reliability — logging, error handling, capacity planning, DR

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Зачем разделять Spark-конфигурацию на spark-defaults.conf (baseline) и per-job overrides?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8