Pattern API: begin, where, followedBy, quantifiers
The previous lesson covered the concepts behind CEP and NFAs at a high level. This one turns to the practical API: how you describe patterns in Java or Scala code. It is a language-level API that Flink compiles into an NFA under the hood.
This lesson is a full guide to the Pattern API: contiguity (next vs followedBy vs followedByAny vs notFollowedBy), quantifiers (oneOrMore, times), conditions (simple and iterative, as opposed to shared state), and real code patterns.
Basic skeleton
A pattern is built through a fluent API:
Pattern<Event, ?> pattern = Pattern
.<Event>begin("step1")
.where(SimpleCondition.of(e -> e.getType().equals("LOGIN")))
.followedBy("step2")
.where(SimpleCondition.of(e -> e.getType().equals("CLICK")))
.within(Time.minutes(5));
Breakdown:
- Pattern.<Event>begin("step1"): creates a pattern whose initial step is named "step1". The generic <Event> is the input element type.
- .where(condition): the condition for the current step. An event matches only if the condition returns true.
- .followedBy("step2"): adds the next step with relaxed contiguity. The step is named "step2", and that name is how you retrieve its events from a match.
- .within(Time.minutes(5)): a time constraint for the whole pattern.
The ? in Pattern<Event, ?> stands for the second type parameter: the event subtype accepted by the most recently added step. It can change along the chain (for example after subtype(...)), so a wildcard is the usual way to declare the variable.
Conditions: SimpleCondition
SimpleCondition.of(lambda) defines a stateless condition on a single event:
.where(SimpleCondition.of(event -> event.getAmount() > 1000))
// or
.where(SimpleCondition.of(event -> event.getType().equals("PURCHASE") && event.getAmount() > 1000))
The lambda takes an event and returns a boolean. It has no state: the result depends only on the event's content.
Under the hood the condition is applied to each candidate event. If it returns true, the NFA proceeds (a TAKE transition). If it returns false, the event is ignored or the partial match fails, depending on contiguity.
Conditions: IterativeCondition
IterativeCondition is context-aware: it has access to the events previously matched in the current partial match:
.where(new IterativeCondition<Event>() {
@Override
public boolean filter(Event candidate, Context<Event> ctx) throws Exception {
// Access the events matched by earlier steps
double avgPrevious = StreamSupport.stream(ctx.getEventsForPattern("step1").spliterator(), false)
.mapToDouble(Event::getAmount)
.average()
.orElse(0);
// The candidate matches only if it is well above the average
return candidate.getAmount() > avgPrevious * 2;
}
})
Context API:
- ctx.getEventsForPattern("step1"): an Iterable of the events matched by "step1" in the current partial match.
- ctx.timestamp(): the timestamp of the event being evaluated.
- ctx.currentProcessingTime(): the current processing time.
Use case: dynamic conditions whose threshold depends on previous events, for example "purchase amount > 2x the average of the last 5 purchases".
IterativeCondition is expensive: every evaluation reads the previously matched events. For long partial matches (oneOrMore with many events) that is O(n) per evaluation and O(n^2) overall. Use it sparingly. If you need running aggregations, precompute them in a regular Flink operator before CEP.
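The precomputation idea can be sketched in plain Java. This is a minimal illustration, not Flink code: RollingAverage and its methods are hypothetical names, and in a real job the same logic would presumably live in keyed state inside an operator placed before CEP, so that the CEP condition only has to compare two fields of the event.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper: rolling average over the last `capacity` amounts, O(1) per update. */
final class RollingAverage {
    private final int capacity;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    RollingAverage(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Average of the amounts currently in the window, or 0 if the window is empty. */
    double average() {
        return window.isEmpty() ? 0.0 : sum / window.size();
    }

    /** Adds an amount, evicting the oldest one once the window is full. */
    void add(double amount) {
        if (window.size() == capacity) {
            sum -= window.removeFirst();
        }
        window.addLast(amount);
        sum += amount;
    }

    /** True if `amount` exceeds `factor` times the average of the amounts seen so far. */
    boolean isSpike(double amount, double factor) {
        return !window.isEmpty() && amount > average() * factor;
    }

    public static void main(String[] args) {
        RollingAverage avg = new RollingAverage(5);
        for (double a : new double[] {10, 20, 30}) {
            avg.add(a);
        }
        System.out.println(avg.average());        // average of 10, 20, 30
        System.out.println(avg.isSpike(50, 2.0)); // is 50 more than twice that average?
    }
}
```

Each update costs O(1), so the check no longer rescans the matched events on every evaluation.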
Contiguity: next, followedBy, followedByAny, notFollowedBy
The most important choice is contiguity: it determines what may happen between matched events.
.next("step"): strict
.next("immediate")
.where(SimpleCondition.of(e -> e.getType().equals("X")))
The event must come immediately after the previous step's event, with no intervening events. If any other event arrives between the previous match and this step, the partial match fails.
Example with events A, B, C:
- Pattern: begin("a").where(A).next("b").where(B).
- Matches [A, B]. Does NOT match [A, C, B] (C is in between).
Use case: rigid sequences where no interleaving is allowed. Rare in practice; most real-world patterns are relaxed.
.followedBy("step"): relaxed
.followedBy("later")
.where(SimpleCondition.of(e -> e.getType().equals("Y")))
The event may come at any point after the previous step; other events may occur in between. Only the first matching event is taken.
Example: the sequence A, X, Y, B with begin("a").where(A).followedBy("b").where(B):
- A matches "a"; X and Y do not match B and are ignored; B matches "b". Match = [A, B].
- Even if the stream contains several Bs, only the first one after A is selected. (This is unrelated to the greedy() quantifier modifier.)
Use case: the most common one. "Login, then eventually checkout": events in between are fine.
.followedByAny("step"): non-deterministic relaxed
.followedByAny("any")
.where(SimpleCondition.of(e -> e.getType().equals("Z")))
Like followedBy, but every matching event produces a separate match.
Example: A, B1, B2 with begin("a").where(A).followedByAny("b").where(B):
- Two matches: [A, B1] and [A, B2].
Use case: "after login, find each click", where every click is a separate match.
Cost: state explosion. Each matching B branches the partial match instead of closing it, so every further B yields another match. Use carefully.
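To make the three positive contiguity modes concrete, here is a toy model in plain Java. It is not how Flink evaluates patterns (there is no NFA, no time window, and no skip strategy), and ContiguityModel and Mode are names made up for this sketch. Events are reduced to type labels, and the pattern is a fixed two-step sequence.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of two-step contiguity over type labels; an illustration, not Flink code. */
final class ContiguityModel {
    enum Mode { NEXT, FOLLOWED_BY, FOLLOWED_BY_ANY }

    /** Returns index pairs {i, j}: events[i] matches `first`, events[j] matches `second`. */
    static List<int[]> matches(List<String> events, String first, String second, Mode mode) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean hit = events.get(j).equals(second);
                if (hit) {
                    result.add(new int[] {i, j});
                }
                if (mode == Mode.NEXT) {
                    break; // strict: only the immediate successor is considered
                }
                if (hit && mode == Mode.FOLLOWED_BY) {
                    break; // relaxed: stop at the first matching event
                }
                // FOLLOWED_BY_ANY: keep scanning, every later hit is another match
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("A", "C", "B", "B");
        for (Mode mode : Mode.values()) {
            System.out.println(mode + ": " + matches(stream, "A", "B", mode).size() + " match(es)");
        }
    }
}
```

For the stream A, C, B, B the model returns no match for NEXT, one pair (A with the first B) for FOLLOWED_BY, and two pairs for FOLLOWED_BY_ANY.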
.notFollowedBy("step"): violation
.notFollowedBy("absent")
.where(SimpleCondition.of(e -> e.getType().equals("CANCEL")))
Specifies that the previous step must NOT be followed by an event matching this condition. If a matching event is found, the partial match is aborted.
Example: A.followedBy(B).notFollowedBy(C).
- Sequence A, B, C: A matches, B matches, then C arrives: abort, no match.
- Sequence A, B (no C before the within() window expires): the match is emitted as [A, B].
Use case: "user added to cart, did NOT checkout within 30 minutes", i.e. an abandoned cart.
A pattern that ends with notFollowedBy MUST have within(); otherwise it would wait forever for the absence to be confirmed.
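Why the window matters for an absence check can be shown with a small plain-Java model. It is a sketch under stated assumptions, not Flink's implementation: AbsenceModel, Outcome and evaluate are hypothetical names, timestamps are epoch milliseconds, and the boundary handling (an event exactly at the deadline still counts) is a choice made for this example.

```java
/** Toy model of "start event NOT followed by a forbidden event within a window". */
final class AbsenceModel {
    enum Outcome { PENDING, ABORTED, MATCHED }

    /**
     * @param startMs          timestamp of the event that opened the partial match
     * @param forbiddenTimesMs timestamps of forbidden events seen so far (e.g. CHECKOUT)
     * @param windowMs         the within() duration
     * @param watermarkMs      current event-time watermark
     */
    static Outcome evaluate(long startMs, long[] forbiddenTimesMs, long windowMs, long watermarkMs) {
        long deadline = startMs + windowMs;
        for (long t : forbiddenTimesMs) {
            if (t > startMs && t <= deadline) {
                return Outcome.ABORTED; // the forbidden event arrived in time
            }
        }
        // Absence can only be declared once event time has moved past the deadline.
        return watermarkMs > deadline ? Outcome.MATCHED : Outcome.PENDING;
    }

    public static void main(String[] args) {
        long thirtyMinutes = 30 * 60 * 1000L;
        System.out.println(evaluate(0L, new long[0], thirtyMinutes, 1_000_000L));
        System.out.println(evaluate(0L, new long[0], thirtyMinutes, thirtyMinutes + 1));
        System.out.println(evaluate(0L, new long[] {600_000L}, thirtyMinutes, thirtyMinutes + 1));
    }
}
```

Without a window there is no deadline for the watermark to pass, so in this model the result would stay PENDING indefinitely.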
Quantifiers
By default each pattern step matches exactly one event. Quantifiers extend that.
.oneOrMore()
.where(SimpleCondition.of(e -> e.getType().equals("CLICK")))
.oneOrMore()
.greedy()
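Matches one or more events. The step is not greedy by default; .greedy() makes it take as many events as possible.
Sequence X, Y, Y, Y, Z:
- Pattern begin("clicks").where(Y).oneOrMore() matches [Y, Y, Y] as its longest match. Under the default NO_SKIP strategy the shorter combinations, such as [Y] and [Y, Y], are emitted as well.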
.times(N)
.times(3) // exactly 3
.times(3, 5) // between 3 and 5
.timesOrMore(3) // 3+
Specifies an exact count or a range. times(M, N) is bounded on both ends, with M and N inclusive.
Combining with contiguity
Quantifiers + contiguity create flexible patterns:
.followedBy("attacks")
.where(SimpleCondition.of(e -> e.getType().equals("FAILED_LOGIN")))
.timesOrMore(5) // 5 or more failed logins
.consecutive() // consecutive — no other events between failed logins
.greedy()
.within(Time.minutes(5))
.consecutive(): the events of this step must be consecutive in the stream (no interleaving). Without it the step is relaxed (other events are allowed in between).
.greedy(): for a quantifier, take as many events as possible rather than as few as possible.
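The difference between a relaxed quantifier and one with consecutive() comes down to counting versus run length. The plain-Java sketch below illustrates that idea only; RunModel and its methods are hypothetical names, events are reduced to type labels, and nothing here reflects Flink internals.

```java
import java.util.List;

/** Toy model: relaxed counting vs. the uninterrupted run that consecutive() demands. */
final class RunModel {
    /** Number of events of `type`, ignoring whatever lies in between (relaxed). */
    static int relaxedCount(List<String> events, String type) {
        int count = 0;
        for (String e : events) {
            if (e.equals(type)) {
                count++;
            }
        }
        return count;
    }

    /** Length of the longest uninterrupted run of `type` (consecutive). */
    static int longestConsecutiveRun(List<String> events, String type) {
        int best = 0;
        int current = 0;
        for (String e : events) {
            current = e.equals(type) ? current + 1 : 0; // any other event resets the run
            best = Math.max(best, current);
        }
        return best;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("FAILED_LOGIN", "FAILED_LOGIN", "OK",
                "FAILED_LOGIN", "FAILED_LOGIN", "FAILED_LOGIN");
        System.out.println(relaxedCount(stream, "FAILED_LOGIN"));
        System.out.println(longestConsecutiveRun(stream, "FAILED_LOGIN"));
    }
}
```

For this stream the relaxed count is 5 while the longest run is 3, so in this model a timesOrMore(5) requirement is met only when consecutive() is left out.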
Until conditions
.until(condition) stops a looping quantifier such as oneOrMore() once the condition holds:
.where(SimpleCondition.of(e -> e.getType().equals("CLICK")))
.oneOrMore()
.until(SimpleCondition.of(e -> e.getType().equals("PURCHASE")))
Matches CLICKs until a PURCHASE appears. The PURCHASE itself is not included in the match: it is the terminating event, not part of the pattern.
Use case: "all clicks before purchase", i.e. every click that occurred before the purchase event.
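The stop-condition behaviour can be modelled in a few lines of plain Java. This is an illustrative sketch, not Flink code: UntilModel and takeUntil are hypothetical names, events are type labels, and only the longest collection is returned (the shorter combinations a real job may also emit are ignored).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of oneOrMore().until(stop): collect wanted events until the stop event. */
final class UntilModel {
    /** Returns the indices of `wanted` events that occur before the first `stop` event. */
    static List<Integer> takeUntil(List<String> events, String wanted, String stop) {
        List<Integer> taken = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            String e = events.get(i);
            if (e.equals(stop)) {
                break; // the stop event ends the loop and is not part of the result
            }
            if (e.equals(wanted)) {
                taken.add(i);
            }
            // any other event is skipped (relaxed contiguity inside the loop)
        }
        return taken;
    }

    public static void main(String[] args) {
        List<String> stream = List.of("CLICK", "VIEW", "CLICK", "PURCHASE", "CLICK");
        System.out.println(takeUntil(stream, "CLICK", "PURCHASE"));
    }
}
```

For CLICK, VIEW, CLICK, PURCHASE, CLICK the model collects the clicks at indices 0 and 2; the click after the purchase is left out.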
Optional events
.optional(): the event may or may not be present:
.followedBy("optionalEvent")
.where(SimpleCondition.of(e -> e.getType().equals("BONUS")))
.optional()
.followedBy("mainEvent")
.where(SimpleCondition.of(e -> e.getType().equals("PURCHASE")))
The match can complete with or without "optionalEvent". If a BONUS precedes the PURCHASE, it is included in the match. If not, the match still succeeds.
Complete realistic example: credit card fraud
Pattern: "small purchase ($1-$10), small purchase ($1-$10), big purchase ($500+), within 1 hour, all on the same card". This is classic stolen-card behavior: small test purchases, then a big one.
Pattern<Transaction, ?> fraudPattern = Pattern
.<Transaction>begin("test1")
.where(SimpleCondition.of(t -> t.getAmount() >= 1 && t.getAmount() <= 10))
.followedBy("test2")
.where(SimpleCondition.of(t -> t.getAmount() >= 1 && t.getAmount() <= 10))
.followedBy("big")
.where(SimpleCondition.of(t -> t.getAmount() >= 500))
.within(Time.hours(1));
DataStream<Transaction> transactions = env.fromSource(...);
DataStream<Transaction> keyed = transactions.keyBy(Transaction::getCardId);
PatternStream<Transaction> patterns = CEP.pattern(keyed, fraudPattern);
DataStream<FraudAlert> alerts = patterns.process(
new PatternProcessFunction<Transaction, FraudAlert>() {
@Override
public void processMatch(
Map<String, List<Transaction>> match,
Context ctx,
Collector<FraudAlert> out) {
Transaction test1 = match.get("test1").get(0);
Transaction test2 = match.get("test2").get(0);
Transaction big = match.get("big").get(0);
FraudAlert alert = new FraudAlert(
big.getCardId(),
"Suspicious transaction sequence",
List.of(test1.getTxnId(), test2.getTxnId(), big.getTxnId()),
big.getAmount()
);
out.collect(alert);
}
});
alerts.sinkTo(alertSink);
This is a full, realistic example. keyBy on cardId gives per-card matching. The conditions select transactions in the expected amount ranges. within(Time.hours(1)) bounds the search window. The PatternProcessFunction emits an alert when a match is found.
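To see which matches this pattern would produce for one card, here is a plain-Java model of the same three steps. It is a sketch under assumptions, not Flink code: FraudModel is a hypothetical name, the input is one card's transactions already sorted by event time, followedBy is modelled as "take the first later event that satisfies the step", no skip strategy is applied, and a match whose span equals the window exactly is counted as inside it.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of small -> small -> big with relaxed contiguity, for a single card. */
final class FraudModel {
    static boolean isSmall(double amount) {
        return amount >= 1 && amount <= 10;
    }

    static boolean isBig(double amount) {
        return amount >= 500;
    }

    /** Index of the first event after `from` that satisfies the step, or -1 if none. */
    private static int firstAfter(double[] amounts, int from, boolean wantBig) {
        for (int i = from + 1; i < amounts.length; i++) {
            if (wantBig ? isBig(amounts[i]) : isSmall(amounts[i])) {
                return i;
            }
        }
        return -1;
    }

    /** Returns all {test1, test2, big} index triples that fit inside the window. */
    static List<int[]> matches(double[] amounts, long[] timesMs, long windowMs) {
        List<int[]> result = new ArrayList<>();
        for (int t1 = 0; t1 < amounts.length; t1++) {
            if (!isSmall(amounts[t1])) {
                continue; // every small purchase opens its own partial match
            }
            int t2 = firstAfter(amounts, t1, false);
            if (t2 < 0) {
                continue;
            }
            int big = firstAfter(amounts, t2, true);
            if (big >= 0 && timesMs[big] - timesMs[t1] <= windowMs) {
                result.add(new int[] {t1, t2, big});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        double[] amounts = {5, 3, 7, 900};
        long[] times = {0L, 60_000L, 120_000L, 180_000L};
        for (int[] m : matches(amounts, times, 3_600_000L)) {
            System.out.println(amounts[m[0]] + ", " + amounts[m[1]] + ", " + amounts[m[2]]);
        }
    }
}
```

For the amounts 5, 3, 7, 900 spaced one minute apart, the model yields two overlapping matches, (5, 3, 900) and (3, 7, 900). Overlaps like this are the reason the skip strategy matters for alerting.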
Within: required for almost every pattern
Without within(), partial matches can live forever:
// BAD: no within
Pattern.<Event>begin("login")
.where(SimpleCondition.of(e -> e.getType().equals("LOGIN")))
.followedBy("anything")
.where(SimpleCondition.of(e -> e.getType().equals("WHATEVER")));
If a LOGIN event arrives and WHATEVER never follows, the partial match holding that LOGIN stays in state FOREVER. With 1M users logging in daily, state grows without bound.
// GOOD: with within
Pattern.<Event>begin("login")
.where(SimpleCondition.of(e -> e.getType().equals("LOGIN")))
.followedBy("anything")
.where(SimpleCondition.of(e -> e.getType().equals("WHATEVER")))
.within(Time.minutes(30));
After 30 minutes without a match, the partial match is discarded. State stays bounded.
Watermark-driven cleanup: Flink CEP works in event time by default. Once the watermark passes start_time + within, the partial match can be pruned.
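The pruning rule (watermark > start_time + within) can be written out as a tiny plain-Java model. It only illustrates the rule as stated here; PruningModel and prune are hypothetical names, partial matches are reduced to their start timestamps, and the real operator keeps far richer state.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of watermark-driven cleanup of partial matches. */
final class PruningModel {
    /** Keeps only the partial matches whose window is still open at the given watermark. */
    static List<Long> prune(List<Long> partialMatchStartsMs, long withinMs, long watermarkMs) {
        List<Long> alive = new ArrayList<>();
        for (long start : partialMatchStartsMs) {
            if (watermarkMs <= start + withinMs) {
                alive.add(start); // window not yet exceeded, keep waiting
            }
            // otherwise: watermark > start + within, the partial match is dropped
        }
        return alive;
    }

    public static void main(String[] args) {
        List<Long> starts = List.of(0L, 1_000L, 5_000L);
        System.out.println(prune(starts, 2_000L, 3_500L));
    }
}
```

With a 2-second window and the watermark at 3500 ms, only the partial match that started at 5000 ms survives. Without a window nothing would ever be removed.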
AfterMatchSkipStrategy
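When a match is found, what should happen to the partial matches that overlap it? The strategy is passed to Pattern.begin:
// Default: NO_SKIP, every match is emitted and overlaps are allowed
Pattern.<Event>begin("step1", AfterMatchSkipStrategy.noSkip());
// Skip to next: drop partial matches that started with the same event as the emitted match
Pattern.<Event>begin("step1", AfterMatchSkipStrategy.skipToNext());
// Skip past last event: drop partial matches that started before the emitted match ended
Pattern.<Event>begin("step1", AfterMatchSkipStrategy.skipPastLastEvent());
// Skip to the first or last event that the match assigned to a named step
Pattern.<Event>begin("step1", AfterMatchSkipStrategy.skipToFirst("step1"));
Pattern.<Event>begin("step1", AfterMatchSkipStrategy.skipToLast("step1"));
// The strategy travels with the pattern; CEP.pattern itself is unchanged
PatternStream<Event> stream = CEP.pattern(keyedStream, pattern);
Strategies:
- noSkip() (NO_SKIP): every possible match is emitted (default).
- skipToNext(): discards partial matches that started with the same event as the emitted match.
- skipPastLastEvent(): discards partial matches that started after the emitted match started but before it ended.
- skipToFirst(stepName): discards partial matches that started after the emitted match started but before the first event of the named step.
- skipToLast(stepName): discards partial matches that started after the emitted match started but before the last event of the named step.
For fraud detection, skipPastLastEvent is often the right choice: once fraud is detected, don't double-alert on overlapping matches.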
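The effect of skipping past the last event can be approximated with a short plain-Java model. Treat it as a simplification under stated assumptions rather than Flink's algorithm: SkipModel is a hypothetical name, each candidate match is reduced to a {startIndex, endIndex} pair, and the candidates are assumed to arrive ordered by start index.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model: keep a candidate match only if it starts after the last emitted match ended. */
final class SkipModel {
    /** `matches` holds {startIndex, endIndex} pairs, assumed sorted by startIndex. */
    static List<int[]> skipPastLastEvent(List<int[]> matches) {
        List<int[]> kept = new ArrayList<>();
        int lastEnd = -1;
        for (int[] m : matches) {
            if (m[0] > lastEnd) { // does not overlap the previously emitted match
                kept.add(m);
                lastEnd = m[1];
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<int[]> candidates = List.of(new int[] {0, 3}, new int[] {1, 3}, new int[] {4, 6});
        for (int[] m : skipPastLastEvent(candidates)) {
            System.out.println(m[0] + ".." + m[1]);
        }
    }
}
```

Given the overlapping candidates {0, 3} and {1, 3} plus a later {4, 6}, the model keeps {0, 3} and {4, 6}, so a single alert is raised for the overlapping pair. Under NO_SKIP all three would be emitted.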
Nested patterns: groups of patterns
A pattern sequence can itself serve as a step of another pattern:
Pattern<Event, ?> innerPattern = Pattern
.<Event>begin("a")
.where(SimpleCondition.of(e -> e.getType().equals("A")))
.next("b")
.where(SimpleCondition.of(e -> e.getType().equals("B")));
// Passing a pattern to begin(), next(), followedBy() or followedByAny() yields a GroupPattern
Pattern<Event, ?> outer = Pattern.begin(innerPattern).times(2);
Flink CEP supports this kind of nesting through group patterns, which accept quantifiers such as oneOrMore(), times() and optional(). If you need more complex composition than that, build it manually with a ProcessFunction (lower level) or use MATCH_RECOGNIZE in SQL (next lesson).
Output handling
Once CEP detects a match, you handle it with a PatternProcessFunction (the current API) or a PatternSelectFunction (the older select API):
// PatternProcessFunction — full power
patternStream.process(new PatternProcessFunction<Event, Alert>() {
@Override
public void processMatch(
Map<String, List<Event>> match,
Context ctx,
Collector<Alert> out) {
// Access matched events
// Access ctx (timestamp, side outputs)
// Emit alerts
}
});
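// For timed-out partial matches (those that exceeded within()), the function must also
// implement TimedOutPartialMatchHandler. An anonymous class cannot add an interface,
// so use a named class (AbandonedCartFunction is an illustrative name).
class AbandonedCartFunction extends PatternProcessFunction<Event, Alert>
implements TimedOutPartialMatchHandler<Event> {
@Override
public void processMatch(...) { ... }
// Timeout handler declared by TimedOutPartialMatchHandler
@Override
public void processTimedOutMatch(
Map<String, List<Event>> match,
Context ctx) {
// Partial match timed out
// E.g., LOGIN happened, but no CHECKOUT within window
// Emit "abandoned cart" alert
ctx.output(timedOutTag, new Alert("Abandoned", match));
}
}
patternStream.process(new AbandonedCartFunction());
processTimedOutMatch is useful for negative patterns (e.g. "abandoned cart" = login + add_to_cart + NO checkout). Emit the result via a side output.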
Production perspective: pattern complexity tradeoffs
Cost scales with pattern complexity:
| Complexity | Cost factor |
|---|---|
| Simple sequence (3-4 steps, no quantifiers) | 1x |
| Sequence with greedy quantifier | 5-10x state, 2-5x CPU |
| followedByAny | 10-100x state (combinatorial) |
| IterativeCondition | 5-10x CPU |
| Long within() window | linear in window size |
Heuristic: a pattern with greedy oneOrMore + followedByAny + a long window is potentially impossible to maintain in production. Cap quantifiers with times(M, N), use selective conditions, and keep within() short.
Watch metrics:
- numberOfPendingPartialMatches: a proxy for state size.
- numLateRecordsDropped: events that arrived too late relative to the watermark.
- RocksDB state size growth.
If state is growing fast, the pattern is either written incorrectly (conditions too broad) or genuinely needs that much state. Profile, optimize, or re-architect.