Learning Platform
Глоссарий Troubleshooting
Урок 10.02 · 28 мин
Продвинутый
OffsetSeqLogCommitLogHDFSMetadataLogExactly-OnceCheckpointWAL

Offset log, commit log и WAL

Exactly-once — одно из самых ёмких и часто неправильно понимаемых понятий в стриминге. В Spark Structured Streaming оно достигается не магией, а конкретной механикой: двумя write-ahead log-файлами в checkpoint directory, replayable-источниками и idempotent-синками. Разберём эту механику до байтового уровня: как устроены файлы, что в них записывается, в каком порядке, и как система восстанавливается после каждого возможного сбоя.

Структура checkpoint directory

Когда вы задаёте checkpointLocation, Spark создаёт в ней конкретную структуру директорий:

/checkpoint-location/
├── metadata          -- метаданные запроса (queryId, Spark version)
├── offsets/          -- OffsetSeqLog: WAL для offset'ов
│   ├── 0             -- offset'ы батча 0
│   ├── 1             -- offset'ы батча 1
│   └── 2             -- offset'ы батча 2
├── commits/          -- CommitLog: лог завершённых батчей
│   ├── 0
│   └── 1             -- батч 2 ещё не завершён (пример)
└── state/            -- StateStore данные (урок 3, 4)
    └── 0/            -- оператор #0
        └── 0/        -- партиция 0
            ├── 1.delta
            └── 2.delta

Обратите внимание: в этом примере в offsets/ есть файл 2, а в commits/ — нет. Именно эта асимметрия сигнализирует системе: батч 2 был начат, но не завершён, и его нужно переиграть.

HDFSMetadataLog: базовый класс

И OffsetSeqLog, и CommitLog наследуются от HDFSMetadataLog[T] (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala). Это параметризованный класс, который реализует append-only log поверх Hadoop-совместимой файловой системы.

abstract class HDFSMetadataLog[T: ClassTag](sparkSession: SparkSession, path: String)
  extends MetadataLog[T] with Logging {

  // Записать новую запись в лог
  def add(batchId: Long, metadata: T): Boolean

  // Прочитать запись по batchId
  def get(batchId: Long): Option[T]

  // Прочитать диапазон [startId, endId]
  def get(startId: Option[Long], endId: Option[Long]): Array[(Long, T)]

  // Последний batchId в логе
  def getLatest(): Option[(Long, T)]

  // Очистить старые записи
  def purge(thresholdBatchId: Long): Unit
}

Каждая запись хранится как отдельный файл, имя которого совпадает с batchId (числом). Это простое, но эффективное решение: атомарность операции записи обеспечивается атомарностью HDFS rename (write to temp -> rename to final), листинг директории даёт все доступные batchId.

Формат файлов — UTF-8 JSON с заголовочной строкой:

v1
{"batchWatermarkMs": 0, "batchTimestampMs": 1716000000000, "conf": {...}}
{"kafka": {"topic": {"0": 100, "1": 50, "2": 75}}}

Первая строка — версия формата. Вторая — OffsetSeqMetadata (watermark, timestamp, конфиги). Третья — собственно offset’ы источников.

Структура checkpoint directory

Файлы offsets/ и commits/ — два WAL-лога. Асимметрия между ними определяет состояние батча при восстановлении.

offsets/0{"kafka": {"topic-A": {"0": 0, "1": 0}}}Первый батч: читаем от начала до offset 0 в каждой партиции. Записывается ДО исполнения.
offsets/1{"kafka": {"topic-A": {"0": 150, "1": 120}}}Второй батч: offset'ы продвинулись.
commits/0{"nextBatchWatermarkMs": 0}Батч 0 успешно завершён. Записывается ПОСЛЕ успешной записи в sink.
commits/1отсутствуетБатч 1 был начат (есть в offsets/), но не завершён. При перезапуске будет переиграт.

OffsetSeqLog: детали формата

OffsetSeqLog (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala) хранит объект OffsetSeq — обёртку над массивом offset’ов (по одному на каждый источник) плюс OffsetSeqMetadata.

case class OffsetSeq(
  offsets: Seq[Option[Offset]],    // offset для каждого источника (None если источник не изменился)
  metadata: Option[OffsetSeqMetadata]  // watermark и timestamp батча
)

case class OffsetSeqMetadata(
  batchWatermarkMs: Long,          // watermark в мс (используется для eviction state)
  batchTimestampMs: Long,          // время начала батча
  conf: Map[String, String]        // конфиги на момент батча (shuffle partitions, etc.)
)

OffsetSeqMetadata — важная структура: она хранит watermark на момент начала батча. Это не просто метаданные для логирования — watermark используется в IncrementalExecution для фильтрации поздних данных и eviction state. Подробно об этом в уроке 5.

Пример реального файла offsets/42 для Kafka-источника:

v1
{"batchWatermarkMs":1716003600000,"batchTimestampMs":1716003610000,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"apache.kafka.v2":{"topic-orders":{"0":18432,"1":17891,"2":19204,"3":18755}}}

CommitLog: подтверждение завершения

CommitLog (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CommitLog.scala) значительно проще — он хранит только подтверждение того, что батч завершён, плюс watermark для следующего батча:

case class CommitMetadata(
  nextBatchWatermarkMs: Long  // watermark который будет использован в следующем батче
)

Файл commits/42:

v1
{"nextBatchWatermarkMs":1716003600000}

Это умышленная минималистичность: CommitLog — это именно «флаг завершения», не более. Данные о том, что было обработано, уже в OffsetSeqLog.

NOTE

В Spark 4.0 CommitLog также хранит nextBatchWatermarkMs — это watermark, вычисленный по результатам текущего батча, который будет применён к следующему. Это позволяет разделить «watermark для чтения событий» (batchWatermarkMs) и «watermark как результат наблюдений» (nextBatchWatermarkMs).

WAL-протокол: точная последовательность

Теперь посмотрим на точную последовательность операций, которая обеспечивает exactly-once:

Сбой между строками 4 и 11 (после WAL, до commit) приводит к replay батча N с теми же offset’ами.

Анализ всех сценариев сбоя

Рассмотрим все точки, в которых может произойти сбой, и что случится при каждом:

Сбой до offsetLog.add(): Никакого WAL-файла нет. При рестарте система не знает об этом батче. Запрашиваются текущие offset’ы у источников и строится новый батч. Никаких потерь — данные в Kafka не удалены.

Сбой после offsetLog.add(), до исполнения: В offsets/N есть запись, в commits/N — нет. При рестарте: populateStartOffsets() видит разницу, переигрывает батч N с теми же offset’ами. Kafka-источник перечитывает те же данные. Exactly-once при idempotent sink.

Сбой во время addBatch() (Spark Job упал на полпути): Spark Job провалился. Батч N не записан в commits/. При рестарте — тот же сценарий: replay батча N. Delta-синк отвергнет повторную запись с тем же batchId.

Сбой после addBatch(), до commitLog.add(): Самый опасный сценарий — данные уже в синке, но система не знает об этом. При рестарте: replay батча N. Delta-синк обнаружит batchId=N уже существует и применит idempotent semantics (no-op). Exactly-once сохраняется.

Сбой после commitLog.add(): Нормальное завершение. При рестарте: начинаем с батча N+1.

DANGER

Если вы используете синк, который не поддерживает idempotent writes (например, foreachBatch с обычным append в Parquet без overwrite-семантики), сбой между addBatch и commitLog.add() приведёт к дублированию данных. Это не баг Spark — это архитектурное ограничение, которое нужно учитывать при выборе синка.

Replayable sources и idempotent sinks

Exactly-once — это сквозное свойство, которое требует соответствия от всех компонентов:

Replayable sources — источники, которые позволяют читать данные заново по offset’у. Примеры: Apache Kafka, Amazon Kinesis (sequence number), Delta Lake (table version), Apache Iceberg (snapshot id), Google Pub/Sub (с ack). Пример нереplayable-источника: файлы в S3, которые были удалены после чтения.

Idempotent sinks — синки, которые безопасно обрабатывают повторную запись одного и того же батча. Примеры: Delta Lake (использует batchId для deduplication), Apache Iceberg, любой key-value store с overwrite по ключу. Пример неidempotent-синка: простой append в файловую систему без transactional semantics.

# Правильно: Delta как idempotent sink
(result.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoint")
    .start("/data/output"))

# Небезопасно при сбоях: plain Parquet append
(result.writeStream
    .format("parquet")
    .option("checkpointLocation", "/checkpoint")
    .start("/data/output"))  # дубликаты возможны при replay

Purge: управление размером логов

По умолчанию OffsetSeqLog и CommitLog не удаляют старые записи автоматически. Это может привести к тому, что в offsets/ накопятся тысячи файлов (по одному на батч). Spark предоставляет конфиги для управления retention:

spark.conf.set("spark.sql.streaming.minBatchesToRetain", "2")
# Минимальное количество батчей, которые нужно хранить в логах.
# При значении 2: хранятся последние 2 бatcha, более старые удаляются.

Механика purge: в конце каждого батча MicroBatchExecution вызывает offsetLog.purge(currentBatchId - minBatchesToRetain) и commitLog.purge(...). Purge просто удаляет файлы с batchId меньше порогового значения.

WARNING

Уменьшение minBatchesToRetain ускоряет листинг директории (меньше файлов), но сокращает окно восстановления. При значении 1 вы можете восстановиться только с последнего завершённого батча. Если у вас долгие батчи (больше интервала), увеличьте значение.

Практика: инспекция checkpoint директории

Вы можете читать WAL-файлы напрямую — они обычный текст:

# Смотрим последние 3 offset-записи
import json

checkpoint_dir = "/checkpoint-location"
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(
    spark._jsc.hadoopConfiguration())

for batch_id in range(max(0, last_batch - 2), last_batch + 1):
    path = spark._jvm.org.apache.hadoop.fs.Path(
        f"{checkpoint_dir}/offsets/{batch_id}")
    if fs.exists(path):
        stream = fs.open(path)
        reader = spark._jvm.java.io.BufferedReader(
            spark._jvm.java.io.InputStreamReader(stream))
        lines = []
        line = reader.readLine()
        while line:
            lines.append(line)
            line = reader.readLine()
        print(f"Batch {batch_id}: {lines}")

Или через shell:

# Посмотреть offsets батча 42
hdfs dfs -cat /checkpoint/offsets/42

# Посмотреть все commits
hdfs dfs -ls /checkpoint/commits/

# Найти «дыру» между offsets и commits
diff <(hdfs dfs -ls /checkpoint/offsets/ | awk '{print $NF}' | xargs -n1 basename | sort -n) \
     <(hdfs dfs -ls /checkpoint/commits/ | awk '{print $NF}' | xargs -n1 basename | sort -n)

Попробуй сам

Запустите запрос и исследуйте WAL-файлы в реальном времени:

from pyspark.sql import SparkSession
import os, json, time

spark = SparkSession.builder \
    .appName("wal-inspection") \
    .getOrCreate()

CHECKPOINT = "/tmp/wal-demo-checkpoint"

df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 50)
    .load())

query = (df.writeStream
    .format("memory")
    .queryName("wal_demo")
    .option("checkpointLocation", CHECKPOINT)
    .trigger(processingTime="3 seconds")
    .start())

# Ждём несколько батчей
time.sleep(15)
query.stop()

# Исследуем WAL-файлы
print("=== OFFSET LOG ===")
for fname in sorted(os.listdir(f"{CHECKPOINT}/offsets")):
    with open(f"{CHECKPOINT}/offsets/{fname}") as f:
        content = f.readlines()
    batch_id = int(fname)
    print(f"Batch {batch_id}:")
    for line in content:
        print(f"  {line.rstrip()}")

print("\n=== COMMIT LOG ===")
for fname in sorted(os.listdir(f"{CHECKPOINT}/commits")):
    with open(f"{CHECKPOINT}/commits/{fname}") as f:
        content = f.read()
    print(f"Batch {fname}: {content.strip()}")

Ожидаемый вывод:

=== OFFSET LOG ===
Batch 0:
  v1
  {"batchWatermarkMs":0,"batchTimestampMs":1716000000000,"conf":{}}
  {"rate":{"numRows":{"0":150}}}
Batch 1:
  v1
  {"batchWatermarkMs":0,"batchTimestampMs":1716000003000,"conf":{}}
  {"rate":{"numRows":{"0":300}}}

=== COMMIT LOG ===
Batch 0: v1
{"nextBatchWatermarkMs":0}
Batch 1: v1
{"nextBatchWatermarkMs":0}
Проверка знанийKnowledge check
В checkpoint директории файл offsets/99 существует, а commits/99 — нет. Также существуют commits/97 и commits/98. Что это означает, и как поведёт себя система при следующем старте? Есть ли риск потери данных при Kafka-источнике и Delta-синке?
ОтветAnswer
Ситуация однозначна: батч 99 был начат (offset'ы записаны в WAL), но не завершён — либо Spark Job упал, либо процесс был убит между addBatch и commitLog.add(). При следующем старте populateStartOffsets() обнаружит: последний commit = 98, в offsetLog есть запись 99. Это означает «незавершённый батч». Система переиграет батч 99 с теми же offset'ами из offsets/99. Для Kafka-источника это безопасно — Kafka позволяет перечитать те же данные. Для Delta-синка: если предыдущая попытка записи батча 99 частично завершилась и Delta-транзакция была закоммичена, повторная запись с тем же batchId будет идентифицирована как дубликат и проигнорирована. Если транзакция не была завершена — данные запишутся в Delta как обычно. В обоих случаях риска потери или дублирования данных нет. Комбинация replayable-источника (Kafka) и idempotent-синка (Delta) гарантирует exactly-once.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Базовый класс OffsetSeqLog и CommitLog — это HDFSMetadataLog. Каждая запись хранится как отдельный файл, имя которого = batchId. Какой механизм файловой системы обеспечивает атомарность записи?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6