Triggers и evictors
Window assigner определяет, в какое окно попадает событие. Window function определяет, что считать. Между ними есть третий компонент — trigger, который решает когда окно эмиттит результат. И четвёртый, опциональный — evictor, который может удалять элементы из окна до или после вызова window function.
По умолчанию вы получаете “правильное” поведение без явных триггеров: event-time окно эмиттит при прохождении watermark. Но для специфических случаев (раннее emit, повторное emit при поздних событиях, периодические снапшоты) нужны кастомные триггеры.
Что делает trigger
Trigger — это state machine, которая на каждое событие, watermark или таймер принимает решение из четырёх возможностей:
CONTINUE— ничего не делаем, продолжаем буферизовать.FIRE— emit текущий результат окна, но окно остаётся в памяти (state не очищается).PURGE— очистить окно (удалить state), но не emit.FIRE_AND_PURGE— emit + очистить state.
Каждый trigger реализует интерфейс:
public abstract class Trigger<T, W extends Window> {
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx);
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx);
public abstract void clear(W window, TriggerContext ctx);
}
Триггер вызывается для каждого окна отдельно — state триггера живёт в ctx.getPartitionedState(...).
EventTimeTrigger: дефолт для event-time окон
Стандартный trigger для всех event-time window assigners. Логика:
onElement: если новое событие имеет timestamp за пределами текущего watermark — это late event, обрабатывается отдельно. Иначе регистрирует event-time timer наwindow.maxTimestamp()и возвращаетCONTINUE.onEventTime: если timer соответствуетwindow.maxTimestamp()— возвращаетFIRE_AND_PURGE. Это означает: при прохождении watermark за конец окна — эмит и очистка.
Простыми словами: “эмиттит окно, когда watermark уходит за его конец”. Это то поведение, которое 99% случаев и хочется.
events.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(5)))
// EventTimeTrigger используется по умолчанию
.reduce(...);
ProcessingTimeTrigger: дефолт для processing-time окон
Аналог EventTimeTrigger, но работает на wall-clock TaskManager-а:
onElement: регистрирует processing-time timer наwindow.maxTimestamp().onProcessingTime: при срабатывании timer —FIRE_AND_PURGE.
Используется автоматически с TumblingProcessingTimeWindows, SlidingProcessingTimeWindows. Эмит происходит когда системное время TaskManager-а доходит до конца окна — без зависимости от watermarks.
ProcessingTimeTrigger даёт самый быстрый emit (нет ожидания watermarks), но результат может быть некорректным для late или out-of-order событий. Используйте только когда latency критичнее точности — например, для оперативных дашбордов с обновлением раз в минуту, где небольшая погрешность приемлема.
CountTrigger: эмит каждые N событий
Эмиттит окно, когда количество элементов достигает порога:
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
events.keyBy(Event::getKey)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1000)) // FIRE каждые 1000 событий
.reduce(...);
Полезно с GlobalWindows, которые сами по себе не закрываются. CountTrigger превращает globalwindow в окно “каждые N событий”.
Можно комбинировать с другими triggers через PurgingTrigger:
.trigger(PurgingTrigger.of(CountTrigger.of(1000)))
PurgingTrigger превращает любой FIRE в FIRE_AND_PURGE — окно очищается после emit. Без этого CountTrigger будет эмитить накопительный результат каждые N событий без очистки.
Custom trigger: пример
Сценарий: tumbling-окно 1 час, но эмиттит промежуточный результат каждые 10 минут (для дашборда), и финальный — при закрытии окна.
public class EarlyFireTrigger extends Trigger<Object, TimeWindow> {
private final long interval = Duration.ofMinutes(10).toMillis();
private final ValueStateDescriptor<Long> lastFiredDescriptor =
new ValueStateDescriptor<>("last-fired", Long.class);
@Override
public TriggerResult onElement(Object element, long timestamp,
TimeWindow window, TriggerContext ctx) throws Exception {
ctx.registerEventTimeTimer(window.maxTimestamp());
ValueState<Long> lastFired = ctx.getPartitionedState(lastFiredDescriptor);
Long last = lastFired.value();
long now = ctx.getCurrentWatermark();
if (last == null) last = window.getStart();
if (now - last >= interval) {
lastFired.update(now);
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
if (time == window.maxTimestamp()) return TriggerResult.FIRE_AND_PURGE;
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
ctx.getPartitionedState(lastFiredDescriptor).clear();
}
}
Применение:
events.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
.trigger(new EarlyFireTrigger())
.reduce(...);
Здесь окно будет эмитить каждые 10 минут partial result (без очистки state) и финальный — на закрытии. Если ниже стоит ProcessWindowFunction, при каждом FIRE она получит все события окна на этот момент.
При раннем FIRE без PURGE накапливается state до закрытия окна. Это окей для маленьких окон, но критично для длинных и burst-трафика. Тщательно продумывайте баланс между partial result frequency и memory cost.
Trigger и watermarks
Trigger полагается на watermark для event-time решений. Если watermarks застряли (например, idle partition), event-time triggers не срабатывают:
- EventTimeTrigger никогда не FIRE — окно не закроется.
- Custom triggers, ожидающие onEventTime, тоже зависнут.
Это очень частая проблема в production. Симптомы:
- Все окна копят данные, но не emit-ят.
- В Web UI watermark показан как
Long.MIN_VALUEили давно отстающим. - Backpressure не нарастает, но output отсутствует.
Решение — withIdleness на WatermarkStrategy:
WatermarkStrategy<Event>
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withIdleness(Duration.ofMinutes(1));
Это позволит партиции, не получающей событий, помечать себя idle, что разморозит watermark на уровне всего job-а. См. модуль 06 (event time и watermarks) для деталей.
Idle partitions в Kafka: схожая проблема застрявшего прогрессаEvictors: удаление элементов из окна
Evictor — оператор, который может удалить элементы из окна перед (evictBefore) или после (evictAfter) вызова window function.
Встроенные evictors:
CountEvictor.of(N)— оставить только последние N элементов в окне.TimeEvictor.of(Duration.ofMinutes(5))— удалить элементы старше 5 минут от max timestamp в окне.DeltaEvictor— удалить элементы по custom предикату.
events.keyBy(Event::getKey)
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(10)))
.evictor(CountEvictor.of(100)) // оставить только последние 100
.process(new TopKFunction());
Evictor требует, чтобы все события окна были в памяти — он несовместим с инкрементальной агрегацией. Использование .evictor() автоматически отключает оптимизацию ReduceFunction/AggregateFunction; Flink буферизует все события. Поэтому evictor нужен очень редко и только когда логика требует видеть/фильтровать сырые события.
Триггеры и поздние события
При allowedLateness > 0, окно не сразу очищается при FIRE_AND_PURGE — оно живёт в state дополнительное время для обработки late events. Trigger продолжает получать onElement для late events и может решить эмитить дополнительные результаты.
Стандартное поведение EventTimeTrigger при late events:
@Override
public TriggerResult onElement(...) {
if (timestamp < window.maxTimestamp()) {
// событие нормальное или late внутри allowed lateness
return TriggerResult.FIRE; // re-emit окна с обновлёнными данными
}
return TriggerResult.CONTINUE;
}
То есть: late event может вызвать ре-emit окна с включением этого события. Downstream должен быть готов к “обновлённым” значениям окна — типично через upsert-семантику. См. урок 05 (allowed lateness).
Practical patterns с триггерами
Pattern 1: progressive emit для real-time monitoring
Хочется видеть aggregated value не раз в час, а каждые 30 секунд:
// Custom trigger с periodic FIRE без PURGE
.trigger(ContinuousEventTimeTrigger.of(Duration.ofSeconds(30)))
Output: каждые 30 секунд emit с partial-aggregated данными. Финальный emit на закрытии окна.
Pattern 2: count-based с time fallback
Эмит каждые 1000 событий ИЛИ каждые 10 минут — что наступит раньше:
// Композиция Or-trigger: any of CountTrigger or ContinuousEventTime
// (вам придётся написать собственный composite)
Pattern 3: trigger по бизнес-событию
Окно собирает события сессии; эмит при появлении ‘checkout’ event:
@Override
public TriggerResult onElement(Event e, ...) {
if ("checkout".equals(e.type)) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
Это альтернатива sessionization через KeyedProcessFunction — если business logic простая, кастомный trigger в стандартном окне может быть проще.
Что не делать
- Не используйте PurgingTrigger без понимания. Он добавляет PURGE ко всем FIRE — если ваша downstream-логика ожидает накопительный результат, вы её сломаете.
- Не комбинируйте evictor с большими окнами. Это уничтожает инкрементальную оптимизацию. Если хотите filter — делайте map перед window.
- Не пишите custom trigger ‘просто потому что’. EventTimeTrigger покрывает большинство случаев. Custom triggers — это серьёзный код, который надо тестировать на корректность.
Попробуй сам
-
Early-fire effect on memory. Запустите два job-а: один с EventTimeTrigger по умолчанию, другой с ContinuousEventTimeTrigger каждые 10 секунд. На потоке 100K событий/сек в часовом окне сравните размер heap. Continuous trigger без PURGE должен буферизовать всё до часа — много больше памяти.
-
CountEvictor effect on aggregation. Реализуйте average на ReduceFunction в окне 5 минут. Прогоните 10K событий и снимите heap snapshot. Добавьте .evictor(CountEvictor.of(100)) и повторите — heap должен резко вырасти из-за отключения инкрементальной оптимизации.
-
Custom trigger на checkout-event. Реализуйте session window + custom trigger, который FIRE_AND_PURGE при появлении event.type=‘checkout’. Сравните с реализацией через KeyedProcessFunction (см. предыдущий модуль).