Learning Platform
Глоссарий Troubleshooting
Урок 02.02 · 26 мин
Продвинутый
State managementEvent timeHistoryStormMillWheelDataflow

В большинстве систем обработки данных state и time — это прикладной слой. Вы пишете запрос, runtime его исполняет; чтобы посчитать что-то с состоянием (rolling average, session counts) — добавляете внешнее хранилище (Redis, Cassandra); чтобы обработать event-time — пишете timestamp в данные и сортируете. Это работает для batch и для простых streaming-кейсов, но ломается на сложных streaming workloads: latency растёт из-за внешних round-trips, exactly-once требует ручной координации, поздние события (late data) обрабатываются ad-hoc.

Flink сделал другой выбор: state и time — first-class abstractions runtime-а. Это значит:

  • State — встроенный примитив операторов. Не библиотека, не плагин. ValueState, MapState, ListState — API часть DataStream. Сохранение state в чекпоинт — атомарно вместе со всем pipeline.
  • Time — встроенная концепция Flink-программы. Event time, processing time, ingestion time — first-class в API. Watermarks — это сигналы внутри потока, обрабатываемые runtime-ом.

Эта философия не родилась из воздуха. Она пришла из конкретной эволюции streaming-систем 2010-х: Storm -> MillWheel -> Dataflow -> Flink. В этом уроке разберём эту эволюцию и поймём, почему именно first-class state + time оказались правильным выбором.

Storm: streaming без state

Apache Storm (Twitter, 2011) — первая широко используемая streaming-система. Архитектура: topology из spouts (sources) и bolts (operators), tuples текут от spout к bolts.

Storm topology — простая модель
SpoutИсточник: читает из Kafka, Twitter API, и эмитит tuples в topology.
Bolt: filterПростой stateless оператор: фильтрует tuples.
Bolt: enrichStateless оператор: добавляет данные из внешнего lookup (Redis). Каждый tuple -> call Redis.
Bolt: sinkOutput: пишет в HDFS, Kafka, или БД. Тоже stateless с точки зрения Storm.

Storm — это stateless топология. Каждый bolt — это просто функция от input tuple к output. Если вам нужно состояние (накопить счётчик, посчитать sliding average), вы:

  1. Подключаетесь к внешнему хранилищу (Redis, Cassandra, HBase).
  2. На каждый tuple делаете read-modify-write.
  3. На каждый tuple платите latency external round-trip.
State Stores в Kafka Streams

Это работает для простых case-ов, но имеет фундаментальные проблемы:

Latency

Каждый external lookup — это network round-trip. Redis latency 0.5-2ms в одной AZ, 5-20ms между регионами. Если у вас цепочка из 3 bolts, каждый делает Redis call — суммарная latency 5-15ms на tuple. На stateful bolts это становится bottleneck.

Throughput

Внешнее хранилище — централизованный bottleneck. Если throughput pipeline 100k events/sec, и каждый event делает Redis read+write — Redis нужно 200k ops/sec, что уже требует sharding и сложной топологии Redis cluster.

Exactly-once

Storm имел at-least-once semantics: tuples повторно обрабатывались при failure. State в Redis — eventually consistent: tuple, обработанный дважды, мог дважды инкрементировать счётчик. Exactly-once требовал ручной идемпотентности (по event ID), что усложняло приложение.

Failure recovery

Если Storm bolt упал и стартовал заново, его state в Redis оставался с моментом до сбоя, а tuples обрабатываются с момента последнего commit (через Kafka offsets). Эти два не синхронизированы — recovery всегда был немного inconsistent.

Storm работал, и его успешно использовали в Twitter, Yahoo, и далее. Но в 2013-2014 стало понятно, что для больших stateful streaming workloads этой модели недостаточно.

MillWheel: state как часть runtime

В августе 2013 Google опубликовал статью “MillWheel: Fault-Tolerant Stream Processing at Internet Scale” (Akidau et al, VLDB 2013). Это была первая работа, которая решила проблему правильно.

Главные идеи MillWheel:

1. State — встроенный примитив

В MillWheel каждый computation (аналог bolt) имеет per-key state: KV-store, локальный к процессу, доступный только этому computation для конкретного key. State part of runtime: его сохранение, recovery — обязанность системы, а не приложения.

API (упрощённо):

class MyComputation : public Computation {
  void ProcessRecord(const RecordEnvelope& env) {
    StateAccessor* state = GetStateAccessor();
    int64 counter = state->Get<int64>("counter");
    counter += env.record().value();
    state->Set<int64>("counter", counter);
    EmitOutput(...);
  }
};

Никакого Redis, никаких external round-trips. State — локальный, быстрый, persistent.

2. Time — first-class

MillWheel ввёл концепцию low watermarks — нижней оценки времени всех событий, которые ещё могут прийти. Это решило проблему “когда мне закрывать окно?”: закрывать, когда low watermark прошёл конец окна.

Это была первая система, где event-time был встроенным runtime-понятием, а не приложенческим workaround.

3. Exactly-once через atomic state + record commits

MillWheel использовал underlying Spanner-like KV store для state, и каждый processed record + state delta + outgoing records commit-ились atomically. Это давало end-to-end exactly-once без приложенческой идемпотентности.

MillWheel был internal Google, никогда не open-sourced. Но статья оказала огромное влияние — несколько лет спустя её идеи реализовались в open-source системах.

Dataflow: формализация модели

В 2015 Google опубликовал статью “The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing” (Akidau et al, VLDB 2015). Это была формализация идей MillWheel в более широкую модель.

Главные концепции Dataflow model:

Dataflow model: четыре вопроса
WHATЧто вычисляется. Например: SUM(amount) или COUNT(*) или TopK по user.
WHEREГде (в event-time) вычисляется. Окна: tumbling, sliding, session. Глобальное окно.
WHENКогда (в processing time) эмитим результат. Триггеры: по watermark, периодически, по count, кастомные.
HOWКак relate refinements: при late data, добавляем (accumulating), переписываем (discarding), эмитим delta (accumulating + retracting).

Эта модель — основа Apache Beam (open-source реализация Dataflow model) и, концептуально, архитектурный референс Flink. Flink не имплементирует Beam напрямую, но его API во многих местах изоморфен этой модели:

  • WHAT -> DataStream operations (map, sum, aggregate).
  • WHERE -> .window(TumblingEventTimeWindows.of(...)).
  • WHEN -> Trigger interface (EventTimeTrigger, ProcessingTimeTrigger).
  • HOW -> накопительные vs не-накопительные WindowAggregator-ы; retract/insert messages в Table API.

Flink родился как академический проект Stratosphere (TU Berlin, 2009-2014), и в 2014 был donated в Apache. К 2015 году, когда Dataflow paper вышел, Flink уже имел значительную часть концепций реализованными — потому что Stratosphere смотрел на MillWheel и thinking в том же направлении.

В Flink 1.0 (март 2016) была чёткая first-class реализация:

State API

public class MyFunction extends KeyedProcessFunction<String, Event, Result> {
    private ValueState<Long> counter;
    private MapState<String, Event> events;

    @Override
    public void open(OpenContext ctx) {
        ValueStateDescriptor<Long> counterDesc = new ValueStateDescriptor<>(
            "counter", Types.LONG);
        counter = getRuntimeContext().getState(counterDesc);

        MapStateDescriptor<String, Event> eventsDesc = new MapStateDescriptor<>(
            "events", Types.STRING, Types.POJO(Event.class));
        events = getRuntimeContext().getMapState(eventsDesc);
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out)
            throws Exception {
        Long current = counter.value();
        if (current == null) current = 0L;
        counter.update(current + 1);

        events.put(e.id, e);

        out.collect(new Result(...));
    }
}

Здесь ValueState и MapStateвстроенные примитивы. Они автоматически:

  • Per-key scoped (только для текущего key обрабатываемого элемента).
  • Persistently stored в state backend (RocksDB / HashMap / ForStDB).
  • Включены в каждый чекпоинт.
  • Восстанавливаются при failure.
  • Migrate-уются при rescaling.

Никакого внешнего хранилища, никакой ручной consistency. Это то, что MillWheel сделал внутри Google — теперь доступно как open-source primitive.

Time API

DataStream<Event> stream = env.fromSource(source,
    WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, ts) -> event.timestamp),
    "kafka-source");

stream
    .keyBy(e -> e.userId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.minutes(1))
    .aggregate(new MyAggregator());

Здесь:

  • WatermarkStrategy — first-class абстракция (org.apache.flink.api.common.eventtime.WatermarkStrategy). Определяет, как генерировать watermarks из event timestamps.
  • TumblingEventTimeWindows — built-in window assigner, основанный на event-time.
  • allowedLateness — built-in поддержка late events.
  • Runtime автоматически propagates watermarks через pipeline, fires окна по watermark, обрабатывает late events.

В Spark Structured Streaming аналогичный код выглядит похоже, но runtime — micro-batch, что меняет latency-характеристики. В Storm — пишите всё руками с external state.

Снапшоты state + position синхронизированы

Алгоритм Чанди-Лэмпорта (модуль 06) обеспечивает, что snapshot всего state всех операторов + позиции source (Kafka offsets) + outgoing transactions sinks (Kafka transactions) — все atomically относительно одного moment-in-time. При failure recovery:

  1. Восстанавливаем state из последнего checkpoint.
  2. Восстанавливаем Kafka source offsets из того же checkpoint.
  3. Aborting любые outgoing transactions, которые были started но не committed (модуль 12, 2PC).
  4. Возобновляем processing — exactly с того же state и с той же source position.

Это и есть exactly-once end-to-end. Без first-class state + first-class checkpointing это невозможно.

Per-key state: ключевая абстракция

В Flink state бывает двух типов:

Operator state vs Keyed state
Operator statePer-operator state, не scoped по ключу. Используется для broadcast state, для Kafka source offsets (исторически — сейчас offsets в keyed-state SourceCoordinator-а), для buffering в connectors. Управляется через CheckpointedFunction interface.
Keyed statePer-key state — каждый key имеет свой instance state. Доступен только в KeyedStream operations (после .keyBy()). Главный тип state в типичных Flink-программах.
Broadcast stateSubset operator state. Одинаковая копия на каждой parallel instance. Используется для distributing rules, lookup tables, конфигурации.
ValueState<T>Per-key single value. Самый простой и частый тип. RocksDB ColumnFamily, JNI getter/setter.
ListState<T>Per-key list of values. Для buffering, например window state. Эффективное append, full read.
MapState<K, V>Per-key map. Каждая map entry — separate row in RocksDB. Эффективен для lookups, iteration, partial updates.
ReducingState<T>Per-key accumulating state с ReduceFunction. Внутри — один value, на update — apply reducer.

Per-key scoping — самая важная idea. Когда DataStream .keyBy(e -> e.userId), Flink партиционирует поток по userId (через KeyGroupAssignment — модуль 02). Каждая parallel-инстанция оператора отвечает за поднабор keys. Состояние, обращённое через valueState.value() в processElement, автоматически scoped к currentKey() — то есть к userId текущего элемента.

Это значит: вам не нужно вручную делать map.get(key) / map.put(key, value). Вы просто работаете с state interface, и runtime сам обеспечивает per-key изоляцию. Это огромный упрощающий примитив.

Code references

Главные классы:

  • org.apache.flink.api.common.state.ValueStateDescriptor — описание state, регистрируемого в open() метода.
  • org.apache.flink.runtime.state.KeyedStateBackend — interface for keyed state implementations.
  • org.apache.flink.contrib.streaming.state.RocksDBValueState — RocksDB implementation для ValueState. Делает JNI calls для get/put в native RocksDB instance.
  • org.apache.flink.runtime.state.heap.HeapValueState — heap (HashMap) implementation. Используется в HashMapStateBackend.

Когда вы видите в коде getRuntimeContext().getState(descriptor), под капотом происходит вызов в KeyedStateBackend#createOrUpdateInternalState, который возвращает либо новый state instance, либо restored из checkpoint.

Time как first-class

Time в Flink — это не просто timestamp на элементе. Это система типов time:

Три типа времени в Flink
Event timeВремя, когда событие реально произошло. Извлекается из payload через TimestampAssigner. Watermarks основаны на event time. Используется для корректной обработки out-of-order данных.
Processing timeВремя, когда событие пришло в оператор (wall clock). Самый простой и быстрый, но не корректный для out-of-order данных. Используется когда latency важнее accuracy.
Ingestion timeВремя, когда событие вошло в pipeline (на source оператор assign timestamp). Промежуточный вариант между event и processing time. Редко используется в 2026.

Все три — first-class в API. Можно выбрать какое использовать через window assigner: TumblingEventTimeWindows, TumblingProcessingTimeWindows. Можно регистрировать timer-ы в обоих временах: ctx.timerService().registerEventTimeTimer(...) или registerProcessingTimeTimer(...).

Watermarks — сигналы внутри потока

В runtime watermark — это специальный record (org.apache.flink.streaming.api.watermark.Watermark), который течёт по pipeline наравне с данными. Каждый stream operator при получении watermark делает:

  1. Forwards watermark downstream (после обработки своих timer-ов).
  2. Если оператор stateful (например, window): fires window-ы, которые “созрели” под этим watermark.

Multiple input операторы (например, после union) emit min watermark из всех входов — это гарантирует, что downstream видит monotonically increasing watermark.

Идея, что time — это signal in the data, фундаментальна и пришла из MillWheel.

// AbstractStreamOperator, упрощённо:
public void processWatermark(Watermark mark) throws Exception {
    if (timeServiceManager != null) {
        timeServiceManager.advanceWatermark(mark);
    }
    output.emitWatermark(mark);  // forward downstream
}

Класс: org.apache.flink.streaming.api.operators.AbstractStreamOperator, метод processWatermark.

Что first-class state даёт на практике

Резюмируем, что вы получаете, выбирая систему с first-class state:

Низкая latency

Доступ к state — это локальный JNI call (RocksDB) или HashMap-lookup (heap). Микросекунды, не миллисекунды. Pipeline с 10 stateful операторами обрабатывает один event за единицы миллисекунд suma.

Большие state

Один TM может держать сотни гигабайт state (RocksDB на local SSD). С disaggregated state (модуль 10) — терабайты. Без external store.

Atomic exactly-once

Через 2PC + first-class checkpointing. Без приложенческой идемпотентности.

Простой application code

Вместо ручного управления Redis-connection + retry + caching, пишете valueState.update(newValue). Runtime всё остальное делает.

Schema evolution встроена

Через State Processor API (модуль 07) и встроенные сериализаторы (Avro, Protobuf), state может эволюционировать без потери данных при upgrade приложения.

Цена first-class state

Будем честны.

State management — это сложно

Чем больше state, тем больше операционных проблем:

  • Checkpoint duration растёт.
  • Recovery time после failure растёт.
  • Migration на новый parallelism (key-group redistribution) — нетривиальная операция.
  • Schema evolution требует осторожности.

В Flink эти проблемы решаются, но требуют понимания internals. Этому посвящён значительный кусок курса (модули 05-07).

State backend — отдельная subsystem

RocksDB — это полноценная база данных, embedded в JVM-процесс. Со своими конфигурациями, тюнингом, gotchas. Если у вас 100 GiB state на RocksDB на TM, вам нужно знать RocksDB internals (модуль 05) или job будет работать неоптимально.

HashMap vs RocksDB: выбор state backend

Disaggregated state — новая reality

Flink 2.0+ принёс disaggregated state — state на DFS вместо local disk. Это решает scalability, но добавляет новый класс проблем (cache management, async access patterns). Курс посвящает этому отдельный модуль (10).

WARNING

Не недооценивайте сложность state management. Многие production-инциденты Flink связаны со state: too big, growing unbounded, slow checkpoints, slow recovery. Time investment в понимание state — самый высокий ROI в Flink internals.

Что дальше

Следующий урок — про эволюцию Flink с 1.x до 2.2. Что было удалено (DataSet, Scala, SourceFunction) и почему. Какие фичи добавлены. Как этот пятилетний путь определил current state Flink.

После этого мы выходим из философского блока и спускаемся в реальные internals: модуль 02 — архитектура JM/TM, scheduler, граф трансформаций.

Проверка знанийKnowledge check
Сравните три подхода к stateful streaming: (a) Storm с Redis (external state), (b) Spark Structured Streaming (micro-batch с in-memory state), (c) Flink с first-class state. Для use case "real-time fraud detection: 50k events/sec, per-user state ~10 KB (history of last 100 transactions), latency budget 100ms p99, exactly-once". Какой подход и почему?
ОтветAnswer
Flink с first-class state. Анализ: (a) Storm+Redis — каждый event требует Redis get+put, типичная latency 1-2ms, для 5 stateful операторов в pipeline получится 5-10ms на Redis-calls alone. Throughput 50k events/sec означает 100-150k Redis ops/sec — нужен Redis cluster с sharding. Главное: exactly-once требует ручной идемпотентности по event-ID, что усложняет логику. (b) Spark Structured Streaming — micro-batch размер 1s = минимальная latency 1s, что > budget 100ms. Уменьшить micro-batch до 100ms — overhead на запуск каждого batch съест ресурсы. State в Spark — in-memory, но для 50k events/sec × 10 KB per user × ~1M users = 10 GiB state, нужно достаточно памяти. Exactly-once через idempotent sink. (c) Flink — state локальный (RocksDB на TM), доступ микросекунды, нет external round-trip. Per-element latency единицы ms, budget 100ms p99 с большим запасом. State 10 GiB на parallelism 16 TM = ~600 MiB на TM, легко влезает. Exactly-once через Kafka transactional sink + 2PC checkpoint — атомарно. Throughput 50k/s — easy для Flink. Это classic use case для Flink — он спроектирован под него.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 3. Что означает 'state как first-class citizen' в Flink, и почему это принципиально отличает Flink от Storm?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3