Window functions: Reduce, Aggregate, Process
После выбора window assigner (tumbling, sliding, session) приходит главное решение: что делать с событиями внутри окна. Flink предоставляет три API: ReduceFunction, AggregateFunction, ProcessWindowFunction. Они отличаются по гибкости, производительности и tradeoffs между ними.
Выбор не косметический: неверная функция превращает window-операцию из эффективной в кошмар по памяти и latency.
ReduceFunction: симметричная парная агрегация
Простейший вариант. Принимает два элемента того же типа, что и поток, и возвращает один объединённый.
DataStream<Order> orders = ...;
DataStream<Order> sumByShop = orders
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.reduce((a, b) -> new Order(a.shopId, a.amount + b.amount, a.timestamp));
Ограничения:
- Тип input = тип output. Если входной поток
Order, и вы хотите получитьRevenue—reduceне подходит, нуженaggregate. - Симметричная функция.
reduce(a, b)должна давать тот же результат при любом порядке аргументов (коммутативность) и любой группировке (ассоциативность).
Python:
sum_by_shop = orders \
.key_by(lambda o: o.shop_id) \
.window(TumblingEventTimeWindows.of(Duration.of_hours(1))) \
.reduce(lambda a, b: Order(a.shop_id, a.amount + b.amount, a.timestamp))
Когда выбирать: простая sum/min/max/concat без преобразования типов.
Aggregations в Spark DataFrame API: batch-семантика vs incrementalAggregateFunction: разные типы input/accumulator/output
Гибкая версия reduce. Имеет три типа:
IN— тип входных элементов.ACC— тип аккумулятора (промежуточного состояния).OUT— тип финального результата.
public class AverageAggregate
implements AggregateFunction<Order, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return Tuple2.of(0L, 0L); // (sum, count)
}
@Override
public Tuple2<Long, Long> add(Order o, Tuple2<Long, Long> acc) {
return Tuple2.of(acc.f0 + o.amount, acc.f1 + 1);
}
@Override
public Double getResult(Tuple2<Long, Long> acc) {
return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
}
@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream<Double> avgAmount = orders
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.aggregate(new AverageAggregate());
Каждый метод:
createAccumulator()— стартовое состояние (вызывается один раз для пустого окна).add(event, acc)— обновить аккумулятор новым событием. Вызывается на каждое событие в окне.getResult(acc)— финальный результат при закрытии окна.merge(a, b)— объединить два аккумулятора. Критично для session windows и mergeable окон.
AggregateFunction — рекомендуемый default для большинства production-агрегаций. Она инкрементальна (стоимость add — O(1)), типобезопасна (разный input и output), и поддерживает merge для session windows.
ProcessWindowFunction: полный доступ к окну
В отличие от Reduce/Aggregate, эта функция получает весь Iterable событий окна и контекст:
public class TopProductInWindow
extends ProcessWindowFunction<Order, Report, String, TimeWindow> {
@Override
public void process(String shopId, Context ctx,
Iterable<Order> orders, Collector<Report> out) {
Map<String, Long> productCounts = new HashMap<>();
for (Order o : orders) {
productCounts.merge(o.productId, 1L, Long::sum);
}
String top = productCounts.entrySet().stream()
.max(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.orElse(null);
out.collect(new Report(
shopId,
ctx.window().getStart(),
ctx.window().getEnd(),
top
));
}
}
DataStream<Report> reports = orders
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.process(new TopProductInWindow());
Достоинства:
- Полный доступ ко всем событиям — можно сортировать, выбирать top-N, делать что угодно.
- Контекст (
ctx.window()) даёт start/end окна, тип окна, доступ к window-state и global-state. - Доступ к side-output через
ctx.output(...).
Главный недостаток — буферизация. Все события окна хранятся в state до закрытия окна. Для окна 1 час с 1M событий в час, на каждый ключ — 1M объектов в памяти. Для миллионов ключей — катастрофа.
ProcessWindowFunction буферизует все события окна. Это годится для маленьких окон (десятки-сотни событий) или когда логика реально требует видеть всю последовательность. Для больших окон с инкрементальной операцией (sum, count, average) ProcessWindowFunction — это ловушка для нового пользователя Flink. Используйте AggregateFunction либо комбинацию.
Сравнение: память, CPU, гибкость
Гибрид: AggregateFunction + ProcessWindowFunction
Самый мощный паттерн — комбинировать инкрементальную агрегацию с финальной обработкой:
DataStream<Report> reports = orders
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.aggregate(
new AverageAggregate(), // инкрементальная: O(1)
new ReportProcessFunction() // финальная обработка + контекст
);
public class ReportProcessFunction
extends ProcessWindowFunction<Double, Report, String, TimeWindow> {
@Override
public void process(String shopId, Context ctx,
Iterable<Double> avgs, Collector<Report> out) {
Double avg = avgs.iterator().next();
out.collect(new Report(
shopId,
ctx.window().getStart(),
ctx.window().getEnd(),
avg
));
}
}
Что происходит:
- На каждое событие в окне
AverageAggregate.add()обновляет accumulator — O(1) state. - При закрытии окна Flink вычисляет финальный результат через
getResult()— это одно число. ReportProcessFunction.process()получает Iterable с одним элементом (результатом aggregate) и доступ к контексту окна (start, end, etc.).- Финальный output —
Reportс метаданными окна.
Этот паттерн объединяет O(1) memory с полным доступом к window-метаданным. Используйте его как default для production, если нужна агрегация + start/end окна.
Window state в ProcessWindowFunction
В ProcessWindowFunction доступно специальное окно-локальное state через ctx.windowState():
public class StatefulProcessFunction
extends ProcessWindowFunction<Order, Output, String, TimeWindow> {
private ValueStateDescriptor<Integer> sentDescriptor;
@Override
public void open(OpenContext ctx) {
sentDescriptor = new ValueStateDescriptor<>("alerts-sent", Integer.class);
}
@Override
public void process(String key, Context ctx,
Iterable<Order> orders, Collector<Output> out) throws Exception {
ValueState<Integer> sentCount = ctx.windowState().getState(sentDescriptor);
Integer prev = sentCount.value();
// ... логика, использующая state в пределах окна
}
}
windowState() живёт ровно сколько окно — при закрытии окна state очищается. Это отличает его от getRuntimeContext().getState(), который живёт по ключу безотносительно к окнам.
ctx.globalState() — это keyed state на весь ключ, переживает закрытия окон. Полезен, например, для хранения “когда последний раз emit-или alert” между окнами.
Когда что выбрать: practical decision tree
Вопрос: что вы вычисляете в окне?
|-- Sum/Min/Max/Concat того же типа что входной поток?
| -> ReduceFunction
|
|-- Sum/Count/Average/etc с инкрементальной агрегацией, разный output?
| -> AggregateFunction
|
|-- Нужны window start/end в output?
| -> AggregateFunction + ProcessWindowFunction
|
|-- Top-K, перцентили, sort-based, что-то требующее всех событий?
| -> ProcessWindowFunction (с осторожностью к размеру окна)
|
|-- Поведение, зависящее от текущей сессии и истории других окон?
| -> ProcessWindowFunction + ctx.globalState()
|
|-- Это вообще не window-задача (нужны timers, side outputs, FSM)?
| -> KeyedProcessFunction (без windows)
Anti-pattern: ProcessWindowFunction для агрегации больших окон
// ПЛОХО
events.keyBy(Event::getUserId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(24)))
.process(new ProcessWindowFunction<Event, Stats, String, TimeWindow>() {
@Override
public void process(String key, Context ctx, Iterable<Event> events, Collector<Stats> out) {
long sum = 0, count = 0;
for (Event e : events) {
sum += e.value;
count++;
}
out.collect(new Stats(key, sum, (double) sum / count));
}
});
При 1M событий per user в 24-часовом окне это означает 1M объектов в памяти на пользователя. Для 100K активных пользователей — 100 миллиардов объектов одновременно в state. OOM гарантирован.
Лечение: заменить на AggregateFunction + ProcessWindowFunction гибрид. Те же 1M событий обновят accumulator O(1) и в state будут только sum и count.
Производительность: сериализация и type info
Все window functions требуют корректной type information для accumulator:
// Java умеет выводить тип Tuple2<Long, Long> автоматически
.aggregate(new AverageAggregate())
// Если AggregateFunction generic — иногда нужны явные подсказки
.aggregate(new AggregateFunction<Order, ?, ?>() { ... },
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
TypeInformation.of(Double.class))
В Python (PyFlink) типы указываются явно:
.aggregate(
AverageAggregate(),
accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]),
output_type=Types.DOUBLE()
)
Неправильный type info -> Kryo fallback -> медленная сериализация в RocksDB. На production-job это может стоить десятки процентов throughput.
Попробуй сам
-
Memory profiling Reduce vs Process. Один и тот же sum, реализованный через ReduceFunction и через ProcessWindowFunction. Прогоните 10M событий per ключ в окне. Снимите heap-снапшот в момент закрытия окна. ReduceFunction должна показать ~10 KB на ключ, ProcessWindowFunction — десятки мегабайт.
-
Hybrid pattern. Реализуйте топ-10 продуктов в часовом окне двумя способами: чистый ProcessWindowFunction (буферизация всех событий) и AggregateFunction + ProcessWindowFunction (incremental top-K через TreeMap). Сравните memory и CPU.
-
Merge stress test. Создайте session window с AggregateFunction. Сгенерируйте поток, где events приходят в обратном порядке времени — это вызовет много merge. Убедитесь, что результаты корректны и метод merge() покрывает все кейсы.