Learning Platform
Глоссарий Troubleshooting
Урок 04.06 · 12 мин
Средний
cachepersistStorageLevelunpersistMEMORY_AND_DISK

Кэширование и persistence

Spark использует lazy evaluation — трансформации не выполняются до вызова action (show, count, write). Это означает, что если один и тот же DataFrame используется дважды, Spark пересчитает его заново для каждого action. cache() и persist() позволяют сохранить результат в памяти или на диске и использовать повторно.

cache() vs persist()

# cache() -- shortcut для persist(StorageLevel.MEMORY_AND_DISK)
df_cached = df.cache()

# persist() -- с выбором уровня хранения
from pyspark import StorageLevel
df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK)

cache() — это просто persist(StorageLevel.MEMORY_AND_DISK). Разницы нет, кроме того, что persist() позволяет выбрать другой StorageLevel.

StorageLevel: варианты хранения

StorageLevelПамятьДискСериализацияРеплик
MEMORY_ONLYДаНетНет1
MEMORY_AND_DISKДаДа (spillover)Нет1
DISK_ONLYНетДаДа1
MEMORY_ONLY_SERДаНетДа1
MEMORY_AND_DISK_SERДаДаДа1
OFF_HEAPOff-heapНетДа1
MEMORY_ONLY_2ДаНетНет2
MEMORY_AND_DISK_2ДаДаНет2

Что означают варианты

Десериализованный (без _SER): данные хранятся как Java-объекты в heap. Быстрый доступ, но занимает 2-5x больше памяти, чем на диске.

Сериализованный (_SER): данные хранятся в сжатом бинарном формате. Экономит 60-80% памяти, но требует десериализации при чтении — дополнительный CPU.

С репликами (_2): данные дублируются на 2 executors. Если один executor падает, данные доступны на другом. Полезно для критичных вычислений, но удваивает расход памяти.

OFF_HEAP: хранение вне JVM heap (в Tungsten managed memory). Без GC давления, но требует настройки spark.memory.offHeap.enabled=true.

# Для большинства случаев -- MEMORY_AND_DISK (default cache)
df.cache()

# Для больших DataFrame, когда не хватает памяти -- сериализованный
df.persist(StorageLevel.MEMORY_AND_DISK_SER)

# Для очень больших данных, когда память не нужна -- только диск
df.persist(StorageLevel.DISK_ONLY)
Spark4.0

Spark 4.0: улучшенный MEMORY_AND_DISK. В Spark 4.0 spill-to-disk для MEMORY_AND_DISK стал более эффективным — Spark использует columnar storage format при спилле, что сокращает объём данных на диске до 3x по сравнению со старым row-based spillом.

Когда кэшировать

1. Итеративные алгоритмы

# ML: K-means использует одни данные 100 раз
training_data = spark.read.parquet("/data/features/") \
    .filter(col("valid") == True) \
    .select("feature_1", "feature_2", "label")

training_data.cache()  # Вычисляем один раз

for i in range(100):
    model = update_model(training_data, model)
    # Без cache: 100 чтений Parquet + 100 фильтраций
    # С cache: 1 чтение + 99 обращений к памяти

2. DataFrame используется несколько раз

# Промежуточный результат используется дважды
filtered = orders.join(customers, "id").filter(col("amount") > 1000)
filtered.cache()

# Использование 1: подсчёт
total = filtered.count()

# Использование 2: запись
filtered.write.parquet("/output/high_value_orders/")

# Без cache: join + filter выполняется ДВАЖДЫ
# С cache: join + filter выполняется ОДИН раз

3. После дорогой трансформации перед несколькими actions

# Дорогой ETL pipeline
result = (raw_data
    .join(broadcast(dimensions), "key")    # join
    .groupBy("category").agg(...)          # shuffle + aggregate
    .withColumn("rank", row_number().over(window))  # window function
)
result.cache()

# Несколько выходов из одного pipeline
result.filter(col("rank") <= 10).write.parquet("/top_10/")
result.write.parquet("/full_result/")
result.describe().show()

Когда НЕ кэшировать

Анти-паттерн: кэширование всего

# ПЛОХО: кэшируем всё подряд
df1 = spark.read.parquet("/data/table1/").cache()
df2 = spark.read.parquet("/data/table2/").cache()
df3 = spark.read.parquet("/data/table3/").cache()

result = df1.join(df2, "key").join(df3, "key")
result.cache()
result.show()

Проблемы:

  • 4 DataFrame в кэше = вся память executor занята кэшем
  • Spill-to-disk для данных, которые не поместились — медленнее, чем пересчёт
  • GC давление: десериализованные объекты в heap вызывают Long GC pauses

1. Одноразовое чтение

# НЕ НУЖЕН CACHE: данные читаются один раз
df = spark.read.parquet("/data/events/")
df.write.parquet("/output/processed/")
# Cache только замедлит -- дополнительная запись в memory/disk

2. Большой DataFrame, который не помещается в память

# ПЛОХО: 500 ГБ DataFrame с 64 ГБ суммарной памяти кластера
huge_df = spark.read.parquet("/data/500gb/")
huge_df.cache()  # Большая часть spill на диск -- хуже, чем пересчёт

3. DataFrame, который дёшево пересчитать

# НЕ НУЖЕН CACHE: простой filter от Parquet с partition pruning
df = spark.read.parquet("/data/events/") \
    .filter(col("year") == 2024)
# Parquet partition pruning + predicate pushdown = быстро
# Cache добавляет overhead без выигрыша
TIP

Правило: кэшируйте, если стоимость пересчёта > стоимость хранения. Сложная трансформация (join + groupBy + window) используется 5 раз — кэшируйте. Простой filter от Parquet — не кэшируйте. Если кэш не помещается в память и большая часть spillится на диск — не кэшируйте.

unpersist() и управление кэшем

Кэшированные данные остаются в памяти до конца SparkSession или явного вызова unpersist():

# Кэшируем
training_data.cache()
training_data.count()  # Материализация кэша (lazy!)

# ... используем training_data ...

# Освобождаем память ЯВНО
training_data.unpersist()

Lazy evaluation и cache

Важно: cache() — это lazy трансформация. Она не вычисляет DataFrame немедленно:

df.cache()  # Ничего не произошло! Cache только запланирован.
df.count()  # СЕЙЧАС данные вычислены и кэшированы.
df.show()   # Читает из кэша.

Первый action после cache() вычисляет данные и кэширует их. Последующие actions читают из кэша.

Мониторинг кэша через Spark UI

Storage tab в Spark UI показывает все кэшированные DataFrame:

Storage Tab:
RDD Name          | Storage Level    | Cached Partitions | Size in Memory | Size on Disk
-----------------|-----------------|-------------------|----------------|-------------
In-memory table  | MEMORY_AND_DISK | 200/200           | 4.2 ГБ         | 0 B
In-memory table  | MEMORY_AND_DISK | 200/200           | 1.8 ГБ         | 2.3 ГБ ← spill!

Если Size on Disk > 0 при StorageLevel MEMORY_AND_DISK — часть данных spillилась на диск из-за нехватки памяти. Это сигнал: кэш слишком большой для доступной памяти.

DAG с кэшем и без

Without cache: два action вычисляют весь pipeline заново.

Action 1 (count):
FileScan → Filter → Join → GroupBy → Count

Action 2 (write):
FileScan → Filter → Join → GroupBy → Write

                           Всё пересчитано!

With cache: второй action читает из кэша.

Action 1 (count):
FileScan → Filter → Join → GroupBy → [CACHE] → Count

Action 2 (write):
[CACHE] → Write

  Из памяти/диска
TIP

StorageLevel.MEMORY_AND_DISK_SER для production. В production рекомендуется MEMORY_AND_DISK_SER вместо default cache (MEMORY_AND_DISK). Сериализация сокращает расход памяти на 60-80%, и данные, не поместившиеся в память, хранятся на диске в компактном формате. Десериализация добавляет ~10-15% CPU overhead, но это меньше, чем пересчёт тяжёлой трансформации.

Проверка знанийKnowledge check
В чём разница между MEMORY_ONLY и MEMORY_AND_DISK? Когда каждый уровень предпочтителен?
ОтветAnswer
MEMORY_ONLY хранит данные только в памяти. Если партиция не помещается -- она не кэшируется и пересчитывается при каждом action. MEMORY_AND_DISK хранит в памяти, а партиции, не поместившиеся, spillит на диск. MEMORY_ONLY предпочтителен, когда пересчёт дешевле дискового I/O (простые трансформации). MEMORY_AND_DISK -- когда пересчёт дорогой (join + groupBy + window) и потеря партиции из кэша означает полный пересчёт pipeline.
Проверка знанийKnowledge check
Почему cache() не вычисляет DataFrame немедленно? Как убедиться, что данные реально в кэше?
ОтветAnswer
cache() -- это lazy трансформация, как filter() или select(). Она добавляет инструкцию 'сохрани результат в кэш' в DAG, но не запускает вычисление. Данные кэшируются только при первом action (count, show, write). Чтобы гарантировать кэширование: вызовите df.cache() и затем df.count() -- count вычислит все партиции и кэширует. Проверить можно в Spark UI на вкладке Storage: кэшированный DataFrame появится в списке с числом Cached Partitions.

Как Spark делит память между execution и storage — разбор на уровне исходников в senior-курсе:

Spark Internals: Unified Memory Manager

Что дальше?

Модуль M04 завершён! Вы освоили ключевые техники оптимизации: shuffle минимизация, партиционирование, bucketing, broadcast join, data skew handling и кэширование. В следующем модуле (M05) мы погрузимся в Adaptive Query Execution (AQE) — механизм runtime-оптимизации, который автоматически применяет многие из этих техник.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём разница между cache() и persist()?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6