Tumbling и sliding windows
Windows — это основной механизм для агрегации по времени в Flink. Без них вы либо обрабатываете каждое событие отдельно (stateless), либо накапливаете state бесконечно. Window-операция разбивает unbounded поток на конечные “куски”, позволяя считать sum, count, average, percentiles за определённый интервал.
В этом уроке разбираем две самые распространённые window-стратегии: tumbling (без перекрытия) и sliding (с перекрытием). Освоив их, вы покроете 80% production-агрегаций.
Tumbling windows: фиксированные интервалы без перекрытия
Tumbling window разбивает время на смежные не пересекающиеся интервалы фиксированной длины. Каждое событие попадает ровно в одно окно.
DataStream<Order> orders = ...;
DataStream<Revenue> hourlyRevenue = orders
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.reduce((a, b) -> new Order(a.shopId, a.amount + b.amount, a.timestamp))
.map(o -> new Revenue(o.shopId, o.amount, ...));
В этом примере для каждого shopId мы агрегируем выручку по часовым интервалам: [00:00, 01:00), [01:00, 02:00), и так далее. Каждый заказ принадлежит ровно одному часу.
Python:
from pyflink.common import Duration
from pyflink.datastream.window import TumblingEventTimeWindows
hourly_revenue = orders \
.key_by(lambda o: o.shop_id) \
.window(TumblingEventTimeWindows.of(Duration.of_hours(1))) \
.reduce(lambda a, b: Order(a.shop_id, a.amount + b.amount, a.timestamp))
Tumbling windows выравниваются на границы эпохи: окно длиной 1 час начинается на круглый час (00:00, 01:00), окно длиной 5 минут — на :00, :05, :10 и так далее. Это поведение можно сдвинуть параметром offset:
// Окно с offset 30 минут: границы будут [00:30, 01:30), [01:30, 02:30) и т.д.
TumblingEventTimeWindows.of(Duration.ofHours(1), Duration.ofMinutes(30))
Offset полезен для работы с timezone-смещениями: если ваш бизнес-час в часовом поясе UTC+3, ставьте offset = Duration.ofHours(-3) чтобы окна выровнялись по местному времени.
Sliding windows: окна с перекрытием
Sliding window определяется двумя параметрами: длина окна size и шаг slide. Окна перекрываются: каждое следующее окно начинается через slide времени после предыдущего, а длится size.
DataStream<MetricSnapshot> rolling5MinAvg = events
.keyBy(Event::getMetricName)
.window(SlidingEventTimeWindows.of(
Duration.ofMinutes(5), // size
Duration.ofMinutes(1) // slide
))
.aggregate(new AverageAggregate());
Здесь каждая минута мы получаем средние за последние 5 минут. Каждое событие попадает в size / slide = 5 окон одновременно.
Sliding window умножает память: каждое событие хранится в (size/slide) окнах. Для 1-часового окна с 1-минутным шагом одно событие хранится в 60 окнах. Если у вас 1M событий/час, в памяти будет 60M записей в активных окнах одновременно. Используйте ReduceFunction/AggregateFunction (инкрементальная агрегация) вместо ProcessWindowFunction (которая буферизует все события до закрытия окна) — это снижает overhead до константного.
Когда tumbling, когда sliding
Tumbling подходит когда:
- Нужны дискретные периоды отчётности: “выручка за каждый час”, “MAU за каждый месяц”.
- Каждое событие должно учитываться ровно один раз.
- Downstream-системы ожидают неперекрывающихся аналитических окон (например, daily snapshots в data warehouse).
Sliding подходит когда:
- Нужен rolling-average для мониторинга: “среднее CPU за последние 5 минут, обновляемое каждую минуту”.
- Нужно сглаживание метрик с регулярными обновлениями.
- Алертинг должен срабатывать при пересечении порога в любом 5-минутном окне.
Размерности:
- size/slide = 1 — это tumbling.
- slide
< size— перекрытие, классический sliding. - slide > size — gap-окна, события между окнами теряются (редко полезно).
Count windows: альтернатива по количеству
Параллельно временным окнам существуют count-окна — фиксированные по количеству событий:
// Каждые 100 событий per ключ — tumbling
.countWindow(100)
// Каждые 50 новых событий — окно из последних 100
.countWindow(100, 50)
Count-окна не учитывают время — они закрываются только когда накопится достаточно событий. Это плохо для unbounded streams: если поток замедлился, окно никогда не закроется. На практике count windows используют редко — почти всегда лучше time-based.
Count windows не работают с watermarks. Если для ключа долго нет событий, последний накопленный count никогда не emit-ится. Если вам нужна гарантия “обработать накопленное к моменту X”, используйте event-time tumbling с triggers (см. урок про triggers).
Window assigners в Flink
Tumbling и sliding — это два класса встроенных window assigners. Полный список для DataStream API:
| Assigner | Тип |
|---|---|
TumblingEventTimeWindows | Tumbling, event time |
TumblingProcessingTimeWindows | Tumbling, processing time |
SlidingEventTimeWindows | Sliding, event time |
SlidingProcessingTimeWindows | Sliding, processing time |
EventTimeSessionWindows | Session, event time (см. урок про session windows) |
ProcessingTimeSessionWindows | Session, processing time |
GlobalWindows | Один глобальный window, нужен явный trigger |
DynamicEventTimeSessionWindows | Session с динамическим gap |
Event time vs processing time — фундаментальный выбор (см. модуль 06). Кратко: event time использует timestamp из самих событий и watermarks, processing time — системные часы TaskManager. Для production-аналитики практически всегда event time.
Полный пример: agg + ProcessWindowFunction
Часто комбинируют incremental aggregation и ProcessWindowFunction для получения и эффективной агрегации, и метаданных окна:
public class MaxAggregate implements AggregateFunction<Order, Long, Long> {
@Override public Long createAccumulator() { return Long.MIN_VALUE; }
@Override public Long add(Order o, Long max) { return Math.max(max, o.amount); }
@Override public Long getResult(Long max) { return max; }
@Override public Long merge(Long a, Long b) { return Math.max(a, b); }
}
public class MaxOrderProcessFunction
extends ProcessWindowFunction<Long, MaxOrderReport, String, TimeWindow> {
@Override
public void process(String shopId, Context ctx, Iterable<Long> maxes,
Collector<MaxOrderReport> out) {
long max = maxes.iterator().next(); // ровно одно значение от aggregate
out.collect(new MaxOrderReport(
shopId,
ctx.window().getStart(),
ctx.window().getEnd(),
max
));
}
}
DataStream<MaxOrderReport> result = orders
.keyBy(Order::getShopId)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.aggregate(new MaxAggregate(), new MaxOrderProcessFunction());
Преимущество паттерна: MaxAggregate обновляет state на каждое событие O(1), не буферизуя; MaxOrderProcessFunction получает уже агрегированное значение плюс контекст окна (start, end, window kind) для emit с метаданными.
Параметры production-job
Несколько важных настроек, которые влияют на windows:
pipeline.auto-watermark-interval (по умолчанию 200ms). Как часто оператор испускает watermarks. Слишком часто — overhead; слишком редко — задержка закрытия окон.
table.exec.emit.early-fire.enabled (для Table API). Включает early-fire — окно эмиттит промежуточный результат до закрытия.
pipeline.time-characteristic в Flink 1.12+ устарела — теперь определяется выбором assigner-а (TumblingEventTime vs TumblingProcessingTime).
Попробуй сам
-
Tumbling vs Sliding throughput. Запустите job с TumblingEventTime(5min) и SlidingEventTime(5min, 1min) на одном потоке 10K событий/сек. Замерьте память на оператор: sliding должен использовать в ~5 раз больше state.
-
Incremental vs ProcessWindowFunction. Реализуйте sum через ReduceFunction и через ProcessWindowFunction (буфер всех событий). Прогоните на 100K событий per ключ. ProcessWindowFunction должен показать значительно большее использование heap.
-
Late event behavior. В job с TumblingEventTime(1min) пошлите событие с timestamp за 10 минут до текущего watermark. Без allowed lateness оно будет dropped. Включите
.allowedLateness(Duration.ofMinutes(15))и убедитесь, что окно re-emit-ится с включением late события.