Learning Platform
Глоссарий Troubleshooting
Урок 06.01 · 18 мин
Средний
Tumbling WindowsSliding WindowsTime WindowsCount WindowsWindow Assigners

Tumbling и sliding windows

Windows — это основной механизм для агрегации по времени в Flink. Без них вы либо обрабатываете каждое событие отдельно (stateless), либо накапливаете state бесконечно. Window-операция разбивает unbounded поток на конечные “куски”, позволяя считать sum, count, average, percentiles за определённый интервал.

В этом уроке разбираем две самые распространённые window-стратегии: tumbling (без перекрытия) и sliding (с перекрытием). Освоив их, вы покроете 80% production-агрегаций.


Tumbling windows: фиксированные интервалы без перекрытия

Tumbling window разбивает время на смежные не пересекающиеся интервалы фиксированной длины. Каждое событие попадает ровно в одно окно.

DataStream<Order> orders = ...;

DataStream<Revenue> hourlyRevenue = orders
    .keyBy(Order::getShopId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .reduce((a, b) -> new Order(a.shopId, a.amount + b.amount, a.timestamp))
    .map(o -> new Revenue(o.shopId, o.amount, ...));

В этом примере для каждого shopId мы агрегируем выручку по часовым интервалам: [00:00, 01:00), [01:00, 02:00), и так далее. Каждый заказ принадлежит ровно одному часу.

Python:

from pyflink.common import Duration
from pyflink.datastream.window import TumblingEventTimeWindows

hourly_revenue = orders \
    .key_by(lambda o: o.shop_id) \
    .window(TumblingEventTimeWindows.of(Duration.of_hours(1))) \
    .reduce(lambda a, b: Order(a.shop_id, a.amount + b.amount, a.timestamp))
Tumbling window: дискретные интервалы без перекрытия
События распределены по часовым окнам. Каждое событие — ровно в одно окно.
Первое часовое окно. Содержит все события с timestamp в диапазоне [00:00:00.000, 01:00:00.000). Закрывается, когда watermark >= 01:00:00.
Второе окно — без перекрытия с первым. Window length фиксирован, нет sliding. Каждое окно агрегируется и emit-ится независимо.
Третье окно. Если watermark задержался и до сих пор не прошёл 02:00, окно пока не закрыто. Поздние события до закрытия попадают в правильное окно.

Tumbling windows выравниваются на границы эпохи: окно длиной 1 час начинается на круглый час (00:00, 01:00), окно длиной 5 минут — на :00, :05, :10 и так далее. Это поведение можно сдвинуть параметром offset:

Tumbling и sliding windows в Spark Structured Streaming
// Окно с offset 30 минут: границы будут [00:30, 01:30), [01:30, 02:30) и т.д.
TumblingEventTimeWindows.of(Duration.ofHours(1), Duration.ofMinutes(30))

Offset полезен для работы с timezone-смещениями: если ваш бизнес-час в часовом поясе UTC+3, ставьте offset = Duration.ofHours(-3) чтобы окна выровнялись по местному времени.


Sliding windows: окна с перекрытием

Sliding window определяется двумя параметрами: длина окна size и шаг slide. Окна перекрываются: каждое следующее окно начинается через slide времени после предыдущего, а длится size.

DataStream<MetricSnapshot> rolling5MinAvg = events
    .keyBy(Event::getMetricName)
    .window(SlidingEventTimeWindows.of(
        Duration.ofMinutes(5),  // size
        Duration.ofMinutes(1)   // slide
    ))
    .aggregate(new AverageAggregate());

Здесь каждая минута мы получаем средние за последние 5 минут. Каждое событие попадает в size / slide = 5 окон одновременно.

Sliding window 5min/1min: каждое событие в 5 окнах
slide=1min, size=5min. Окна стартуют каждую минуту и длятся 5 минут.
Окно, начавшееся в 00:00. Включает события с timestamp 00:00-00:04. Emit-ится при watermark >= 00:05.
Следующее окно, начавшееся через 1 минуту. Перекрывается с W1: события 00:01-00:04 попадают и сюда, и в W1.
Третье окно. e2 с timestamp 00:04 попадает и сюда — она 'видна' в трёх окнах одновременно.
WARNING

Sliding window умножает память: каждое событие хранится в (size/slide) окнах. Для 1-часового окна с 1-минутным шагом одно событие хранится в 60 окнах. Если у вас 1M событий/час, в памяти будет 60M записей в активных окнах одновременно. Используйте ReduceFunction/AggregateFunction (инкрементальная агрегация) вместо ProcessWindowFunction (которая буферизует все события до закрытия окна) — это снижает overhead до константного.


Когда tumbling, когда sliding

Tumbling подходит когда:

  • Нужны дискретные периоды отчётности: “выручка за каждый час”, “MAU за каждый месяц”.
  • Каждое событие должно учитываться ровно один раз.
  • Downstream-системы ожидают неперекрывающихся аналитических окон (например, daily snapshots в data warehouse).

Sliding подходит когда:

  • Нужен rolling-average для мониторинга: “среднее CPU за последние 5 минут, обновляемое каждую минуту”.
  • Нужно сглаживание метрик с регулярными обновлениями.
  • Алертинг должен срабатывать при пересечении порога в любом 5-минутном окне.

Размерности:

  • size/slide = 1 — это tumbling.
  • slide < size — перекрытие, классический sliding.
  • slide > size — gap-окна, события между окнами теряются (редко полезно).

Count windows: альтернатива по количеству

Параллельно временным окнам существуют count-окна — фиксированные по количеству событий:

// Каждые 100 событий per ключ — tumbling
.countWindow(100)

// Каждые 50 новых событий — окно из последних 100
.countWindow(100, 50)

Count-окна не учитывают время — они закрываются только когда накопится достаточно событий. Это плохо для unbounded streams: если поток замедлился, окно никогда не закроется. На практике count windows используют редко — почти всегда лучше time-based.

NOTE

Count windows не работают с watermarks. Если для ключа долго нет событий, последний накопленный count никогда не emit-ится. Если вам нужна гарантия “обработать накопленное к моменту X”, используйте event-time tumbling с triggers (см. урок про triggers).


Tumbling и sliding — это два класса встроенных window assigners. Полный список для DataStream API:

AssignerТип
TumblingEventTimeWindowsTumbling, event time
TumblingProcessingTimeWindowsTumbling, processing time
SlidingEventTimeWindowsSliding, event time
SlidingProcessingTimeWindowsSliding, processing time
EventTimeSessionWindowsSession, event time (см. урок про session windows)
ProcessingTimeSessionWindowsSession, processing time
GlobalWindowsОдин глобальный window, нужен явный trigger
DynamicEventTimeSessionWindowsSession с динамическим gap

Event time vs processing time — фундаментальный выбор (см. модуль 06). Кратко: event time использует timestamp из самих событий и watermarks, processing time — системные часы TaskManager. Для production-аналитики практически всегда event time.


Полный пример: agg + ProcessWindowFunction

Часто комбинируют incremental aggregation и ProcessWindowFunction для получения и эффективной агрегации, и метаданных окна:

public class MaxAggregate implements AggregateFunction<Order, Long, Long> {
    @Override public Long createAccumulator() { return Long.MIN_VALUE; }
    @Override public Long add(Order o, Long max) { return Math.max(max, o.amount); }
    @Override public Long getResult(Long max) { return max; }
    @Override public Long merge(Long a, Long b) { return Math.max(a, b); }
}

public class MaxOrderProcessFunction
        extends ProcessWindowFunction<Long, MaxOrderReport, String, TimeWindow> {
    @Override
    public void process(String shopId, Context ctx, Iterable<Long> maxes,
                        Collector<MaxOrderReport> out) {
        long max = maxes.iterator().next();  // ровно одно значение от aggregate
        out.collect(new MaxOrderReport(
            shopId,
            ctx.window().getStart(),
            ctx.window().getEnd(),
            max
        ));
    }
}

DataStream<MaxOrderReport> result = orders
    .keyBy(Order::getShopId)
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .aggregate(new MaxAggregate(), new MaxOrderProcessFunction());

Преимущество паттерна: MaxAggregate обновляет state на каждое событие O(1), не буферизуя; MaxOrderProcessFunction получает уже агрегированное значение плюс контекст окна (start, end, window kind) для emit с метаданными.


Параметры production-job

Несколько важных настроек, которые влияют на windows:

pipeline.auto-watermark-interval (по умолчанию 200ms). Как часто оператор испускает watermarks. Слишком часто — overhead; слишком редко — задержка закрытия окон.

table.exec.emit.early-fire.enabled (для Table API). Включает early-fire — окно эмиттит промежуточный результат до закрытия.

pipeline.time-characteristic в Flink 1.12+ устарела — теперь определяется выбором assigner-а (TumblingEventTime vs TumblingProcessingTime).


Попробуй сам

  1. Tumbling vs Sliding throughput. Запустите job с TumblingEventTime(5min) и SlidingEventTime(5min, 1min) на одном потоке 10K событий/сек. Замерьте память на оператор: sliding должен использовать в ~5 раз больше state.

  2. Incremental vs ProcessWindowFunction. Реализуйте sum через ReduceFunction и через ProcessWindowFunction (буфер всех событий). Прогоните на 100K событий per ключ. ProcessWindowFunction должен показать значительно большее использование heap.

  3. Late event behavior. В job с TumblingEventTime(1min) пошлите событие с timestamp за 10 минут до текущего watermark. Без allowed lateness оно будет dropped. Включите .allowedLateness(Duration.ofMinutes(15)) и убедитесь, что окно re-emit-ится с включением late события.

Проверка знанийKnowledge check
Команда хочет вычислять moving average response time для алертинга: 'если средний за последние 5 минут > 200ms, алерт'. Хотят обновление каждые 30 секунд для быстрой реакции. Какой window assigner выбрать и какие параметры?
ОтветAnswer
Это типичный кейс для sliding window. SlidingEventTimeWindows.of(Duration.ofMinutes(5), Duration.ofSeconds(30)) — size 5 минут, slide 30 секунд. Каждые 30 секунд закрывается окно, покрывающее последние 5 минут. Альтернатива через tumbling 30-секундных окон + ручная агрегация последних 10 окон в downstream-операторе сложнее и хуже соответствует семантике 'moving average'. Важно использовать AggregateFunction для инкрементальной агрегации (sum + count для среднего), не ProcessWindowFunction — иначе при slide=30s и size=5min каждое событие хранится в 10 окнах в виде сырых данных, что катастрофично для памяти. Дополнительно: рассмотрите ProcessingTimeWindows если не нужна 100% корректность по event time и latency критична — processing time не зависит от watermarks и быстрее emit-ит.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Команда выбрала SlidingEventTimeWindows.of(Duration.ofHours(1), Duration.ofMinutes(1)) для расчёта rolling-average последнего часа с обновлением каждую минуту. При throughput 10K событий/сек и 100K активных ключей оператор начинает падать с OutOfMemoryError. В чём причина и как лечить?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5