Learning Platform
Глоссарий Troubleshooting
Урок 06.03 · 22 мин
Средний
ReduceFunctionAggregateFunctionProcessWindowFunctionWindow StateIncremental Aggregation

Window functions: Reduce, Aggregate, Process

После выбора window assigner (tumbling, sliding, session) приходит главное решение: что делать с событиями внутри окна. Flink предоставляет три API: ReduceFunction, AggregateFunction, ProcessWindowFunction. Они отличаются по гибкости, производительности и tradeoffs между ними.

Выбор не косметический: неверная функция превращает window-операцию из эффективной в кошмар по памяти и latency.


ReduceFunction: симметричная парная агрегация

Простейший вариант. Принимает два элемента того же типа, что и поток, и возвращает один объединённый.

DataStream<Order> orders = ...;

DataStream<Order> sumByShop = orders
    .keyBy(Order::getShopId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .reduce((a, b) -> new Order(a.shopId, a.amount + b.amount, a.timestamp));

Ограничения:

  • Тип input = тип output. Если входной поток Order, и вы хотите получить Revenuereduce не подходит, нужен aggregate.
  • Симметричная функция. reduce(a, b) должна давать тот же результат при любом порядке аргументов (коммутативность) и любой группировке (ассоциативность).

Python:

sum_by_shop = orders \
    .key_by(lambda o: o.shop_id) \
    .window(TumblingEventTimeWindows.of(Duration.of_hours(1))) \
    .reduce(lambda a, b: Order(a.shop_id, a.amount + b.amount, a.timestamp))

Когда выбирать: простая sum/min/max/concat без преобразования типов.

Aggregations в Spark DataFrame API: batch-семантика vs incremental

AggregateFunction: разные типы input/accumulator/output

Гибкая версия reduce. Имеет три типа:

  • IN — тип входных элементов.
  • ACC — тип аккумулятора (промежуточного состояния).
  • OUT — тип финального результата.
public class AverageAggregate
        implements AggregateFunction<Order, Tuple2<Long, Long>, Double> {
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);  // (sum, count)
    }

    @Override
    public Tuple2<Long, Long> add(Order o, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + o.amount, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}

DataStream<Double> avgAmount = orders
    .keyBy(Order::getShopId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .aggregate(new AverageAggregate());

Каждый метод:

  • createAccumulator() — стартовое состояние (вызывается один раз для пустого окна).
  • add(event, acc) — обновить аккумулятор новым событием. Вызывается на каждое событие в окне.
  • getResult(acc) — финальный результат при закрытии окна.
  • merge(a, b) — объединить два аккумулятора. Критично для session windows и mergeable окон.
TIP

AggregateFunction — рекомендуемый default для большинства production-агрегаций. Она инкрементальна (стоимость add — O(1)), типобезопасна (разный input и output), и поддерживает merge для session windows.


ProcessWindowFunction: полный доступ к окну

В отличие от Reduce/Aggregate, эта функция получает весь Iterable событий окна и контекст:

public class TopProductInWindow
        extends ProcessWindowFunction<Order, Report, String, TimeWindow> {
    @Override
    public void process(String shopId, Context ctx,
                        Iterable<Order> orders, Collector<Report> out) {
        Map<String, Long> productCounts = new HashMap<>();
        for (Order o : orders) {
            productCounts.merge(o.productId, 1L, Long::sum);
        }
        String top = productCounts.entrySet().stream()
            .max(Map.Entry.comparingByValue())
            .map(Map.Entry::getKey)
            .orElse(null);
        out.collect(new Report(
            shopId,
            ctx.window().getStart(),
            ctx.window().getEnd(),
            top
        ));
    }
}

DataStream<Report> reports = orders
    .keyBy(Order::getShopId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .process(new TopProductInWindow());

Достоинства:

  • Полный доступ ко всем событиям — можно сортировать, выбирать top-N, делать что угодно.
  • Контекст (ctx.window()) даёт start/end окна, тип окна, доступ к window-state и global-state.
  • Доступ к side-output через ctx.output(...).

Главный недостаток — буферизация. Все события окна хранятся в state до закрытия окна. Для окна 1 час с 1M событий в час, на каждый ключ — 1M объектов в памяти. Для миллионов ключей — катастрофа.

WARNING

ProcessWindowFunction буферизует все события окна. Это годится для маленьких окон (десятки-сотни событий) или когда логика реально требует видеть всю последовательность. Для больших окон с инкрементальной операцией (sum, count, average) ProcessWindowFunction — это ловушка для нового пользователя Flink. Используйте AggregateFunction либо комбинацию.


Сравнение: память, CPU, гибкость

Window functions: tradeoffs
Инкрементальная: O(1) state на окно. Только same-type агрегация. Простейший случай sum/max/min. Подходит для коммутативных и ассоциативных операций.
Инкрементальная: O(1) на окно, разные типы IN/ACC/OUT. Рекомендуемый default для большинства production случаев. Поддерживает merge.
Буферизация: O(N) state, где N — количество событий в окне. Полный доступ ко всем событиям и контексту. Использовать ТОЛЬКО если нужна вся последовательность.
Гибридный паттерн: AggregateFunction делает O(1) инкрементальную работу, ProcessWindowFunction получает уже агрегированное значение + контекст окна. Best of both worlds.

Гибрид: AggregateFunction + ProcessWindowFunction

Самый мощный паттерн — комбинировать инкрементальную агрегацию с финальной обработкой:

DataStream<Report> reports = orders
    .keyBy(Order::getShopId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .aggregate(
        new AverageAggregate(),       // инкрементальная: O(1)
        new ReportProcessFunction()    // финальная обработка + контекст
    );

public class ReportProcessFunction
        extends ProcessWindowFunction<Double, Report, String, TimeWindow> {
    @Override
    public void process(String shopId, Context ctx,
                        Iterable<Double> avgs, Collector<Report> out) {
        Double avg = avgs.iterator().next();
        out.collect(new Report(
            shopId,
            ctx.window().getStart(),
            ctx.window().getEnd(),
            avg
        ));
    }
}

Что происходит:

  1. На каждое событие в окне AverageAggregate.add() обновляет accumulator — O(1) state.
  2. При закрытии окна Flink вычисляет финальный результат через getResult() — это одно число.
  3. ReportProcessFunction.process() получает Iterable с одним элементом (результатом aggregate) и доступ к контексту окна (start, end, etc.).
  4. Финальный output — Report с метаданными окна.

Этот паттерн объединяет O(1) memory с полным доступом к window-метаданным. Используйте его как default для production, если нужна агрегация + start/end окна.


Window state в ProcessWindowFunction

В ProcessWindowFunction доступно специальное окно-локальное state через ctx.windowState():

public class StatefulProcessFunction
        extends ProcessWindowFunction<Order, Output, String, TimeWindow> {
    private ValueStateDescriptor<Integer> sentDescriptor;

    @Override
    public void open(OpenContext ctx) {
        sentDescriptor = new ValueStateDescriptor<>("alerts-sent", Integer.class);
    }

    @Override
    public void process(String key, Context ctx,
                        Iterable<Order> orders, Collector<Output> out) throws Exception {
        ValueState<Integer> sentCount = ctx.windowState().getState(sentDescriptor);
        Integer prev = sentCount.value();
        // ... логика, использующая state в пределах окна
    }
}

windowState() живёт ровно сколько окно — при закрытии окна state очищается. Это отличает его от getRuntimeContext().getState(), который живёт по ключу безотносительно к окнам.

ctx.globalState() — это keyed state на весь ключ, переживает закрытия окон. Полезен, например, для хранения “когда последний раз emit-или alert” между окнами.


Когда что выбрать: practical decision tree

Вопрос: что вы вычисляете в окне?

|-- Sum/Min/Max/Concat того же типа что входной поток?
|     -> ReduceFunction
|
|-- Sum/Count/Average/etc с инкрементальной агрегацией, разный output?
|     -> AggregateFunction
|
|-- Нужны window start/end в output?
|     -> AggregateFunction + ProcessWindowFunction
|
|-- Top-K, перцентили, sort-based, что-то требующее всех событий?
|     -> ProcessWindowFunction (с осторожностью к размеру окна)
|
|-- Поведение, зависящее от текущей сессии и истории других окон?
|     -> ProcessWindowFunction + ctx.globalState()
|
|-- Это вообще не window-задача (нужны timers, side outputs, FSM)?
|     -> KeyedProcessFunction (без windows)

Anti-pattern: ProcessWindowFunction для агрегации больших окон

// ПЛОХО
events.keyBy(Event::getUserId)
      .window(TumblingEventTimeWindows.of(Duration.ofHours(24)))
      .process(new ProcessWindowFunction<Event, Stats, String, TimeWindow>() {
          @Override
          public void process(String key, Context ctx, Iterable<Event> events, Collector<Stats> out) {
              long sum = 0, count = 0;
              for (Event e : events) {
                  sum += e.value;
                  count++;
              }
              out.collect(new Stats(key, sum, (double) sum / count));
          }
      });

При 1M событий per user в 24-часовом окне это означает 1M объектов в памяти на пользователя. Для 100K активных пользователей — 100 миллиардов объектов одновременно в state. OOM гарантирован.

Лечение: заменить на AggregateFunction + ProcessWindowFunction гибрид. Те же 1M событий обновят accumulator O(1) и в state будут только sum и count.


Производительность: сериализация и type info

Все window functions требуют корректной type information для accumulator:

// Java умеет выводить тип Tuple2<Long, Long> автоматически
.aggregate(new AverageAggregate())

// Если AggregateFunction generic — иногда нужны явные подсказки
.aggregate(new AggregateFunction<Order, ?, ?>() { ... },
           TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
           TypeInformation.of(Double.class))

В Python (PyFlink) типы указываются явно:

.aggregate(
    AverageAggregate(),
    accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
    output_type=Types.DOUBLE()
)

Неправильный type info -> Kryo fallback -> медленная сериализация в RocksDB. На production-job это может стоить десятки процентов throughput.


Попробуй сам

  1. Memory profiling Reduce vs Process. Один и тот же sum, реализованный через ReduceFunction и через ProcessWindowFunction. Прогоните 10M событий per ключ в окне. Снимите heap-снапшот в момент закрытия окна. ReduceFunction должна показать ~10 KB на ключ, ProcessWindowFunction — десятки мегабайт.

  2. Hybrid pattern. Реализуйте топ-10 продуктов в часовом окне двумя способами: чистый ProcessWindowFunction (буферизация всех событий) и AggregateFunction + ProcessWindowFunction (incremental top-K через TreeMap). Сравните memory и CPU.

  3. Merge stress test. Создайте session window с AggregateFunction. Сгенерируйте поток, где events приходят в обратном порядке времени — это вызовет много merge. Убедитесь, что результаты корректны и метод merge() покрывает все кейсы.

Проверка знанийKnowledge check
Вы пишете 24-часовое tumbling окно для сбора метрик per device_id. В каждом окне нужно вычислить count, sum, average, и для отчёта — start/end окна и количество уникальных IP. Какая комбинация window functions оптимальна?
ОтветAnswer
Лучший вариант — AggregateFunction + ProcessWindowFunction. AggregateFunction делает инкрементальную часть: accumulator хранит count, sum, и для уникальных IP — HashSet<String> (или для очень больших cardinality — HyperLogLog approximation). На каждое событие add() обновляет состояние O(1). При закрытии окна ProcessWindowFunction получает один Iterable с финальным accumulator и вычисляет average = sum/count, плюс emit-ит start/end из ctx.window(). Чистый ProcessWindowFunction плох — потребуется буферизовать все события 24-часового окна (потенциально миллионы на устройство), что взорвёт память. Чистый AggregateFunction плох — не даёт доступа к start/end окна для отчёта. ReduceFunction не подходит — output (Report) отличается от input (Event). Дополнительно: для уникальных IP при больших cardinality используйте HyperLogLog в accumulator вместо HashSet — это даёт фиксированный размер state независимо от количества уникальных значений.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какое ключевое ограничение ReduceFunction по сравнению с AggregateFunction?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5