Lesson 16.02 · 25 min
Advanced
Tags: Pattern API, Flink, CEP, Conditions, Quantifiers, Contiguity, Iterative conditions

Pattern API: begin, where, followedBy, quantifiers

In the previous lesson we looked at CEP concepts and the NFA at a high level. Now for the practical API: how you describe patterns in Java/Scala code. This is a language-level API that Flink compiles into an NFA under the hood.

In this lesson: a full guide to the Pattern API, contiguity (next vs followedBy vs followedByAny vs notFollowedBy), quantifiers (oneOrMore, times), conditions (simple and iterative, as opposed to shared state), and realistic code patterns.


Basic skeleton

A pattern is built through a fluent API:

Pattern<Event, ?> pattern = Pattern
    .<Event>begin("step1")
    .where(SimpleCondition.of(e -> e.getType().equals("LOGIN")))
    .followedBy("step2")
    .where(SimpleCondition.of(e -> e.getType().equals("CLICK")))
    .within(Time.minutes(5));

Breakdown:

  • Pattern.<Event>begin("step1"): creates a pattern whose initial step is named “step1”. The generic <Event> is the input element type.
  • .where(condition): the condition for the current step. An event matches only if the condition returns true.
  • .followedBy("step2"): adds the next step with relaxed contiguity. The step is named “step2”; that name is the key used to retrieve its events from a match.
  • .within(Time.minutes(5)): a time constraint for the whole pattern.

The ? in Pattern<Event, ?> is the second type parameter: the event subtype accepted by the current step. It can change along the chain (for example after subtype()), which is why the variable is declared with a wildcard.


Conditions: SimpleCondition

SimpleCondition.of(lambda) is a stateless condition on a single event:

.where(SimpleCondition.of(event -> event.getAmount() > 1000))

// or
.where(SimpleCondition.of(event -> event.getType().equals("PURCHASE") && event.getAmount() > 1000))

The lambda takes an event and returns a boolean. There is no state: the result depends only on the event's content.

Under the hood the condition is applied to each candidate event. If it returns true, the match proceeds (a TAKE transition in the NFA). If it returns false, the event is ignored or the partial match fails, depending on contiguity.


Conditions: IterativeCondition

IterativeCondition is context-aware: it can read the events already matched in the current partial match:

.where(new IterativeCondition<Event>() {
    @Override
    public boolean filter(Event candidate, Context<Event> ctx) throws Exception {
        // Events matched by earlier steps of this partial match (needs java.util.stream.StreamSupport)
        double avgPrevious = StreamSupport.stream(ctx.getEventsForPattern("step1").spliterator(), false)
            .mapToDouble(Event::getAmount)
            .average()
            .orElse(0);

        // The candidate matches only if it is well above that average
        return candidate.getAmount() > avgPrevious * 2;
    }
})

Context API:

  • ctx.getEventsForPattern("step1"): an Iterable of the events matched by “step1” in the current partial match.
  • ctx.timestamp(): the timestamp of the event being evaluated.
  • ctx.currentProcessingTime(): the current processing time.

Use case: dynamic conditions whose threshold depends on previous events. For example, “purchase amount > 2x the average of the last 5 purchases”.

WARNING

IterativeCondition is expensive: every evaluation reads the previously matched events. For long partial matches (oneOrMore with many events) that is O(n) per evaluation and O(n^2) overall. Use it sparingly. If you need running aggregations, precompute them in a regular Flink operator before CEP.
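The precomputation idea can be sketched in plain Java, independent of Flink. The class below is a hypothetical helper (RollingAverage is not a Flink type) that keeps the average of the last N amounts and updates it in O(1) per event instead of rescanning earlier events. In a real job, this logic would presumably live in keyed state of an upstream operator that attaches the average to each event, so that a plain SimpleCondition can compare against it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: average of the last `capacity` amounts, updated in O(1) per event. */
final class RollingAverage {
    private final int capacity;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    RollingAverage(int capacity) {
        this.capacity = capacity;
    }

    /** Adds a new amount and evicts the oldest one once the window is full. */
    void add(double amount) {
        window.addLast(amount);
        sum += amount;
        if (window.size() > capacity) {
            sum -= window.removeFirst();
        }
    }

    /** Returns the average of the amounts currently in the window, or 0 if it is empty. */
    double average() {
        return window.isEmpty() ? 0.0 : sum / window.size();
    }
}
```

With a capacity of 5 this corresponds to the “last 5 purchases” threshold from the use case above; the condition itself then reduces to a single comparison per event.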


Contiguity: next, followedBy, followedByAny, notFollowedBy

The most important choice is contiguity: it determines what may happen between matched events.

.next("step"): strict

.next("immediate")
.where(SimpleCondition.of(e -> e.getType().equals("X")))

The event must come immediately after the previous step's event, with no intervening events. If any other event arrives between the previous match and this step, the partial match fails.

Example with events A, B, C:

  • Pattern: begin("a").where(A).next("b").where(B).
  • Matches the stream A, B. Does NOT match the stream A, C, B (C sits between A and B).

Use case: rigid sequences (no allowable interleaving). Rare in practice, since most real-world patterns are relaxed.

Contiguity comparison: A followed by B

Test sequence A, X, B (A, an intervening event X, then B):

| Pattern | Result | Explanation |
|---|---|---|
| .next(B) | no match | Strict contiguity. X between A and B is a violation, so no match is emitted. |
| .followedBy(B) | [A, B] | Relaxed contiguity. X is ignored and the first matching B is used. X is not part of the match. |
| .followedByAny(B) | [A, B] | Same as followedBy when there is a single B. With several B events (A, B1, B2) it emits [A, B1] AND [A, B2]. |

Test sequence A, B1, B2 (A, then two B events):

| Pattern | Result | Explanation |
|---|---|---|
| .next(B) | [A, B1] | Strict: B1 immediately follows A. B2 does not follow A directly, so it cannot form a match. |
| .followedBy(B) | [A, B1] | Relaxed: the first matching B is taken. B2 is not used. |
| .followedByAny(B) | [A, B1] AND [A, B2] | Non-deterministic: every matching B creates a separate match. Beware of state explosion. |

.followedBy("step"): relaxed

.followedBy("later")
.where(SimpleCondition.of(e -> e.getType().equals("Y")))

The event may arrive any time after the previous step; other events may occur in between. The first matching event is taken.

Example: the stream A, X, Y, B against begin("a").where(A).followedBy("b").where(B):

  • A matches “a”, X does not match B (ignored), Y does not match B (ignored), B matches “b”. Match = [A, B].
  • Even if the stream contains several Bs, only the first one after A is selected.

Use case: the most common one. “Login, then eventually checkout”: events in between are fine.

.followedByAny("step"): non-deterministic relaxed

.followedByAny("any")
.where(SimpleCondition.of(e -> e.getType().equals("Z")))

Like followedBy, but every matching event creates a separate match.

Example: the stream A, B1, B2 against begin("a").where(A).followedByAny("b").where(B):

  • Two matches: [A, B1] AND [A, B2].

Use case: “after login, find each click”, where every click is a separate match.

Cost: state explosion. The partial match waiting for B stays alive after each hit, so every matching B adds another branch. Use carefully.
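The three modes can be compared with a small plain-Java model. This is a sketch of the semantics described above for the two-step pattern “A then B”, not Flink code: ContiguitySketch and its mode strings are illustrative names, any event starting with "B" counts as a B, and time windows and skip strategies are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of "A then B" under three contiguity modes (not Flink code). */
final class ContiguitySketch {
    private static boolean isA(String e) { return e.equals("A"); }
    private static boolean isB(String e) { return e.startsWith("B"); }

    /** mode is one of "next", "followedBy", "followedByAny". */
    static List<List<String>> match(List<String> events, String mode) {
        List<List<String>> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!isA(events.get(i))) {
                continue; // every A opens its own partial match
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean b = isB(events.get(j));
                if (b) {
                    matches.add(List.of(events.get(i), events.get(j)));
                }
                if (mode.equals("next")) {
                    break; // strict: only the event right after A is considered
                }
                if (b && mode.equals("followedBy")) {
                    break; // relaxed: stop at the first matching B
                }
                // followedByAny: keep scanning, every later B yields another match
            }
        }
        return matches;
    }
}
```

Calling match(List.of("A", "X", "B"), "next") yields no matches, while "followedBy" yields [A, B]. For List.of("A", "B1", "B2"), only "followedByAny" returns both [A, B1] and [A, B2], in line with the comparison earlier in this section.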

.notFollowedBy("step"): violation

.notFollowedBy("absent")
.where(SimpleCondition.of(e -> e.getType().equals("CANCEL")))

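Specifies that the previous step must NOT be followed by an event matching this condition. If such an event is found, the partial match is discarded.

Example: A.followedBy(B).notFollowedBy(C).

  • Sequence A, B, C: A matches, B matches, then C is found, so the partial match is discarded and nothing is emitted.
  • Sequence A, B (no C before the within() window ends): the match is emitted as [A, B].

Use case: “user added to cart, did NOT checkout within 30 minutes” (abandoned cart).

A pattern that ends with notFollowedBy MUST specify within(). Without a time bound there is no moment at which the absence of the event can be confirmed, so the pattern would wait forever.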


Quantifiers

By default each pattern step matches exactly one event. Quantifiers extend that.

.oneOrMore()

.where(SimpleCondition.of(e -> e.getType().equals("CLICK")))
.oneOrMore()
.greedy()

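Matches one or more events. Greedy matching is opt-in: with .greedy() the step takes as many events as possible before handing over to the next step.

Sequence X, Y, Y, Y, Z:

  • Pattern begin("clicks").where(Y).oneOrMore(): the longest match is [Y, Y, Y]. Under the default NO_SKIP strategy, shorter combinations such as [Y] and [Y, Y] are emitted as well.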

.times(N)

.times(3)  // exactly 3
.times(3, 5)  // between 3 and 5
.timesOrMore(3)  // 3+

Specifies an exact count or a range. times(M, N) is bounded at both ends (at least M, at most N).

Combining with contiguity

Quantifiers + contiguity create flexible patterns:

.followedBy("attacks")
.where(SimpleCondition.of(e -> e.getType().equals("FAILED_LOGIN")))
.timesOrMore(5)         // 5 or more failed logins
.consecutive()          // consecutive — no other events between failed logins
.greedy()
.within(Time.minutes(5))

.consecutive(): the events of this step must be consecutive in the stream (no interleaving). Without it the loop is relaxed (other events are allowed in between).

.greedy(): for a quantifier, take as many events as possible rather than as few as possible.
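The difference that .consecutive() makes for timesOrMore(n) can be modelled in a few lines of plain Java. This is a sketch of the rule as described above, not Flink code: QuantifierSketch and its method names are illustrative, and within() is ignored.

```java
import java.util.List;

/** Plain-Java model of timesOrMore(n) with and without consecutive() (not Flink code). */
final class QuantifierSketch {
    /** Relaxed: other events may sit between the matching ones, so only the total count matters. */
    static boolean relaxedAtLeast(List<String> events, String type, int n) {
        long count = events.stream().filter(type::equals).count();
        return count >= n;
    }

    /** Consecutive: the matching events must form an uninterrupted run of length >= n. */
    static boolean consecutiveAtLeast(List<String> events, String type, int n) {
        int run = 0;
        for (String e : events) {
            run = e.equals(type) ? run + 1 : 0; // any other event resets the run
            if (run >= n) {
                return true;
            }
        }
        return false;
    }
}
```

For the failed-login example, a stream of three FAILED_LOGIN events, one successful login, and two more FAILED_LOGIN events satisfies the relaxed check for n = 5 but not the consecutive one, because the successful login breaks the run.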


Until conditions

.until(condition) stops the quantifier's matching once the condition is true:

.where(SimpleCondition.of(e -> e.getType().equals("CLICK")))
.oneOrMore()
.until(SimpleCondition.of(e -> e.getType().equals("PURCHASE")))

Matches CLICKs until a PURCHASE appears. The PURCHASE itself is not included in the match (it is the terminating event, not part of the pattern).

Use case: “all clicks before purchase”, that is, every click that happened before the purchase event.
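As a rough plain-Java picture of what oneOrMore().until(...) collects, the sketch below gathers click events up to the first purchase. It models only the longest match under relaxed contiguity; UntilSketch and the event strings are illustrative, and Flink itself may emit shorter combinations as well.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of oneOrMore().until(...) for clicks before a purchase (not Flink code). */
final class UntilSketch {
    /** Collects CLICK events until the first PURCHASE; the PURCHASE itself is not collected. */
    static List<String> clicksUntilPurchase(List<String> events) {
        List<String> clicks = new ArrayList<>();
        for (String e : events) {
            if (e.equals("PURCHASE")) {
                break; // until-condition holds: stop accepting events into the loop
            }
            if (e.startsWith("CLICK")) {
                clicks.add(e); // other event types are skipped (relaxed contiguity)
            }
        }
        return clicks;
    }
}
```

For the stream CLICK-1, VIEW, CLICK-2, PURCHASE, CLICK-3 the result is [CLICK-1, CLICK-2]: the VIEW is skipped, the PURCHASE ends the loop, and CLICK-3 arrives too late to be included.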


Optional events

.optional(): the event may or may not be present:

.followedBy("optionalEvent")
.where(SimpleCondition.of(e -> e.getType().equals("BONUS")))
.optional()
.followedBy("mainEvent")
.where(SimpleCondition.of(e -> e.getType().equals("PURCHASE")))

The pattern can match with or without “optionalEvent”. If a BONUS precedes the PURCHASE, it is included in the match. If not, the match still succeeds without it.


Complete realistic example: credit card fraud

Pattern: “small purchase ($1-$10), small purchase ($1-$10), big purchase ($500+), within 1 hour, all on the same card”. Classic stolen-card behavior: small test charges, then a big one.

Pattern<Transaction, ?> fraudPattern = Pattern
    .<Transaction>begin("test1")
    .where(SimpleCondition.of(t -> t.getAmount() >= 1 && t.getAmount() <= 10))
    .followedBy("test2")
    .where(SimpleCondition.of(t -> t.getAmount() >= 1 && t.getAmount() <= 10))
    .followedBy("big")
    .where(SimpleCondition.of(t -> t.getAmount() >= 500))
    .within(Time.hours(1));

DataStream<Transaction> transactions = env.fromSource(...);

DataStream<Transaction> keyed = transactions.keyBy(Transaction::getCardId);

PatternStream<Transaction> patterns = CEP.pattern(keyed, fraudPattern);

DataStream<FraudAlert> alerts = patterns.process(
    new PatternProcessFunction<Transaction, FraudAlert>() {
        @Override
        public void processMatch(
            Map<String, List<Transaction>> match,
            Context ctx,
            Collector<FraudAlert> out) {

            Transaction test1 = match.get("test1").get(0);
            Transaction test2 = match.get("test2").get(0);
            Transaction big = match.get("big").get(0);

            FraudAlert alert = new FraudAlert(
                big.getCardId(),
                "Suspicious transaction sequence",
                List.of(test1.getTxnId(), test2.getTxnId(), big.getTxnId()),
                big.getAmount()
            );
            out.collect(alert);
        }
    });

alerts.sinkTo(alertSink);

This is a complete, realistic example. keyBy on cardId gives per-card matching. The conditions select transactions in the expected amount ranges. within(Time.hours(1)) bounds the search window. The PatternProcessFunction emits an alert when a match is found.


Within: required for almost every pattern

Without within(), partial matches can live forever:

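// BAD: no within
// (where() expects an IterativeCondition, so the lambda is wrapped in SimpleCondition.of)
Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("LOGIN")))
    .followedBy("anything")
    .where(SimpleCondition.of(e -> e.getType().equals("WHATEVER")));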

If a LOGIN event arrives and WHATEVER never follows, the partial match (holding LOGIN) stays in state FOREVER. With 1M users logging in daily, state grows without bound.

// GOOD: with within
Pattern.<Event>begin("login")
    .where(SimpleCondition.of(e -> e.getType().equals("LOGIN")))
    .followedBy("anything")
    .where(SimpleCondition.of(e -> e.getType().equals("WHATEVER")))
    .within(Time.minutes(30));

After 30 minutes without a match, the partial match is discarded. State is bounded.

Watermark-driven cleanup: Flink CEP uses event time by default. Once the watermark passes start_time + within, the partial match can be pruned.
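The pruning rule can be written as a one-line predicate. The sketch below simply restates the rule from the paragraph above in plain Java; WithinSketch and its parameter names are illustrative, and how Flink treats the exact boundary value is not covered here.

```java
/** Plain-Java restatement of the pruning rule: watermark > start_time + within. */
final class WithinSketch {
    /** True once the watermark has moved past the end of the partial match's time window. */
    static boolean isPrunable(long startTimestampMs, long withinMs, long watermarkMs) {
        return watermarkMs > startTimestampMs + withinMs;
    }
}
```

For a partial match that started at timestamp 1,000 ms with within(Time.minutes(30)), the window ends at 1,801,000 ms. A watermark of 1,801,000 does not make the match prunable under this rule, while 1,801,001 does.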


AfterMatchSkipStrategy

When a match is found, what should happen to overlapping partial matches?

// Default: NO_SKIP, every match is emitted and overlaps are allowed
Pattern<Event, ?> noSkip = Pattern.<Event>begin("step1");

// The skip strategy is passed as the second argument of Pattern.begin(...)
Pattern<Event, ?> skipToNext = Pattern
    .<Event>begin("step1", AfterMatchSkipStrategy.skipToNext());

// Drop partial matches that overlap the emitted match
Pattern<Event, ?> skipPastLast = Pattern
    .<Event>begin("step1", AfterMatchSkipStrategy.skipPastLastEvent());

// Drop partial matches that started before the first event mapped to "step1"
Pattern<Event, ?> skipToFirst = Pattern
    .<Event>begin("step1", AfterMatchSkipStrategy.skipToFirst("step1"));

PatternStream<Event> stream = CEP.pattern(keyedStream, skipPastLast);

Strategies:

  • NO_SKIP: all matches are emitted (default).
  • skipToNext(): after a match, discards every partial match that started with the same event as the emitted match.
  • skipPastLastEvent(): after a match, discards every partial match that started after the emitted match started but before it ended.
  • skipToFirst(stepName) / skipToLast(stepName): discards every partial match that started before the first (or last) event of the emitted match mapped to stepName.

For fraud detection, often skipPastLastEvent — once detected, don’t double-alert on overlapping.
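The effect on alerting can be illustrated with a simplified plain-Java model. It treats each completed match as a pair {startIndex, endIndex} of stream positions and keeps only matches that begin after the previously emitted one ended. This approximates the “no double alerts on overlapping matches” behavior and is not the NFA logic Flink actually runs; SkipSketch is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of two after-match skip strategies (not Flink code). */
final class SkipSketch {
    /** NO_SKIP: every match is emitted, overlapping or not. */
    static List<int[]> noSkip(List<int[]> matches) {
        return new ArrayList<>(matches);
    }

    /**
     * Skip past last event: once a match is emitted, matches that begin at or
     * before its last event are dropped. Each match is {startIndex, endIndex};
     * the input is assumed to be ordered by endIndex.
     */
    static List<int[]> skipPastLastEvent(List<int[]> matches) {
        List<int[]> emitted = new ArrayList<>();
        int lastEnd = -1;
        for (int[] m : matches) {
            if (m[0] > lastEnd) {
                emitted.add(m);
                lastEnd = m[1];
            }
        }
        return emitted;
    }
}
```

Given candidate matches {0,2}, {1,3}, {3,5} and {4,6}, noSkip keeps all four, while skipPastLastEvent keeps only {0,2} and {3,5}, so a card triggers one alert per non-overlapping sequence.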


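Group patterns: times on a whole Pattern

A pattern sequence can itself serve as a step. Passing a Pattern to begin, next, followedBy or followedByAny returns a GroupPattern, and quantifiers such as times, oneOrMore and optional then apply to the whole sequence:

Pattern<Event, ?> innerPattern = Pattern
    .<Event>begin("a")
    .where(SimpleCondition.of(e -> e.getType().equals("A")))
    .next("b")
    .where(SimpleCondition.of(e -> e.getType().equals("B")));

// The inner sequence (A immediately followed by B) must occur twice
Pattern<Event, ?> outerPattern = Pattern
    .begin(innerPattern)
    .times(2);

Group patterns cover repetition and optionality of a sequence. For more complex composition, drop down to a ProcessFunction (lower level) or use MATCH_RECOGNIZE in SQL (next lesson).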


Output handling

After CEP detects a match, you handle it with PatternProcessFunction (modern) or PatternSelectFunction (legacy):

// PatternProcessFunction — full power
patternStream.process(new PatternProcessFunction<Event, Alert>() {
    @Override
    public void processMatch(
        Map<String, List<Event>> match,
        Context ctx,
        Collector<Alert> out) {
        // Access matched events
        // Access ctx (timestamp, side outputs)
        // Emit alerts
    }
});

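// For timed-out partial matches (within() exceeded), also implement
// TimedOutPartialMatchHandler. An anonymous class cannot add an interface,
// so use a named class (AbandonedCartFunction is an illustrative name).
class AbandonedCartFunction extends PatternProcessFunction<Event, Alert>
        implements TimedOutPartialMatchHandler<Event> {

    @Override
    public void processMatch(...) { ... }

    // Timeout handler declared by TimedOutPartialMatchHandler
    @Override
    public void processTimedOutMatch(
        Map<String, List<Event>> match,
        Context ctx) {
        // Partial match timed out
        // E.g., LOGIN happened, but no CHECKOUT within window
        // Emit "abandoned cart" alert
        ctx.output(timedOutTag, new Alert("Abandoned", match));
    }
}

patternStream.process(new AbandonedCartFunction());

processTimedOutMatch comes from the TimedOutPartialMatchHandler interface, not from PatternProcessFunction itself. It is useful for negative patterns (e.g., “abandoned cart” = login + add_to_cart + NO checkout). The result is emitted via a side output.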


Production perspective: pattern complexity tradeoffs

Cost grows with pattern complexity:

| Complexity | Cost factor |
|---|---|
| Simple sequence (3-4 steps, no quantifiers) | 1x |
| Sequence with greedy quantifier | 5-10x state, 2-5x CPU |
| followedByAny | 10-100x state (combinatorial) |
| IterativeCondition | 5-10x CPU |
| Long within() window | linear in window size |

Heuristic: a pattern with greedy oneOrMore + followedByAny + a long window may be impossible to run in production. Cap quantifiers with times(M, N), use selective conditions, and keep within() short.
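To see where the combinatorial growth comes from, the plain-Java sketch below counts the matches of a three-step pattern A, B, C over a stream, once with followedByAny at every step and once with followedBy. It is a counting model under the semantics described in this lesson, not Flink code. MatchCountSketch is an illustrative name, and time windows and skip strategies are ignored.

```java
import java.util.List;

/** Plain-Java count of matches for a three-step pattern A, B, C (not Flink code). */
final class MatchCountSketch {
    /** followedByAny at every step: every index triple i < j < k typed A, B, C is a match. */
    static long countFollowedByAny(List<String> events) {
        long a = 0, ab = 0, abc = 0;
        for (String e : events) {
            switch (e) {
                case "A": a++; break;        // a new partial match [A]
                case "B": ab += a; break;    // each [A] branches into [A, B]
                case "C": abc += ab; break;  // each [A, B] completes as [A, B, C]
                default: break;              // unrelated events are ignored
            }
        }
        return abc;
    }

    /** followedBy at every step: each A pairs only with the first later B, then the first later C. */
    static long countFollowedBy(List<String> events) {
        long count = 0;
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals("A")) continue;
            int j = indexOfAfter(events, "B", i);
            if (j >= 0 && indexOfAfter(events, "C", j) >= 0) count++;
        }
        return count;
    }

    private static int indexOfAfter(List<String> events, String type, int from) {
        for (int k = from + 1; k < events.size(); k++) {
            if (events.get(k).equals(type)) return k;
        }
        return -1;
    }
}
```

For the stream A, A, B, B, C, C the followedByAny count is 8 and the followedBy count is 2. With ten As, then ten Bs, then ten Cs, the counts are 1000 and 10, which is why a long window combined with followedByAny inflates state so quickly.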

Watch metrics:

  • numberOfPendingPartialMatches — state size proxy.
  • numLateRecordsDropped — events too late vs watermark.
  • RocksDB state size growth.

If state grows fast, the pattern is either written incorrectly (conditions too broad) or genuinely needs that much state. Profile, optimize, or re-architect.


Knowledge check
A team wants to detect 'user logged in -> made at least 1 click -> did NOT make a purchase within 10 minutes'. Implementation: Pattern.begin('login').followedBy('click').oneOrMore().followedBy('purchase').optional().within(10 min). What are the 2 problems with this implementation, and how do you fix them?
Answer
Problem 1: 'purchase'.optional() does not express “no purchase”. Optional means “may or may not be present”, so the pattern is already complete after the login and the first click, and a match is emitted whether or not a purchase follows. A match therefore says nothing about the absence of a purchase, and the “no purchase” alert cannot be derived from it.

Problem 2: oneOrMore() accumulates clicks without an upper bound. If a user clicks 1000 times in 10 minutes, the partial matches hold 1000 click events in the SharedBuffer, and state explodes.

Fixes:

  • (1) Make the absence explicit. Either end the pattern with notFollowedBy('purchase') plus within(10 min), or make 'purchase' a required final step and handle the timeout: when within() expires on a partial match that already contains a click, LOGIN + CLICK happened but no PURCHASE did, which is exactly the case to alert on. Emit the alert from processTimedOutMatch(match, ctx) and skip timed-out matches that contain only the login.
  • (2) Bound the clicks. Cap the quantifier with times(1, 100), or use a single click step, since one click is all the requirement needs. Add .consecutive() only if the clicks must be adjacent in the stream, with no other events between them.

Better pattern: login.followedBy(click).followedBy(purchase).within(10 min), alerting on the timeout of [login, click]. Per-user state is limited to 1 login + 1 click + a timer, which is much smaller.
