MATCH_RECOGNIZE SQL: Calcite-based pattern matching
In the previous lesson we looked at the Java/Scala Pattern API for CEP. The SQL world has its own approach: the MATCH_RECOGNIZE clause, part of the SQL:2016 standard (first implemented by Oracle). It extends the standard SELECT with pattern matching over ordered rows.
Flink implements MATCH_RECOGNIZE through Apache Calcite, the planner framework its SQL layer builds on. This gives SQL users CEP capabilities without writing Java code. Under the hood, the query compiles to the same NFA as the Pattern API.
This lesson covers MATCH_RECOGNIZE syntax, how Calcite compiles it, how it compares with the Pattern API, and production-worthy examples.
Basic structure
SELECT *
FROM transactions
MATCH_RECOGNIZE (
PARTITION BY card_id
ORDER BY ts
MEASURES
A.amount AS first_amount,
B.amount AS big_amount,
LAST(B.ts) AS detected_at
PATTERN (A B)
DEFINE
A AS A.amount BETWEEN 1 AND 10,
B AS B.amount > 500
);
Breakdown:
- MATCH_RECOGNIZE: opens the clause after the FROM source.
- PARTITION BY card_id: groups rows by partition (like keyBy in DataStream).
- ORDER BY ts: sorts rows within each partition by timestamp.
- MEASURES: which columns to output from each match.
- PATTERN (A B): the pattern definition, a sequence of variables.
- DEFINE: the condition for each variable.
This is the direct equivalent of the Pattern API example from the previous lesson: a small transaction immediately followed by a big one.
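To make the row-level semantics concrete, here is a plain-Python sketch of what the query above computes. It assumes each row is a dict with hypothetical keys `card_id`, `ts` and `amount`, and the function name `match_small_then_big` is illustrative only; it models the semantics and says nothing about how Flink executes the query.

```python
from collections import defaultdict


def match_small_then_big(rows):
    """Model of PATTERN (A B) with A AS amount BETWEEN 1 AND 10, B AS amount > 500."""
    partitions = defaultdict(list)
    for row in rows:                                  # PARTITION BY card_id
        partitions[row["card_id"]].append(row)

    matches = []
    for card_id, part in partitions.items():
        part.sort(key=lambda r: r["ts"])              # ORDER BY ts
        for a, b in zip(part, part[1:]):              # strict contiguity: adjacent rows only
            if 1 <= a["amount"] <= 10 and b["amount"] > 500:
                matches.append({                      # MEASURES
                    "card_id": card_id,
                    "first_amount": a["amount"],
                    "big_amount": b["amount"],
                    "detected_at": b["ts"],
                })
    return matches


rows = [
    {"card_id": 1, "ts": 2, "amount": 900},
    {"card_id": 1, "ts": 1, "amount": 5},
    {"card_id": 2, "ts": 1, "amount": 5},
    {"card_id": 2, "ts": 2, "amount": 50},   # breaks strict contiguity for card 2
    {"card_id": 2, "ts": 3, "amount": 900},
]
print(match_small_then_big(rows))
# [{'card_id': 1, 'first_amount': 5, 'big_amount': 900, 'detected_at': 2}]
```

Because the A and B conditions can never hold for the same row, the AFTER MATCH SKIP strategy cannot change the result of this particular pattern, which is why the sketch ignores it.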
Pattern variables and quantifiers
Patterns use regex-like syntax:
PATTERN (A B C) -- sequence: A then B then C
PATTERN (A B?) -- A then optional B
PATTERN (A B*) -- A then 0+ B
PATTERN (A B+) -- A then 1+ B
PATTERN (A B{3}) -- A then exactly 3 B
PATTERN (A B{2,5}) -- A then 2 to 5 B
PATTERN (A B{3,}) -- A then 3+ B
PATTERN (A (B|C)) -- A then B or C (standard SQL; Flink does not support alternation)
PATTERN (A B*? C) -- reluctant (non-greedy) B*: as few B as possible before C
PATTERN (A B+?) -- A then non-greedy B+
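Variables (A, B, C) are placeholders. A variable matches a row when its DEFINE condition is true for that row; a variable with no DEFINE condition matches every row. Flink adds restrictions: a greedy quantifier cannot be used on the last variable of a pattern, so patterns such as (A B*) are rejected. The usual workaround is a trailing variable with the negated condition, e.g. PATTERN (A B* C) with C AS NOT <condition of B>. The reluctant optional quantifier (B??) is not supported either.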
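The regex analogy can be made literal under a simplifying assumption that does not hold in general: every row satisfies exactly one DEFINE condition, so a partition reads as a string of variable letters in ORDER BY order. Python's `re` module then shows what each quantifier accepts. The helper `match_prefix` and the label string are hypothetical, and the sketch ignores Flink's restrictions on where greedy quantifiers may appear.

```python
import re


def match_prefix(pattern: str, labels: str):
    """Return the prefix of `labels` matched by `pattern`, or None."""
    m = re.match(pattern, labels)
    return m.group(0) if m else None


# Hypothetical partition: rows classified as A, B, B, B, C in ORDER BY order.
labels = "ABBBC"

print(match_prefix(r"AB+C", labels))     # 1+ B then C      -> ABBBC
print(match_prefix(r"AB{3}C", labels))   # exactly 3 B      -> ABBBC
print(match_prefix(r"AB{4,}C", labels))  # 4+ B: no match   -> None
print(match_prefix(r"AB+", labels))      # greedy B+        -> ABBB
print(match_prefix(r"AB+?", labels))     # reluctant B+?    -> AB
```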
Contiguity in MATCH_RECOGNIZE
Default contiguity is STRICT (the Pattern API's next()): rows must be consecutive in ORDER BY order.
PATTERN (A B C)
-- Strict: A immediately followed by B immediately followed by C
-- Equivalent to begin("a").next("b").next("c") in the Pattern API
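For relaxed contiguity (the Pattern API's .followedBy), add an intermediate variable that matches any row:
PATTERN (A X*? B X*? C)
DEFINE
A AS A.type = 'LOGIN',
B AS B.type = 'CLICK',
C AS C.type = 'PURCHASE',
X AS true -- match anything (intermediate events are skipped over)
X*? is reluctant: it stops at the first row that satisfies the next variable. A greedy X* with an always-true condition can keep claiming the rows the next variable needs. The SQL standard also defines an exclusion syntax, {- X* -}, but it only hides rows from ALL ROWS PER MATCH output; it does not change contiguity, and Flink does not support it.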
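The difference between the two contiguity modes can be sketched over a list of event types. This is a simplified model with hypothetical helper names (`strict_match`, `relaxed_match`); the relaxed version mirrors a reluctant `X*?` by letting each step take the first qualifying row after the previous step.

```python
def strict_match(types, steps):
    """PATTERN (A B C): the steps must occupy consecutive rows."""
    n = len(steps)
    for i in range(len(types) - n + 1):
        if types[i:i + n] == steps:
            return list(range(i, i + n))
    return None


def relaxed_match(types, steps):
    """PATTERN (A X*? B X*? C) with X matching any row: steps in order, gaps allowed."""
    for start, t in enumerate(types):
        if t != steps[0]:
            continue
        used = [start]
        for j in range(start + 1, len(types)):
            if len(used) == len(steps):
                break
            if types[j] == steps[len(used)]:   # reluctant: first qualifying row wins
                used.append(j)
        if len(used) == len(steps):
            return used
    return None


events = ["LOGIN", "SCROLL", "CLICK", "CLICK", "PURCHASE"]
steps = ["LOGIN", "CLICK", "PURCHASE"]
print(strict_match(events, steps))   # None: SCROLL sits between LOGIN and CLICK
print(relaxed_match(events, steps))  # [0, 2, 4]
```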
AFTER MATCH SKIP is a separate clause, not a way to relax contiguity. It is written before PATTERN:
MATCH_RECOGNIZE (
...
AFTER MATCH SKIP TO NEXT ROW
PATTERN (A B)
...
)
AFTER MATCH SKIP controls overlap between matches, i.e. where the search resumes after a match is found (similar to AfterMatchSkipStrategy in the Pattern API).
Full example: credit card fraud
SELECT *
FROM transactions
MATCH_RECOGNIZE (
PARTITION BY card_id
ORDER BY ts
MEASURES
FIRST(SMALL1.ts) AS first_test_ts,
FIRST(SMALL1.amount) AS first_test_amount,
LAST(SMALL2.amount) AS second_test_amount,
BIG.amount AS fraud_amount,
BIG.ts AS fraud_ts,
COUNT(*) AS event_count
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (SMALL1 X*? SMALL2 X*? BIG) WITHIN INTERVAL '1' HOUR
DEFINE
SMALL1 AS SMALL1.amount BETWEEN 1 AND 10,
SMALL2 AS SMALL2.amount BETWEEN 1 AND 10,
BIG AS BIG.amount >= 500,
X AS true
);
The pattern, clause by clause:
- PARTITION BY card_id: per-card matching, like keyBy.
- ORDER BY ts: ordering by event time.
- MEASURES: the output columns (first and second small transactions, the big transaction, and the total number of rows in the match).
- ONE ROW PER MATCH: emit one row per detected match. The standard alternative, ALL ROWS PER MATCH, emits every matched row individually; Flink does not support it.
- AFTER MATCH SKIP PAST LAST ROW: after a match, skip past all rows it used, so matches never overlap.
- PATTERN (SMALL1 X*? SMALL2 X*? BIG): SMALL1, possibly other rows, SMALL2, possibly other rows, BIG. The reluctant X*? lets each step take the first qualifying row.
- WITHIN INTERVAL '1' HOUR: time bound for the whole match.
- DEFINE: conditions for each variable.
This is the SQL counterpart of the Pattern API example from the previous lesson.
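A single-partition model of the fraud query can help when reasoning about which rows end up in a match. It is a sketch under stated assumptions: timestamps are plain minutes, transactions are `(ts, amount)` tuples already sorted by time, the WITHIN bound is treated as inclusive, and the helper names are hypothetical. It reports the first-completing match per SMALL1 row and does not reproduce every NFA branch Flink keeps.

```python
WITHIN_MINUTES = 60  # WITHIN INTERVAL '1' HOUR; inclusive bound is an assumption


def is_small(amount):
    return 1 <= amount <= 10


def is_big(amount):
    return amount >= 500


def first_index(txns, begin, pred):
    """Index of the first transaction at or after `begin` whose amount satisfies pred."""
    return next((j for j in range(begin, len(txns)) if pred(txns[j][1])), None)


def detect_card_testing(txns):
    """Model of PATTERN (SMALL1 X*? SMALL2 X*? BIG) WITHIN 1 HOUR, SKIP PAST LAST ROW.
    Returns (first_test_ts, first_test_amount, second_test_amount, fraud_amount, fraud_ts)."""
    matches, start = [], 0
    while start < len(txns):
        ts0, amount0 = txns[start]
        match = None
        if is_small(amount0):                                    # SMALL1
            small2 = first_index(txns, start + 1, is_small)      # X*? then SMALL2
            big = first_index(txns, small2 + 1, is_big) if small2 is not None else None
            if big is not None and txns[big][0] - ts0 <= WITHIN_MINUTES:
                match = (start, small2, big)
        if match:
            s1, s2, b = match
            matches.append((txns[s1][0], txns[s1][1], txns[s2][1], txns[b][1], txns[b][0]))
            start = b + 1                                        # SKIP PAST LAST ROW
        else:
            start += 1
    return matches


txns = [(0, 5), (10, 200), (20, 3), (30, 900), (200, 2), (210, 4), (300, 1000)]
print(detect_card_testing(txns))  # [(0, 5, 3, 900, 30)]
```

The second pair of small transactions (minutes 200 and 210) is followed by a big one only at minute 300, outside the one-hour bound, so it produces no alert.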
Calcite implementation
Flink uses Apache Calcite for SQL planning. MATCH_RECOGNIZE is a standard SQL construct that Calcite recognizes and parses into a logical plan.
Compilation path:
- Parse: SQL string -> AST.
- Validate: schemas, types, references.
- Plan: logical relational tree (Calcite's LogicalMatch node).
- Optimize: planner rules (Calcite and Flink) rewrite the MATCH_RECOGNIZE plan.
- Translate: logical plan -> physical plan with Flink CEP operator.
- Execute: CEP operator runs.
Critically, MATCH_RECOGNIZE compiles to the same CEP operator as the Pattern API. It is not a parallel implementation: the same underlying NFA and SharedBuffer are used, and the SQL syntax is a thin layer over the Pattern API.
Flink's planner builds an equivalent Pattern from the MATCH_RECOGNIZE clauses. Conceptually, the translation looks like this (simplified; greedy/reluctant flags omitted):
| MATCH_RECOGNIZE | Pattern API |
|---|---|
| PATTERN (A B) | begin("A").next("B") (strict) |
| PATTERN (A X*? B), X matches any row | begin("A").followedBy("B") (relaxed, approximately) |
| PATTERN (A B* C) | begin("A").next("B").oneOrMore().optional().consecutive().next("C") |
| PATTERN (A B{3,5} C) | begin("A").next("B").times(3, 5).consecutive().next("C") |
| DEFINE A AS condition | .where(condition) |
| WITHIN INTERVAL '5' MINUTE | .within(Time.minutes(5)) |
| PARTITION BY x | keyBy(x) |
| AFTER MATCH SKIP TO NEXT ROW | Pattern.begin("A", AfterMatchSkipStrategy.skipToNext()) |
Comparison: MATCH_RECOGNIZE vs Pattern API
| Aspect | MATCH_RECOGNIZE | Pattern API |
|---|---|---|
| Language | SQL | Java/Scala |
| Audience | Data analysts, SQL-first teams | Engineers |
| Expressiveness | Standard SQL constructs | More flexible (IterativeCondition, custom PatternProcessFunction) |
| Type safety | Checked when the query is planned, before the job runs | Compile-time types |
| Reuse | Just SQL files | Java code, can be unit-tested |
| Tooling | SQL editor, dashboards | IDE, debugger |
| Performance | Same (same operator) | Same (same operator) |
| Custom logic | Limited to DEFINE conditions | Full Java power |
Choose MATCH_RECOGNIZE if:
- SQL-first team (analysts, BI engineers).
- Pattern simple enough to express in SQL.
- Integration into existing SQL pipelines (CREATE TABLE + INSERT INTO).
Choose Pattern API if:
- Conditions needing arbitrary logic over the partial match so far (IterativeCondition), beyond what SQL aggregates in DEFINE can express.
- Custom logic needed in conditions (calling external services, ML inference).
- Strict typing important.
Use case: funnel analysis
Classic e-commerce funnel: view -> add_to_cart -> checkout -> purchase. Find users who reached step N but not step N+1.
-- Find users who add_to_cart but never purchase within session
SELECT
user_id,
cart_event_ts,
cart_session_id
FROM user_events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY event_ts
MEASURES
CART.event_ts AS cart_event_ts,
CART.session_id AS cart_session_id
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (VIEW X* CART Y*? NO_PURCHASE) WITHIN INTERVAL '30' MINUTE
DEFINE
VIEW AS VIEW.event_type = 'PRODUCT_VIEW',
CART AS CART.event_type = 'ADD_TO_CART',
X AS X.event_type NOT IN ('ADD_TO_CART'),
Y AS Y.event_type NOT IN ('PURCHASE'),
NO_PURCHASE AS NO_PURCHASE.event_type = 'SESSION_END'
AND NO_PURCHASE.session_id = CART.session_id
);
Trick: use a SESSION_END event as an explicit terminator. The pattern matches "VIEW happened, then CART happened, then SESSION_END in the same session", meaning the user left without purchasing.
X AS X.event_type NOT IN ('ADD_TO_CART') allows intermediate events, but not another ADD_TO_CART (that would be a separate cart).
The result is a list of cart abandonments, which can be joined with the user table to drive a re-engagement campaign.
Use case: stock price patterns
A classic financial use case: detect a "V-shape" pattern in stock prices (a drop, then a recovery).
SELECT
symbol,
start_price,
bottom_price,
end_price,
bottom_ts,
end_ts
FROM stock_ticks
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY ts
MEASURES
FIRST(price) AS start_price,
LAST(DOWN.price) AS bottom_price,
LAST(UP.price) AS end_price,
LAST(DOWN.ts) AS bottom_ts,
LAST(UP.ts) AS end_ts
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
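PATTERN (START DOWN+ UP+ AFTER_UP) WITHIN INTERVAL '1' HOUR
DEFINE
DOWN AS DOWN.price < LAST(START.price)
AND (LAST(DOWN.price, 1) IS NULL OR DOWN.price < LAST(DOWN.price, 1)),
UP AS UP.price > LAST(DOWN.price)
AND (LAST(UP.price, 1) IS NULL OR UP.price > LAST(UP.price, 1)),
AFTER_UP AS AFTER_UP.price <= LAST(UP.price)
);
Pattern: START (anchor row, no condition), DOWN+ (consecutive decreasing prices), UP+ (consecutive increasing prices), all within 1 hour. The recovery rows only have to rise above the bottom (the last DOWN price), not above every DOWN price. AFTER_UP is the first row that stops rising: Flink does not allow a greedy quantifier such as UP+ on the last variable of a pattern, so this variable closes the match once the rise is over.
Inside DEFINE, the row being evaluated is already mapped to its variable, so LAST(DOWN.price) in DOWN's own condition is the current row, and LAST(DOWN.price, 1) is the previous DOWN row. Logical offsets like LAST() and FIRST() reference rows already mapped within the current match, which makes them the SQL counterpart of an IterativeCondition.
This is a non-trivial pattern, and it demonstrates the power of MATCH_RECOGNIZE for time-series analysis.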
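The V-shape logic can be checked against a plain list of prices. This sketch models a single partition with prices already in timestamp order, leaves out the WITHIN bound, and uses a hypothetical function name; because the DOWN, UP and AFTER_UP conditions never overlap, each run can be scanned deterministically.

```python
def find_v_shapes(prices):
    """Model of PATTERN (START DOWN+ UP+ AFTER_UP) with AFTER MATCH SKIP PAST LAST ROW.
    Returns (start, bottom, end) index triples; WITHIN is not modelled."""
    results, i, n = [], 0, len(prices)
    while i < n:
        j = i + 1
        # DOWN+: below START for the first row, then below LAST(DOWN.price, 1)
        while j < n and prices[j] < prices[j - 1]:
            j += 1
        bottom = j - 1
        k = j
        # UP+: above the bottom for the first row, then above LAST(UP.price, 1)
        while k < n and prices[k] > prices[k - 1]:
            k += 1
        end = k - 1
        # Need at least one DOWN row, one UP row, and an AFTER_UP row at index k
        if bottom > i and end > bottom and k < n:
            results.append((i, bottom, end))
            i = k + 1          # SKIP PAST LAST ROW (the AFTER_UP row belongs to the match)
        else:
            i += 1
    return results


print(find_v_shapes([10, 8, 6, 7, 9, 9, 5, 4, 6, 3]))  # [(0, 2, 4), (6, 7, 8)]
```

A series that ends while still rising, such as `[5, 4, 6]`, yields no match yet: in streaming terms the match is still pending until an AFTER_UP row arrives.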
ONE ROW PER MATCH vs ALL ROWS PER MATCH
ONE ROW PER MATCH -- default: 1 row per match
ALL ROWS PER MATCH -- per matched row, 1 output row
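ONE ROW PER MATCH collapses each match into one summary row; MEASURES defines its columns.
ALL ROWS PER MATCH emits every matched row with annotations (pattern variable name, match number), which is useful for downstream processing that wants the individual events. This mode, together with CLASSIFIER() and MATCH_NUMBER(), comes from the SQL standard (Oracle supports it); Flink's documentation lists all three as not yet supported, so the query below shows standard semantics rather than something Flink runs today.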
SELECT * FROM ticks
MATCH_RECOGNIZE (
PARTITION BY symbol
ORDER BY ts
MEASURES
CLASSIFIER() AS pattern_var, -- which variable matched this row
MATCH_NUMBER() AS match_num
ALL ROWS PER MATCH
PATTERN (A B+ C)
DEFINE ...
);
-- Output: rows like
-- symbol | ts | pattern_var | match_num
-- AAPL | t1 | A | 1
-- AAPL | t2 | B | 1
-- AAPL | t3 | B | 1
-- AAPL | t4 | C | 1
CLASSIFIER() is a built-in function that returns the name of the pattern variable the current row was mapped to.
MATCH_NUMBER() returns the sequential number of the match within its partition.
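The two output shapes can be compared on the same match. In this sketch a match is a hypothetical list of `(variable, row)` pairs for a single partition; the summary columns stand in for MEASURES, and the annotations imitate CLASSIFIER() and MATCH_NUMBER() as the standard defines them.

```python
def one_row_per_match(matches):
    """ONE ROW PER MATCH: one summary row per match (columns play the role of MEASURES)."""
    return [
        {"symbol": m[0][1]["symbol"], "start_ts": m[0][1]["ts"],
         "end_ts": m[-1][1]["ts"], "row_count": len(m)}
        for m in matches
    ]


def all_rows_per_match(matches):
    """ALL ROWS PER MATCH: every matched row, annotated like CLASSIFIER() and MATCH_NUMBER()."""
    out = []
    for match_num, m in enumerate(matches, start=1):
        for var, row in m:
            out.append({**row, "pattern_var": var, "match_num": match_num})
    return out


# One match of PATTERN (A B+ C) as (variable, row) pairs in ORDER BY order.
match = [("A", {"symbol": "AAPL", "ts": "t1"}),
         ("B", {"symbol": "AAPL", "ts": "t2"}),
         ("B", {"symbol": "AAPL", "ts": "t3"}),
         ("C", {"symbol": "AAPL", "ts": "t4"})]

for row in all_rows_per_match([match]):
    print(row)
print(one_row_per_match([match]))
```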
SKIP strategies
AFTER MATCH SKIP PAST LAST ROW -- resume after the last row of the match
AFTER MATCH SKIP TO NEXT ROW -- resume at the row after the first row of the match
AFTER MATCH SKIP TO FIRST var -- resume at the first row mapped to var
AFTER MATCH SKIP TO LAST var -- resume at the last row mapped to var
Same semantics as AfterMatchSkipStrategy in the Pattern API.
Common patterns:
- Fraud detection: SKIP PAST LAST ROW, so overlapping matches don't produce duplicate alerts.
- Trend detection: SKIP TO NEXT ROW, which allows overlapping matches and finds every possible trend.
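For a fixed-length pattern the effect of the two most common strategies fits in a few lines. This is a simplified model with hypothetical names (`find_matches`, `rising`); SKIP TO FIRST/LAST var are not modelled.

```python
def find_matches(rows, length, fits, skip):
    """Fixed-length pattern model; skip is "PAST_LAST_ROW" or "TO_NEXT_ROW".
    Returns (first_index, last_index) pairs."""
    if skip not in ("PAST_LAST_ROW", "TO_NEXT_ROW"):
        raise ValueError(f"unsupported skip strategy: {skip}")
    matches, start = [], 0
    while start + length <= len(rows):
        if fits(rows[start:start + length]):
            matches.append((start, start + length - 1))
            # PAST LAST ROW resumes after the match; TO NEXT ROW after its first row
            start = start + length if skip == "PAST_LAST_ROW" else start + 1
        else:
            start += 1
    return matches


def rising(window):
    """Hypothetical trend condition: every price above the previous one."""
    return all(b > a for a, b in zip(window, window[1:]))


prices = [1, 2, 3, 4, 5]
print(find_matches(prices, 3, rising, "TO_NEXT_ROW"))    # [(0, 2), (1, 3), (2, 4)]
print(find_matches(prices, 3, rising, "PAST_LAST_ROW"))  # [(0, 2)]
```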
Streaming vs batch SQL
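MATCH_RECOGNIZE can be used in both modes, although batch-mode support depends on the Flink version, so check the documentation for yours: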
-- Batch read
SELECT * FROM historical_data MATCH_RECOGNIZE (...);
-- Runs once, scans all data, emits matches
-- Streaming read
SELECT * FROM live_stream MATCH_RECOGNIZE (...);
-- Runs continuously, emits matches as patterns complete
In streaming mode, partial matches are cleaned up as the watermark advances (as with the Pattern API). WITHIN INTERVAL is critical for bounded state.
The output stream is append-only: matches are added over time and never retracted. That works with downstream sinks that support appends (Kafka, Paimon).
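Why WITHIN bounds state can be shown with a toy pruning step. This sketch assumes no late data (every future row has a timestamp above the current watermark) and an inclusive time bound; `PartialMatch` and `prune_expired` are hypothetical names, not Flink internals.

```python
from dataclasses import dataclass, field


@dataclass
class PartialMatch:
    start_ts: int                            # event time of the first row in the match
    rows: list = field(default_factory=list)


def prune_expired(partials, watermark, within):
    """Drop partial matches that can no longer finish inside the WITHIN bound.
    Once watermark - start_ts exceeds `within`, any future row is too late.
    With within=None (no WITHIN clause) nothing can ever be pruned."""
    if within is None:
        return list(partials)
    return [p for p in partials if watermark - p.start_ts <= within]


partials = [PartialMatch(0), PartialMatch(30), PartialMatch(100)]
print([p.start_ts for p in prune_expired(partials, watermark=90, within=60)])  # [30, 100]
print(len(prune_expired(partials, watermark=10_000, within=None)))             # 3
```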
Production perspective: integration
A realistic production pipeline:
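-- Source: Kafka events
CREATE TABLE events (
card_id BIGINT, -- required: the pattern below partitions by card_id
user_id BIGINT,
event_type STRING,
amount DECIMAL(10,2),
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'avro-confluent',
-- avro-confluent needs a Schema Registry; hypothetical address, key name as in recent Flink versions
'avro-confluent.url' = 'http://schema-registry:8081'
);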
-- Sink: Paimon alerts table
CREATE TABLE fraud_alerts (
card_id BIGINT,
first_test_amount DECIMAL(10,2),
fraud_amount DECIMAL(10,2),
detected_at TIMESTAMP(3)
) WITH (
'connector' = 'paimon',
'path' = 's3://lakehouse/alerts'
);
-- Pattern detection pipeline
INSERT INTO fraud_alerts
SELECT *
FROM events
MATCH_RECOGNIZE (
PARTITION BY card_id
ORDER BY ts
MEASURES
FIRST(SMALL1.amount) AS first_test_amount,
BIG.amount AS fraud_amount,
BIG.ts AS detected_at
ONE ROW PER MATCH
AFTER MATCH SKIP PAST LAST ROW
PATTERN (SMALL1 X*? SMALL2 X*? BIG) WITHIN INTERVAL '1' HOUR
DEFINE
SMALL1 AS SMALL1.amount BETWEEN 1 AND 10,
SMALL2 AS SMALL2.amount BETWEEN 1 AND 10,
BIG AS BIG.amount >= 500,
X AS true
);
This is a complete pipeline: Kafka -> MATCH_RECOGNIZE -> Paimon. No Java code, SQL only.
Operations:
- Monitor the job through Flink's metrics (web UI, metric reporters).
- Iterate on the query in a SQL editor.
- Version-control patterns just like code.
- Test patterns against historical data with the same syntax (given batch support in your Flink version).
Comparison: MATCH_RECOGNIZE vs window aggregations
A common mistake is using window aggregations for pattern detection. They are not equivalent:
-- WRONG: window aggregation for pattern
SELECT user_id, count(*) AS click_count
FROM events
WHERE event_type = 'CLICK'
GROUP BY user_id, TUMBLE(ts, INTERVAL '5' MINUTE);
-- This counts clicks, doesn't detect "click -> no purchase" pattern
-- CORRECT: MATCH_RECOGNIZE for pattern
SELECT user_id, click_ts
FROM events
MATCH_RECOGNIZE (
PARTITION BY user_id
ORDER BY ts
MEASURES CLICK.ts AS click_ts
ONE ROW PER MATCH
PATTERN (CLICK X*? ABANDONMENT) WITHIN INTERVAL '5' MINUTE
DEFINE
CLICK AS CLICK.event_type = 'CLICK',
X AS X.event_type NOT IN ('PURCHASE'),
ABANDONMENT AS ABANDONMENT.event_type = 'SESSION_END'
);
-- Detects pattern: click happened, then no purchase, then session ended
Window aggregations — for counting/summing/grouping. MATCH_RECOGNIZE — for sequences and patterns.
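The contrast can be reproduced on a handful of events. Events here are hypothetical `(user_id, ts_seconds, event_type)` tuples; the sketch assumes an inclusive five-minute bound and reports every CLICK that starts a match (overlapping matches allowed, as with AFTER MATCH SKIP TO NEXT ROW). The count says nothing about order, while the sequence check does.

```python
from collections import Counter, defaultdict


def clicks_per_window(events, size):
    """TUMBLE-style aggregation: (user_id, window_start) -> number of CLICK events."""
    counts = Counter()
    for user_id, ts, event_type in events:
        if event_type == "CLICK":
            counts[(user_id, ts - ts % size)] += 1
    return dict(counts)


def abandoned_clicks(events, within):
    """Per user: CLICK, then rows that are not PURCHASE, then SESSION_END within `within`."""
    by_user = defaultdict(list)
    for user_id, ts, event_type in sorted(events, key=lambda e: (e[0], e[1])):
        by_user[user_id].append((ts, event_type))       # PARTITION BY user_id ORDER BY ts
    found = []
    for user_id, rows in by_user.items():
        for i, (click_ts, event_type) in enumerate(rows):
            if event_type != "CLICK":
                continue
            for ts, later_type in rows[i + 1:]:
                if later_type == "PURCHASE" or ts - click_ts > within:
                    break                                # pattern broken or too late
                if later_type == "SESSION_END":
                    found.append((user_id, click_ts))
                    break
    return found


events = [(1, 0, "CLICK"), (1, 100, "CLICK"), (1, 200, "SESSION_END"),
          (2, 0, "CLICK"), (2, 30, "PURCHASE"), (2, 90, "SESSION_END")]
print(clicks_per_window(events, 300))  # {(1, 0): 2, (2, 0): 1}
print(abandoned_clicks(events, 300))   # [(1, 0), (1, 100)]
```

User 2 has a click in the same window as user 1, yet only user 1's clicks are followed by a session end without a purchase.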
MATCH_RECOGNIZE complexity grows fast. A pattern with a greedy quantifier (B+) and loose (non-selective) DEFINE conditions can explode state just like the Pattern API. The same rules apply: WITHIN is required, conditions should be selective, and state size should be monitored. SQL syntax doesn't make it cheap; it just makes it accessible.