Learning Platform
Глоссарий Troubleshooting
Урок 10.05 · 30 мин
Продвинутый
IncrementalExecutionStateStoreSaveExecStateStoreRestoreExecWatermarkOffsetSeqMetadataStateful Operators

IncrementalExecution и stateful-операторы

Когда MicroBatchExecution строит план для каждого батча, он не использует стандартный QueryExecution. Вместо него создаётся IncrementalExecution — специализированный планировщик, который знает о батч-идентификаторах, версиях state store, watermark’ах и output mode. Именно IncrementalExecution превращает абстрактные streaming-операторы (агрегация, деduplication, join) в конкретные физические узлы с ссылками на StateStore. Без понимания этого класса невозможно читать explain() streaming-запросов и диагностировать проблемы с state.

IncrementalExecution: что это и зачем

IncrementalExecution (sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala) — это QueryExecution с переопределённым preparations и optimizedPlan.

class IncrementalExecution(
    sparkSession: SparkSession,
    logicalPlan: LogicalPlan,
    outputMode: OutputMode,           // Append, Update, Complete
    checkpointLocation: String,
    queryId: UUID,
    runId: UUID,
    currentBatchId: Long,             // batchId текущего батча
    prevOffsetSeqMetadata: Option[OffsetSeqMetadata],
    offsetSeqMetadata: OffsetSeqMetadata,  // содержит watermark
    watermarkPropagator: WatermarkPropagator
) extends QueryExecution(sparkSession, logicalPlan)

Ключевые поля, которые отличают IncrementalExecution от обычного QueryExecution:

  • currentBatchId — идентификатор текущего батча, передаётся в StateStoreId как версия
  • offsetSeqMetadata.batchWatermarkMs — watermark для фильтрации поздних данных (eviction)
  • prevOffsetSeqMetadata — watermark предыдущего батча (для фильтрации событий, которые не должны попасть в текущий батч)
  • outputMode — определяет поведение StateStoreSaveExec (агрессивный eviction при Append)

State preparation rule: заполнение метаданных

IncrementalExecution переопределяет preparations — набор правил, применяемых к физическому плану перед исполнением. Ключевое правило — state:

// Упрощённо: state preparation rule
val state = new Rule[SparkPlan] {
  var operatorId = 0

  def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case StateStoreSaveExec(keys, None, None, None, storeConf, child) =>
      operatorId += 1
      StateStoreSaveExec(
        keys,
        Some(currentBatchId),
        Some(outputMode),
        Some(offsetSeqMetadata),  // передаём watermark
        StateStoreId(checkpointLocation, operatorId, partitionId),
        storeConf,
        child
      )
    case StateStoreRestoreExec(keys, None, storeConf, child) =>
      StateStoreRestoreExec(
        keys,
        Some(currentBatchId),
        StateStoreId(checkpointLocation, operatorId, partitionId),
        storeConf,
        child
      )
    // аналогично для FlatMapGroupsWithStateExec, StreamingDeduplicateExec, etc.
  }
}

Эта rule пробегает физический план снизу вверх и заполняет все stateful-операторы ссылками на конкретный StateStore (через StateStoreId) и текущими метаданными батча.

Дерево stateful-операторов

После применения state preparation rule физический план streaming-запроса с агрегацией выглядит так:

// df.groupBy("key").count().writeStream.outputMode("update")...
explain() вывод для одного батча:

== Physical Plan ==
WriteToDataSourceV2 [...]
+- StateStoreSaveExec [key#10], stateInfo = StatefulOperatorStateInfo(
     checkpointLocation = /checkpoint/state,
     queryRunId = ...,
     operatorId = 0,
     storeVersion = 42,
     numPartitions = 200)
   outputMode = Update, watermark = EventTimeWatermark(0)
   +- HashAggregateExec(keys=[key#10], functions=[count(1)])
      +- StateStoreRestoreExec [key#10], stateInfo = StatefulOperatorStateInfo(...)
         +- ShuffleExchangeExec(HashPartitioning([key#10], 200))
            +- KafkaBatchScan [...]

Обратите внимание на пару StateStoreRestoreExec / StateStoreSaveExec: они оборачивают агрегацию. Restore читает предыдущее состояние перед агрегацией, Save записывает обновлённое состояние после.

Физический план с StateStore

Пара Restore/Save оборачивает агрегацию. Restore подаёт предыдущий state на вход агрегации, Save сохраняет результат.

KafkaBatchScanЧитает данные батча N: (key=A, value=5), (key=B, value=3)Источник данных для этого батча. Данные для диапазона offset'ов, записанных в offsetLog.
shuffle by key
ShuffleExchangeExecДанные партиционированы по ключуВсе записи с одним ключом идут в одну партицию, где живёт state этого ключа.
StateStoreRestoreExecget(key=A) -> {count: 100}Для каждой входной записи читает предыдущее состояние из StateStore. Результат передаётся в агрегацию как начальное значение.
(key, prev_state, new_row)
HashAggregateExecmerge: count(A) = 100 + 1 = 101Агрегация работает поверх предыдущего state + новых данных. Результат — обновлённые агрегаты.
StateStoreSaveExecput(key=A, {count: 101}), mode=UpdateСохраняет обновлённые агрегаты обратно в StateStore. OutputMode определяет, какие ключи испускаются в sink.

StateStoreRestoreExec: детали

StateStoreRestoreExec — унарный физический оператор. Для каждой входной строки он:

  1. Достаёт ключ (ключевые колонки из keys выражения)
  2. Делает store.get(key) — читает предыдущее состояние
  3. Если ключ есть в store — испускает две строки: старое состояние и новую входную строку
  4. Если ключа нет — испускает только новую входную строку
// Псевдокод doExecute
override def doExecute(): RDD[InternalRow] = {
  val stateStore = getStateStore(stateInfo)

  child.execute().mapPartitionsWithStateStore(stateStore) { iter =>
    iter.flatMap { row =>
      val key = getKey(row)
      val savedState = stateStore.get(key)
      if (savedState != null) {
        Seq(savedState, row)  // предыдущий state + новая строка
      } else {
        Seq(row)              // только новая строка
      }
    }
  }
}

Почему две строки? HashAggregateExec знает, как мёрджить частичные агрегаты. Передавая старое состояние как «предыдущий частичный агрегат», Spark избегает необходимости специального streaming-aware агрегатора.

StateStoreSaveExec: output mode-зависимое поведение

StateStoreSaveExec — более сложный оператор. Его поведение кардинально меняется от outputMode:

Update mode:

Для каждой строки: store.put(key, newValue)
Испускать: все обновлённые строки (даже если значение не изменилось)
Eviction: строки с ключами старше watermark удаляются из state (store.remove(key))

Append mode:

Для каждой строки: store.put(key, newValue)
Испускать: только строки, ключи которых "закрыты" watermark (window.end < watermark)
Eviction: закрытые ключи удаляются из state после испускания

Complete mode:

Для каждой строки: store.put(key, newValue)
Испускать: ВСЕ строки из state (полный снимок)
Eviction: нет (state никогда не удаляется)
WARNING

Complete mode никогда не удаляет из state. Для unbounded streaming запросов без ограничений это означает неограниченный рост state. Complete mode подходит только для запросов с ограниченным набором ключей (например, агрегация по статическому enum-полю).

Watermark в физическом плане: два значения

В IncrementalExecution есть два разных watermark значения, что часто путает:

// eventTimeWatermarkForEviction: используется для удаления state
val eventTimeWatermarkForEviction: Option[Long] = 
  offsetSeqMetadata.batchWatermarkMs match {
    case 0L => None
    case ms => Some(ms)
  }

// eventTimeWatermarkForLateEvents: используется для фильтрации входных данных
val eventTimeWatermarkForLateEvents: Option[Long] =
  prevOffsetSeqMetadata.flatMap(_.batchWatermarkMs match {
    case 0L => None  
    case ms => Some(ms)
  })

Почему два значения? Watermark обновляется в конце батча, но применяется в следующем батче. Это означает:

  • Фильтрация поздних событий (input filtering) использует watermark предыдущего батча — чтобы батч N не отфильтровал события, которые сам же и использовал для продвижения watermark
  • Eviction state (удаление старых ключей) использует watermark текущего батча — актуальное значение для агрессивного освобождения памяти
Батч 40: max event time = 10:00:30
  -> watermark после батча = 10:00:30 - 10min = 09:50:30

Батч 41: 
  eventTimeWatermarkForLateEvents = 09:50:30 (из предыдущего батча) 
  eventTimeWatermarkForLateEvents применяется к входным данным
  
  (допустим, max event time = 10:00:45, watermark продвигается до 09:50:45)
  
  eventTimeWatermarkForEviction = 09:50:45 (из текущего батча)
  используется для eviction state в StateStoreSaveExec

OffsetSeqMetadata: мост между батчами

OffsetSeqMetadata — это структура, которая сохраняется в offsetLog вместе с offset’ами и восстанавливается при рестарте:

case class OffsetSeqMetadata(
  batchWatermarkMs: Long,    // watermark на момент начала батча
  batchTimestampMs: Long,    // системное время начала батча
  conf: Map[String, String]  // spark.sql.shuffle.partitions и другие конфиги
)

Поле conf особенно важно: если при рестарте spark.sql.shuffle.partitions изменился — Spark может обнаружить несоответствие и либо предупредить, либо отказаться продолжать (в зависимости от конфига). Это гарантирует, что state, созданный с 200 партициями, не будет прочитан при 400 партициях (что привело бы к неправильной маршрутизации данных).

Watermark flow между батчами

Watermark обновляется в конце батча N и применяется в батче N+1. Два разных значения используются для разных целей.

Батч N выполненmax event time = 10:05:00, watermark = 10:05:00 - 10min = 09:55:00WatermarkTracker обновляет глобальный watermark после успешного завершения батча N.
CommitLog.add(N, nextBatchWatermarkMs=09:55:00)Watermark сохраняется в commitLognextBatchWatermarkMs записывается в commitLog, чтобы пережить рестарт.
Батч N+1: IncrementalExecutionbatchWatermarkMs (current) = 09:55:00OffsetSeqMetadata батча N+1 содержит watermark, обновлённый по результатам батча N.
Eviction stateУдаляем ключи с window.end < 09:55:00StateStoreSaveExec использует currentWatermark для удаления закрытых окон.
Late data filterОтфильтровываем события с ts < prev watermarkВходные события старше предыдущего watermark отбрасываются до попадания в агрегацию.

Другие stateful-операторы

Помимо агрегации, IncrementalExecution конфигурирует и другие stateful-операторы:

StreamingDeduplicateExec — реализует dropDuplicates(). State хранит виденные ключи:

df.dropDuplicates(["user_id", "event_id"])
# StateStore: Set<(user_id, event_id)>
# При получении строки: если ключ в state -> отбрасываем, иначе -> испускаем + добавляем в state

StreamingSymmetricHashJoinExec — реализует stream-stream join. Хранит два StateStore: один для левого потока, один для правого:

stream1.join(stream2, "join_key", "inner")
# Left StateStore: буферизует строки из stream1, которые ещё не нашли пару
# Right StateStore: буферизует строки из stream2
# Watermark позволяет evict старые строки из обоих буферов

FlatMapGroupsWithStateExec / MapGroupsWithStateExec — старый arbitrary state API (deprecated в Spark 4.0 в пользу transformWithState).

Чтение explain() для streaming запросов

# Посмотреть физический план streaming-запроса
query = (df.groupBy("key").count()
    .writeStream
    .outputMode("update")
    .format("console")
    .start())

# IncrementalExecution.explain() доступен через lastExecution
spark.streams.get(query.id).lastExecution.executedPlan.toString()

# Более читаемо:
print(query.explain(True))

Ключевые вещи, на которые смотреть в explain():

  • storeVersion — должен совпадать с batchId
  • numPartitions — если изменилось, state несовместим
  • outputMode — убедиться, что правильный режим
  • watermark — если 0, eviction не происходит

Попробуй сам

Исследуйте, как IncrementalExecution настраивает физический план:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, col
import time

spark = SparkSession.builder \
    .appName("incremental-execution-demo") \
    .getOrCreate()

df = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 100)
    .load())

# Windowed aggregation с watermark
result = (df
    .withWatermark("timestamp", "30 seconds")
    .groupBy(window("timestamp", "1 minute"), col("value") % 5)
    .count())

query = (result.writeStream
    .outputMode("update")
    .format("memory")
    .queryName("plan_demo")
    .option("checkpointLocation", "/tmp/plan-demo")
    .start())

time.sleep(15)

# Получаем последний executed plan
last_exec = spark.streams.get(query.id).lastExecution
if last_exec:
    print("=== Executed Plan ===")
    print(last_exec.executedPlan)
    print("\n=== State Operators ===")
    for op in query.lastProgress.get("stateOperators", []):
        print(f"Operator: {op['operatorName']}")
        print(f"  rows in state: {op['numRowsTotal']}")
        print(f"  memory used: {op['memoryUsedBytes']} bytes")

query.stop()

В выводе executedPlan найдите StateStoreSaveExec и StateStoreRestoreExec — они обрамляют HashAggregateExec. Обратите внимание на storeVersion в stateInfo — он должен совпадать с последним batchId.

Проверка знанийKnowledge check
Streaming-запрос: df.withWatermark("ts", "10 minutes").groupBy(window("ts", "5 minutes")).count().writeStream.outputMode("append"). В батче N watermark продвинулся до 10:20:00. Окно [10:10:00, 10:15:00] содержит 50 записей. Что произойдёт с этим окном в StateStoreSaveExec при outputMode=Append, и почему Append требует watermark для корректной работы?
ОтветAnswer
В Append mode StateStoreSaveExec испускает результат окна (и удаляет его из state) только когда window.end меньше текущего watermark. Для окна [10:10:00, 10:15:00] window.end = 10:15:00, а watermark = 10:20:00. Условие 10:15:00 < 10:20:00 выполнено — окно «закрыто». StateStoreSaveExec испускает строку (Window(10:10-10:15), count=50) в sink и удаляет окно из StateStore. Если watermark НЕ задан (withWatermark не вызван), окна никогда не закрываются в Append mode — state растёт бесконечно, и в sink ничего не пишется. Вот почему Append mode требует watermark: без него нельзя определить, когда окно «финализировано» и новые данные в него уже не придут. Update mode не имеет этого ограничения: он испускает обновлённые агрегаты каждый батч, не дожидаясь «закрытия» окна, поэтому watermark там опционален (хотя и рекомендован для eviction state).

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. StateStoreRestoreExec для агрегации испускает ДВЕ строки если ключ уже есть в state: старое состояние и новую входную строку. Почему именно две, а не просто «обновлённый результат»?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6