Learning Platform
Глоссарий Troubleshooting
Урок 08.01 · 22 мин
Средний
ProcessFunctionKeyedProcessFunctionprocessElementContextLow-Level API

ProcessFunction: основы

ProcessFunction — это низкоуровневое API Flink для работы с потоками. Когда вы упираетесь в ограничения высокоуровневых операций (windows, joins, reduce/aggregate), ProcessFunction даёт полный контроль: доступ к event time и watermarks, регистрация timers, работа со state, side outputs, fine-grained управление emit.

В этом уроке — базы: KeyedProcessFunction, метод processElement, контекст и его возможности. Timers и side outputs — в следующих уроках.


Когда нужна ProcessFunction

ProcessFunction оправдана, когда:

  • Нужна логика, не вписывающаяся в windows. Например, многоступенчатая FSM, sessionization с custom rules, transformations зависящие от истории.
  • Нужны timers для отложенных действий (timeout pending orders, watchdog для unbounded state).
  • Нужны side outputs для maintenance вроде DLQ или alerts вне основного потока.
  • Custom watermark handling — реакция на onTimer для work поверх watermarks.

Высокоуровневое API (window, reduce, aggregate) проще и понятнее — используйте его, когда оно покрывает требования. ProcessFunction — это инструмент когда нужно больше контроля.

Внутренняя архитектура state backends: как хранятся keyed state данные

KeyedProcessFunction: типичный сигнатура

Большинство production использует KeyedProcessFunction<K, IN, OUT> — применяется к keyed stream, имеет доступ к keyed state.

public class FraudDetector extends KeyedProcessFunction<String, Transaction, Alert> {

    private transient ValueState<Double> dailySpending;

    @Override
    public void open(OpenContext openContext) {
        dailySpending = getRuntimeContext().getState(
            new ValueStateDescriptor<>("daily-spending", Double.class));
    }

    @Override
    public void processElement(Transaction tx,
                                Context ctx,
                                Collector<Alert> out) throws Exception {
        Double current = dailySpending.value();
        if (current == null) current = 0.0;
        current += tx.amount;
        dailySpending.update(current);

        if (current > 10000) {
            out.collect(new Alert(ctx.getCurrentKey(), "DAILY_LIMIT_EXCEEDED", current));
        }
    }
}

transactions
    .keyBy(Transaction::getAccountId)
    .process(new FraudDetector());

Типовые параметры:

  • K — тип ключа (String для account_id).
  • IN — тип входных событий (Transaction).
  • OUT — тип выходных (Alert).

Методы:

  • processElement(IN value, Context ctx, Collector<OUT> out) — обрабатывает каждое событие.
  • onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) — обрабатывает timers (см. следующий урок).
  • open(OpenContext) — инициализация state и других ресурсов.
  • close() — cleanup.

Python:

from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types

class FraudDetector(KeyedProcessFunction):
    def __init__(self):
        self.daily_spending = None

    def open(self, runtime_context):
        descriptor = ValueStateDescriptor("daily-spending", Types.DOUBLE())
        self.daily_spending = runtime_context.get_state(descriptor)

    def process_element(self, tx, ctx):
        current = self.daily_spending.value() or 0.0
        current += tx.amount
        self.daily_spending.update(current)
        if current > 10000:
            yield Alert(ctx.get_current_key(), "DAILY_LIMIT_EXCEEDED", current)

transactions \
    .key_by(lambda t: t.account_id) \
    .process(FraudDetector())

Context: API для работы с runtime

Context (для processElement) и OnTimerContext (для onTimer) — это API, через которые ProcessFunction взаимодействует с runtime Flink:

@Override
public void processElement(Transaction tx, Context ctx, Collector<Alert> out) throws Exception {
    // Текущий ключ
    String accountId = ctx.getCurrentKey();

    // Timestamp текущего события (может быть null если нет timestamp assigner)
    Long eventTimestamp = ctx.timestamp();

    // Текущий watermark
    long currentWatermark = ctx.timerService().currentWatermark();

    // Текущий processing time
    long currentProcessingTime = ctx.timerService().currentProcessingTime();

    // Регистрация timers (детально в след. уроке)
    ctx.timerService().registerEventTimeTimer(eventTimestamp + 60000);
    ctx.timerService().registerProcessingTimeTimer(currentProcessingTime + 10000);

    // Side output (детально в уроке про side outputs)
    OutputTag<String> debugTag = new OutputTag<>("debug"){};
    ctx.output(debugTag, "processed " + tx.id);

    // Emit в основной поток
    out.collect(new Alert(...));
}

Ключевые возможности:

  • getCurrentKey() — узнать ключ, для которого вызывается метод (полезно для логирования и emit).
  • timestamp() — event timestamp текущего события (null если timestamp assigner не настроен).
  • timerService() — доступ к watermarks, processing time, регистрации timers.
  • output(tag, value) — отправка в side output.

ProcessFunction vs KeyedProcessFunction

Помимо KeyedProcessFunction существует базовая ProcessFunction<IN, OUT> — для не-keyed streams. Различия:

АспектProcessFunctionKeyedProcessFunction
Применениеstream.process(...)keyedStream.process(...)
Доступ к keyed stateНЕТДА
TimersPer-operator (один на весь оператор)Per-key (отдельный для каждого ключа)
getCurrentKey()НЕТДА

В подавляющем большинстве случаев нужен KeyedProcessFunction — keyed state и per-key timers критичны для production-логики. Базовый ProcessFunction используется редко, обычно для broadcast-state pattern или для логики, реально не зависящей от ключа.


Доступ к state в processElement

State работает так же, как в RichFunction (см. модуль 04):

public class StatefulProcessor extends KeyedProcessFunction<String, Event, Result> {

    private transient ValueState<Long> lastSeenTs;
    private transient MapState<String, Long> sessionCounters;
    private transient ListState<String> recentEventIds;

    @Override
    public void open(OpenContext ctx) {
        lastSeenTs = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-seen", Long.class));
        sessionCounters = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("sessions", String.class, Long.class));
        recentEventIds = getRuntimeContext().getListState(
            new ListStateDescriptor<>("recent-ids", String.class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
        Long last = lastSeenTs.value();
        if (last == null || e.timestamp - last > 60000) {
            // ... логика new session
        }
        lastSeenTs.update(e.timestamp);
    }
}

State в ProcessFunction подчиняется тем же правилам:

  • Декларируется в open() через дескриптор.
  • Поле transient.
  • Состояние per-key (изолированное между ключами).
  • TTL через StateTtlConfig (см. модуль 04).

Пример: оповещение при первом событии за день

Реальный сценарий: оповещение admin-у, когда новый пользователь делает первое действие за день.

public class FirstSeenDailyDetector
        extends KeyedProcessFunction<String, UserAction, Notification> {

    private transient ValueState<Long> lastSeenDay;

    @Override
    public void open(OpenContext ctx) {
        lastSeenDay = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-day", Long.class));
    }

    @Override
    public void processElement(UserAction action, Context ctx,
                                Collector<Notification> out) throws Exception {
        long currentDay = action.timestamp / (86400 * 1000L);
        Long lastDay = lastSeenDay.value();

        if (lastDay == null || currentDay > lastDay) {
            // Первое событие пользователя за этот день
            out.collect(new Notification(
                ctx.getCurrentKey(),
                "First action of " + currentDay,
                action.timestamp
            ));
            lastSeenDay.update(currentDay);
        }
    }
}

userActions
    .keyBy(UserAction::getUserId)
    .process(new FirstSeenDailyDetector());

Эту задачу было бы сложно решить через windows: нужна не сумма за окно, а изменение состояния от события к событию. ProcessFunction — естественный выбор.


getRuntimeContext() vs Context

Часто путают:

  • getRuntimeContext() (наследуется от RichFunction) — даёт доступ к state, метрикам, distributed cache. Вызывается в open() обычно.
  • Context / OnTimerContext (параметр processElement / onTimer) — даёт доступ к timestamps, watermarks, timers, side outputs, current key. Вызывается внутри метода.
@Override
public void open(OpenContext ctx) {
    // RuntimeContext через getRuntimeContext()
    state = getRuntimeContext().getState(descriptor);
    metric = getRuntimeContext().getMetricGroup().counter("processed");
}

@Override
public void processElement(Event e, Context ctx, Collector<Result> out) throws Exception {
    // Context используется здесь
    String key = ctx.getCurrentKey();
    long ts = ctx.timestamp();
    ctx.timerService().registerEventTimeTimer(ts + 1000);
}

Производительность

ProcessFunction не имеет встроенной оптимизации windows (incremental aggregation, key group state co-location). Каждое событие — это вызов user-defined метода, который может делать что угодно.

Что важно для производительности:

  • Cheap state access. Минимизировать вызовы value()/update() в processElement. Если можно — кэшировать в transient field в пределах одного метода.
  • Лёгкая логика. Тяжёлая обработка (парсинг больших JSON, regex) — это backpressure. Часто стоит делать pre-processing в map перед process().
  • Async I/O. Если нужны внешние вызовы (DB lookup, API call) — используйте Async I/O Function (модуль 09), не блокирующие синхронные вызовы.

ProcessFunction vs window function: когда что

СценарийЧто использовать
Sum/count/avg в фиксированном окнеwindow + AggregateFunction
Top-K, percentiles в малом окнеwindow + ProcessWindowFunction
Sessionization с gap-onlywindow + EventTimeSessionWindows
Sessionization с business-event closeKeyedProcessFunction
FSM (multi-state, transitions)KeyedProcessFunction
Timeout pending operationsKeyedProcessFunction + timers
Pattern detection (CEP)KeyedProcessFunction или Flink CEP
Stream-to-stream lookupsAsync I/O или KeyedProcessFunction
Side outputs (DLQ, debug)KeyedProcessFunction или window function

Production-чеклист

  • Декларируйте все state в open(). Не используйте transient ленивую инициализацию в processElement.
  • Метрики через getRuntimeContext().getMetricGroup() — счётчики event-ов, latency-гистограммы.
  • Логирование — не на каждое событие. Лучше — sampled (например, каждое 1000-е) или через metrics.
  • При работе с timers — обязательно clear() в onTimer или logic для предотвращения утечек (см. след. урок).
  • Тестирование — используйте KeyedOneInputStreamOperatorTestHarness (есть в flink-test-utils) для unit-тестов с timer manipulation.

Попробуй сам

  1. Simple counter. Реализуйте KeyedProcessFunction, который для каждого user_id считает количество событий и emit-ит каждое 100-е. Используйте ValueState<Long>.

  2. Get current key check. Реализуйте функцию, которая проверяет, что ctx.getCurrentKey() возвращает ожидаемое значение в processElement. Сравните с тем, что вы передавали в keyBy — должны совпадать.

  3. Context vs RuntimeContext. Логируйте в processElement: timestamp, watermark, currentProcessingTime через ctx; и состояние через getRuntimeContext().getState(). Убедитесь, что вы понимаете, какой API даёт что.

Проверка знанийKnowledge check
Команда решает: реализовать sessionization для интернет-магазина через EventTimeSessionWindows или через KeyedProcessFunction. Бизнес-логика: сессия = последовательность событий с gap < 30min, ИЛИ событие logout, ИЛИ checkout-completion — что наступит раньше. Какой подход выбрать и почему?
ОтветAnswer
Однозначно KeyedProcessFunction. EventTimeSessionWindows закрывает сессию только когда watermark проходит за window.end (= last_event_ts + gap). Нет встроенного механизма досрочного закрытия по бизнес-событию. KeyedProcessFunction даёт полный контроль: в processElement при обнаружении event.type='logout' или 'checkout_complete' сразу emit Session с накопленными данными (ValueState<SessionMeta> + ListState<Event>), clear state, и опционально удалить накопленные event-time timers через deleteEventTimeTimer. Это превращает 'gap-only' семантику в 'gap OR business event'. Альтернатива через custom Trigger в window — формально возможна, но сложнее: trigger вынужден взаимодействовать с window state и merge-операциями session windows, легко получить баги в edge cases (особенно при late events и merge нескольких окон). KeyedProcessFunction — более прямолинейное решение для composite session closing logic.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. В чём основная разница между ProcessFunction и KeyedProcessFunction?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3