Learning Platform
Глоссарий Troubleshooting
Урок 10.01 · 30 мин
Продвинутый
MicroBatchExecutionStreamExecutionTriggerIncrementalExecutionContinuous Processing

Движок micro-batch исполнения

Когда вы вызываете df.writeStream.start(), Spark не начинает немедленно перекачивать байты из источника в сток. Вместо этого он запускает отдельный поток, внутри которого крутится бесконечный цикл, и каждая итерация этого цикла — это обычный batch-запрос. Именно этот цикл и есть micro-batch execution engine. Понять его устройство — значит понять, почему Structured Streaming ведёт себя именно так, как он ведёт себя: задержки, гарантии exactly-once, поведение при падении, поведение при watermark.

StreamExecution: абстрактная основа

Весь streaming execution начинается с абстрактного класса StreamExecution (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala). До Spark 2.3 этот класс был конкретным и содержал всю логику. В 2.3 его разбили на иерархию:

StreamExecution (abstract)
├── MicroBatchExecution     -- ProcessingTime, Once, AvailableNow
└── ContinuousExecution     -- Trigger.Continuous (experimental)

StreamExecution содержит общую инфраструктуру:

  • offsetLog: OffsetSeqLog — write-ahead log для offsets (подробно в уроке 2)
  • commitLog: CommitLog — log успешно завершённых батчей
  • currentBatchId: Long — монотонно растущий идентификатор текущего батча
  • streamDeathCause: Throwable — причина гибели стрима при ошибке
  • queryExecutionThread — поток, в котором крутится execution loop
  • WatermarkTracker — трекер глобального watermark (обновляется по завершении батча)

Когда вы вызываете start(), StreamExecution создаёт queryExecutionThread (daemon thread) и запускает его. Поток вызывает runStream(), который устанавливает Spark-контекст, настраивает StreamMetrics, а затем делегирует конкретной реализации через абстрактный метод runActivatedStream().

MicroBatchExecution: конкретная реализация

MicroBatchExecution — это то, что действительно работает для подавляющего большинства production-запросов. Его можно охарактеризовать одной фразой: он превращает streaming-запрос в бесконечную серию обычных Spark SQL batch-запросов, каждый из которых обрабатывает инкремент данных с момента предыдущего батча.

Ключевые поля:

// Отслеживаем границы offset'ов
protected var committedOffsets = new StreamProgress
protected var availableOffsets = new StreamProgress

// Источники и их метаданные
private val sources: Seq[SparkDataStream]
private val uniqueSources: Map[SparkDataStream, ReadLimit]

// Состояние планировщика батчей
private var isCurrentBatchConstructed = false

// TriggerExecutor определяет ритм исполнения
private val triggerExecutor: TriggerExecutor
Иерархия StreamExecution

Абстрактный StreamExecution и две конкретные реализации. MicroBatchExecution используется для всех production trigger'ов, ContinuousExecution — экспериментальная.

StreamExecution (abstract)offsetLog, commitLog, currentBatchId, WatermarkTracker, queryExecutionThreadОбщая инфраструктура: WAL-логи, поток исполнения, метрики, lifecycle management
MicroBatchExecutionTriggerExecutor, committedOffsets, availableOffsets, runBatch()Обрабатывает данные батчами. Используется для ProcessingTimeTrigger, OnceТrigger, AvailableNowTrigger.
ContinuousExecutionEpochCoordinator, continuous RPC, low latencyЭкспериментальный режим: исполнители обрабатывают записи непрерывно, offset'ы фиксируются по эпохам.

TriggerExecutor: ритм исполнения

MicroBatchExecution создаёт TriggerExecutor на основе переданного пользователем Trigger. Это стратегия, определяющая, как часто запускать батч:

TriggerTriggerExecutorПоведение
ProcessingTime("10 seconds")ProcessingTimeExecutorБатч каждые 10 секунд (учитывает время самого батча)
OnceSingleBatchExecutorОдин батч, затем завершение
AvailableNowMultiBatchExecutorОбрабатывает весь накопленный backlog, затем завершение
Continuous("1 second")(уходит в ContinuousExecution)

Важная деталь ProcessingTimeExecutor: если батч занял 8 секунд, а интервал 10 секунд — следующий батч стартует через 2 секунды. Если батч занял 15 секунд — следующий стартует немедленно, без паузы. Это гарантирует, что при backlog’е Spark догоняет источник как можно быстрее.

Полный цикл micro-batch: от триггера до commit

Центральный метод — runActivatedStream(). Внутри него triggerExecutor.execute(batchRunner) вызывает batchRunner в бесконечном цикле. Каждая итерация — один микро-батч:

Цикл одного micro-batch

Восемь фаз, от startTrigger до commit. Метрики каждой фазы видны в StreamingQueryProgress.durationMs.

1. startTrigger()Сбрасывает метрики текущего батча, фиксирует start timestampНачало окна триггера. Метрики времени для каждой фазы собираются здесь.
2. constructNextBatch()Запрашивает offset'ы у источников, пишет в offsetLog (WAL)Ключевая фаза: WAL-запись происходит ДО исполнения батча. Это основа exactly-once.
3. getBatch()Для каждого источника вызывает source.getBatch(startOffset, endOffset)Источники возвращают DataFrame с данными в диапазоне offset'ов. Kafka возвращает план чтения по partition/offset.
4. logicalPlan transformПодставляет реальные планы источников вместо StreamingExecutionRelationЛогический план стрима содержит-заглушки StreamingExecutionRelation. Здесь они заменяются реальными данными.
5. queryPlanning (IncrementalExecution)Создаёт IncrementalExecution с batchId, watermark, outputModeIncrementalExecution — специализированный QueryExecution для streaming. Применяет state preparation rules.
6. addBatch()Материализует DataFrame, записывает в sink через SQLExecution.withNewExecutionIdСобственно исполнение Spark Job. Sink получает DataFrame и пишет данные.
7. watermark updateWatermarkTracker.updateWatermark(lastExecution)После успешного исполнения обновляется глобальный watermark для следующего батча.
8. commitLog.add(batchId)Записывает batchId в commitLog — батч считается завершённымТолько после этой записи батч считается committed. Если упадём после WAL, но до commit — батч будет replay'ен.

Эта последовательность — не случайная. Именно порядок шагов обеспечивает exactly-once гарантии. Разберём это детально.

constructNextBatch: WAL-запись как фундамент

Фаза constructNextBatch() — это самое важное место в cycle. Она делает следующее:

  1. Для каждого источника (SparkDataStream) вызывает getOffset() или latestOffset() — источник возвращает текущий «кончик» данных.
  2. Сравнивает с committedOffsets — есть ли что-то новое?
  3. Если новых данных нет и нет pending stateful операторов — батч не нужен, возвращает false.
  4. Если данные есть — записывает в offsetLog запись (batchId -> OffsetSeq).
// Псевдокод constructNextBatch
val newOffsets = sources.map(s => s -> s.getOffset()).toMap
if (newOffsets == committedOffsets && !stateOperatorsPending) {
  return false  // пропускаем батч
}
offsetLog.add(currentBatchId, OffsetSeq.fill(sources, newOffsets))
availableOffsets = newOffsets
true

Запись в offsetLog происходит до любого исполнения. Это Write-Ahead Log в чистом виде: если процесс упадёт после offsetLog.add() но до commitLog.add(), при перезапуске Spark увидит, что batch N есть в offsetLog, но нет в commitLog — и переиграет его заново.

WARNING

Именно порядок «сначала offsetLog, потом исполнение, потом commitLog» обеспечивает exactly-once при наличии replayable-источников (Kafka, Delta) и idempotent-синков (Delta, Iceberg, форматы с overwrite по ключу). Если синк не idempotent (например, plain Parquet append), дублирование возможно при повторном исполнении батча после сбоя.

runBatch: планирование каждого батча

После успешного constructNextBatch() вызывается runBatch(). Именно здесь streaming-запрос превращается в batch-запрос:

Шаг 1 — getBatch. Каждый источник превращает диапазон offset’ов в DataFrame. Kafka-источник, например, вернёт план типа KafkaScan(topic, partition=0, startOffset=100, endOffset=150). Это чисто логический план — никаких данных ещё не читается.

Шаг 2 — transform logical plan. Логический план стрима содержит заглушки StreamingExecutionRelation — по одной на каждый источник. Эти заглушки заменяются реальными логическими планами, полученными из источников на предыдущем шаге.

Шаг 3 — queryPlanning. Создаётся IncrementalExecution:

val newExecution = new IncrementalExecution(
  sparkSession,
  logicalPlan,          // transformed логический план
  outputMode,           // Append / Update / Complete
  checkpointLocation,   // директория для state
  queryId,
  runId,
  currentBatchId,
  prevOffsetSeqMetadata,
  offsetSeqMetadata,    // содержит batchWatermarkMs
  watermarkPropagator
)

IncrementalExecution — это обычный QueryExecution с дополнительными правилами планирования специально для streaming. О нём подробно в уроке 5.

Шаг 4 — addBatch. StreamExecution передаёт newExecution.executedPlan в sink через sink.addBatch(batchId, data). Здесь происходит реальное исполнение Spark Job с читкой данных и записью результата.

Что происходит с реальными данными

Любопытный факт: до шага addBatch никакие данные из источника физически не читаются. Все предыдущие шаги работают только с метаданными (offset’ами, планами). Это позволяет Spark построить полный физический план (включая оптимизации Catalyst) до того, как начнётся чтение.

В момент addBatch Spark исполняет DAG:

  1. Читает данные у источников (например, из Kafka партиций)
  2. Применяет трансформации (filter, join, aggregation)
  3. Записывает в sink

Все операции упакованы в один SQLExecution.withNewExecutionId — поэтому в Spark UI вы видите один Execution ID на батч, но внутри него может быть несколько Spark Job (например, при агрегации с shuffle).

Как streaming-запрос превращается в batch-запрос

Замена заглушек реальными планами и создание IncrementalExecution. Каждый батч — обычный Spark SQL запрос.

Logical plan стрима (статичный)StreamingExecutionRelation(kafkaSource) -> Filter -> AggregationЭтот план создаётся один раз при парсинге запроса. Источники — абстрактные заглушки.
getBatch: kafkaSource.getBatch(start=100, end=150)
Transformed logical plan (per-batch)KafkaScan(0..150) -> Filter -> AggregationЗаглушки заменяются реальными планами чтения. Теперь это обычный батч-запрос.
IncrementalExecution.executedPlan
Physical plan (per-batch)HashAggregateExec -> ShuffleExchangeExec -> FilterExec -> KafkaBatchScanCatalyst строит и оптимизирует физический план так же, как для обычного batch SQL.

Метрики в Spark UI и логах

После каждого батча MicroBatchExecution публикует StreamingQueryProgress. В Spark UI вкладка «Structured Streaming» показывает:

  • inputRowsPerSecond — скорость поступления данных из источников
  • processedRowsPerSecond — пропускная способность обработки
  • batchDuration — суммарное время батча
  • durationMs — карта по фазам: {"triggerExecution": 1200, "getBatch": 50, "queryPlanning": 80, "addBatch": 950, "walCommit": 15, "commitOffsets": 10}
# Получить последний StreamingQueryProgress программно
query = df.writeStream.start()
progress = query.lastProgress
print(progress["durationMs"])
# {"triggerExecution": 1247, "getBatch": 48, "queryPlanning": 76, "addBatch": 1089}

Если addBatch занимает 90%+ времени — узкое место в исполнении запроса (логика, join, shuffle). Если getBatch большой — медленный источник или большой backlog. Если walCommit вдруг вырос — I/O проблемы с checkpoint location (HDFS/S3 latency).

TIP

Распространённая проблема в production: processedRowsPerSecond намного ниже inputRowsPerSecond означает нарастающий backlog. Spark не теряет данные (Kafka хранит их), но задержка растёт. Решение: масштабировать executor’ы или уменьшить сложность запроса.

Micro-batch vs Continuous Processing

Spark поддерживает два режима обработки. Понять их различие важно для осознанного выбора:

Micro-batch (MicroBatchExecution):

  • Данные читаются пакетами с фиксированным ритмом
  • Каждый батч — отдельный Spark Job
  • Минимальная latency: ~100 мс (обычно 1–30 секунд в production)
  • Гарантия: exactly-once
  • Подходит для: агрегация, join, stateful-операторы, все стандартные случаи

Continuous Processing (ContinuousExecution):

  • Исполнители читают данные непрерывно, без пауз
  • Offset’ы фиксируются по эпохам через EpochCoordinator
  • Минимальная latency: ~1 мс
  • Гарантия: at-least-once (в Spark 4.0 всё ещё экспериментальный режим)
  • Подходит для: ультра-низкая latency, простые map/filter без state

Ключевое ограничение continuous processing: stateful-операторы (агрегации, join, transformWithState) не поддерживаются. Это фундаментальное ограничение архитектуры — state требует синхронизации между всеми задачами, что несовместимо с моделью непрерывного исполнения.

# Micro-batch (по умолчанию, рекомендован для production)
query = (df.writeStream
  .trigger(processingTime="10 seconds")
  .start())

# Continuous processing (experimental, только map/filter)
query = (df.writeStream
  .trigger(continuous="1 second")
  .start())

Восстановление после падения

Когда streaming-запрос падает и перезапускается, MicroBatchExecution.populateStartOffsets() восстанавливает состояние:

  1. Читает последний batchId из commitLog — это последний успешно завершённый батч.
  2. Читает тот же batchId из offsetLog — восстанавливает committedOffsets.
  3. Проверяет: есть ли в offsetLog запись для batchId + 1?
    • Если есть — батч batchId + 1 был начат (WAL записан), но не завершён. Переигрываем его с теми же offset’ами.
    • Если нет — запрашиваем новые offset’ы у источников и строим новый батч.

Именно здесь WAL-семантика играет свою роль: мы никогда не пропустим данные, потому что offset’ы записаны до исполнения.

Попробуй сам

Запустите streaming-запрос и понаблюдайте за метриками по фазам:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count
import time

spark = SparkSession.builder \
    .appName("microbatch-internals") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/streaming-checkpoint") \
    .getOrCreate()

# Читаем из rate-источника (генерирует строки автоматически)
df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 100)
    .load())

# Простая агрегация
query = (df.groupBy("value")
    .count()
    .writeStream
    .outputMode("update")
    .format("memory")
    .queryName("test_query")
    .trigger(processingTime="5 seconds")
    .start())

# Ждём несколько батчей
time.sleep(30)

# Смотрим метрики последнего батча
progress = query.lastProgress
print("Batch ID:", progress["batchId"])
print("Input rows/sec:", progress["inputRowsPerSecond"])
print("Processed rows/sec:", progress["processedRowsPerSecond"])
print("Duration breakdown:", progress["durationMs"])

query.stop()

Ожидаемый вывод:

Batch ID: 5
Input rows/sec: 100.0
Processed rows/sec: 2500.0
Duration breakdown: {"triggerExecution": 2041, "getBatch": 12, 
                      "queryPlanning": 45, "addBatch": 1950,
                      "walCommit": 18, "commitOffsets": 16}
Проверка знанийKnowledge check
MicroBatchExecution записывает offset в offsetLog ДО исполнения батча, а в commitLog — ПОСЛЕ. Запрос упал после записи в offsetLog, но до записи в commitLog. Что произойдёт при перезапуске, и гарантирует ли это exactly-once для Kafka-источника с Delta-синком?
ОтветAnswer
При перезапуске populateStartOffsets() обнаружит, что batchId N есть в offsetLog, но отсутствует в commitLog. Spark переиграет батч N с теми же offset'ами, что были записаны в offsetLog. Для Kafka-источника это safe: Kafka позволяет re-читать те же offset'ы. Delta как синк использует транзакционные записи с уникальным batchId — повторная запись одного и того же batchId будет обнаружена как дубликат и отвергнута (idempotent write). Таким образом, комбинация replayable-источника (Kafka) и idempotent-синка (Delta) обеспечивает exactly-once: данные будут прочитаны заново из Kafka, но записаны в Delta только один раз. Если синк не idempotent — гарантия деградирует до at-least-once.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. MicroBatchExecution создаёт IncrementalExecution на каждый батч. Какой именно параметр передаётся в IncrementalExecution для обеспечения правильной версии StateStore?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6