Learning Platform
Глоссарий Troubleshooting
Урок 16.03 · 25 мин
Продвинутый
MATCH_RECOGNIZESQL pattern matchingCalciteRow pattern matchingFraud detection SQL

MATCH_RECOGNIZE SQL: Calcite-based pattern matching

В прошлом уроке мы посмотрели Java/Scala Pattern API для CEP. SQL мир имеет свой подход — MATCH_RECOGNIZE clause, стандарт SQL 2016 от Oracle. Это extension стандартного SELECT для pattern matching на отсортированных rows.

Flink implements MATCH_RECOGNIZE через Apache Calcite (planner). Это даёт SQL-users CEP-power без Java code. Под капотом — компиляция в тот же NFA как Pattern API.

Этот урок: syntax MATCH_RECOGNIZE, как Calcite compile его, comparison с Pattern API, и production-worthy examples.

Windowed aggregations в ksqlDB

Базовая структура

SELECT *
FROM transactions
  MATCH_RECOGNIZE (
    PARTITION BY card_id
    ORDER BY ts
    MEASURES
      A.amount AS first_amount,
      B.amount AS big_amount,
      LAST(B.ts) AS detected_at
    PATTERN (A B)
    DEFINE
      A AS A.amount BETWEEN 1 AND 10,
      B AS B.amount > 500
  );

Breakdown:

  • MATCH_RECOGNIZE: opens the clause после FROM source.
  • PARTITION BY card_id: group rows by partition (like keyBy в DataStream).
  • ORDER BY ts: sort within partition by timestamp.
  • MEASURES: what columns to output from matches.
  • PATTERN (A B): pattern definition — sequence of variables.
  • DEFINE: conditions для каждой variable.

This is direct equivalent to Pattern API example previous lesson — small txn followed by big txn.


Pattern variables и quantifiers

Pattern имеет regex-like syntax:

PATTERN (A B C)            -- sequence: A then B then C
PATTERN (A B?)             -- A then optional B
PATTERN (A B*)             -- A then 0+ B
PATTERN (A B+)             -- A then 1+ B
PATTERN (A B{3})           -- A then exactly 3 B
PATTERN (A B{2,5})         -- A then 2 to 5 B
PATTERN (A B{3,})          -- A then 3+ B
PATTERN (A (B|C))          -- A then B or C
PATTERN (A ^ B)            -- A reluctant (non-greedy)
PATTERN (A B+?)            -- A then non-greedy B+

Variables (A, B, C) — placeholders. Каждая variable matches при evaluation true DEFINE A AS condition. Если no condition defined for variable, matches all rows.


Contiguity в MATCH_RECOGNIZE

Default contiguity — STRICT (strict NEXT в Pattern API): rows must be consecutive в ORDER BY order.

PATTERN (A B C)
-- Strict: A immediately followed by B immediately followed by C
-- Equivalent to .next("a").next("b").next("c") в Pattern API

Для relaxed contiguity (Pattern API .followedBy) — use intermediate variable matching all:

PATTERN (A {- X* -} B {- X* -} C)
DEFINE
  A AS A.type = 'LOGIN',
  B AS B.type = 'CLICK',
  C AS C.type = 'PURCHASE',
  X AS true  -- match anything (intermediate events ignored)

{- ... -} — это exclusion: matched, но не included в output measures. Это equivalent followedBy.

Alternatively, AFTER MATCH SKIP TO NEXT ROW:

MATCH_RECOGNIZE (
  ...
  AFTER MATCH SKIP TO NEXT ROW
  PATTERN (A B)
  ...
)

AFTER MATCH SKIP — controls overlap behaviour (similar to AfterMatchSkipStrategy в Pattern API).

MATCH_RECOGNIZE compilation pipeline
SQL queryUser writes SQL with MATCH_RECOGNIZE clause
Parse (Calcite)Calcite parser: SQL string -> AST. Recognizes MATCH_RECOGNIZE syntax (SQL 2016 standard)
LogicalMatchRecognize
Logical planCalcite logical plan. Validates schema, references, types. Optimizer applies rules
Pattern API translationTranslator converts MATCH_RECOGNIZE clauses в Pattern API equivalent: PATTERN (A B) -> begin('A').next('B'); DEFINE A AS x -> .where(x)
Pattern object
NFA compilationNFACompiler: same path как Pattern API. Builds NFA — states, transitions, conditions. SharedBuffer для partial matches
CepOperator (runtime)CepOperator — runtime executor. Same operator для SQL и Pattern API. Same performance, same state, same exactly-once

Полный пример: credit card fraud

SELECT *
FROM transactions
  MATCH_RECOGNIZE (
    PARTITION BY card_id
    ORDER BY ts
    MEASURES
      FIRST(SMALL1.ts) AS first_test_ts,
      FIRST(SMALL1.amount) AS first_test_amount,
      LAST(SMALL2.amount) AS second_test_amount,
      BIG.amount AS fraud_amount,
      BIG.ts AS fraud_ts,
      COUNT(*) AS event_count
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (SMALL1 {- X* -} SMALL2 {- X* -} BIG) WITHIN INTERVAL '1' HOUR
    DEFINE
      SMALL1 AS SMALL1.amount BETWEEN 1 AND 10,
      SMALL2 AS SMALL2.amount BETWEEN 1 AND 10,
      BIG    AS BIG.amount >= 500,
      X      AS true
  );

Wide pattern explained:

  • PARTITION BY card_id: per-card matching, как keyBy.
  • ORDER BY ts: ordering by event time.
  • MEASURES: output columns — first transaction info, second, big transaction info, total event count в match.
  • ONE ROW PER MATCH: emit one row per detected match (alternative: ALL ROWS PER MATCH emits all matched rows individually).
  • AFTER MATCH SKIP PAST LAST ROW: after match, skip past all events used — no overlapping matches.
  • PATTERN (SMALL1 {- X* -} SMALL2 {- X* -} BIG): SMALL1, possibly other events, SMALL2, possibly other events, BIG.
  • WITHIN INTERVAL ‘1’ HOUR: time bound for whole match.
  • DEFINE: conditions для each variable.

Это полный equivalent Pattern API example из прошлого урока, но в SQL форме.


Calcite implementation

Flink использует Apache Calcite для SQL planning. MATCH_RECOGNIZE — standard SQL construct, который Calcite recognizes и parses в logical plan.

Compilation path:

  1. Parse: SQL string -> AST.
  2. Validate: schemas, types, references.
  3. Plan: logical relational tree (LogicalMatchRecognize).
  4. Optimize: Calcite rules for MATCH_RECOGNIZE optimization.
  5. Translate: logical plan -> physical plan with Flink CEP operator.
  6. Execute: CEP operator runs.

Critically, MATCH_RECOGNIZE compiles to same CEP operator что и Pattern API. Это не parallel implementation — same underlying NFA / SharedBuffer. SQL syntax thin layer over Pattern API.

CalciteMatchRecognizeTranslator builds equivalent Pattern из MATCH_RECOGNIZE clauses. Это translation:

MATCH_RECOGNIZEPattern API
PATTERN (A B)begin("A").next("B") (strict)
PATTERN (A {- X* -} B)begin("A").followedBy("B") (relaxed)
PATTERN (A B*)begin("A").followedBy("B").oneOrMore().optional()
PATTERN (A B{3,5})begin("A").followedBy("B").times(3, 5)
DEFINE A AS condition.where(condition)
WITHIN INTERVAL '5' MINUTE.within(Time.minutes(5))
PARTITION BY xkeyBy(x)
AFTER MATCH SKIP TO NEXT ROWwithSkipStrategy(skipToNext())

Comparison: MATCH_RECOGNIZE vs Pattern API

AspectMATCH_RECOGNIZEPattern API
LanguageSQLJava/Scala
AudienceData analysts, SQL-first teamsEngineers
ExpressivenessStandard SQL constructsMore flexible (IterativeCondition, custom ProcessFunction)
Type safetyRuntime errors onlyCompile-time типы
ReuseJust SQL filesJava code, can be unit-tested
ToolingSQL editor, dashboardsIDE, debugger
PerformanceSame (same operator)Same (same operator)
Custom logicLimited to DEFINE conditionsFull Java power

Choose MATCH_RECOGNIZE if:

  • SQL-first team (analysts, BI engineers).
  • Pattern simple enough to express in SQL.
  • Integration into existing SQL pipelines (CREATE TABLE + INSERT INTO).

Choose Pattern API if:

  • Complex conditions с running aggregations.
  • Need custom logic в conditions (calling external services, ML inference).
  • Strict typing important.

Use case: funnel analysis

Classic e-commerce funnel: view -> add_to_cart -> checkout -> purchase. Find users who reached step N but не N+1.

-- Find users who add_to_cart but never purchase within session
SELECT
  user_id,
  cart_event_ts,
  cart_session_id
FROM user_events
  MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_ts
    MEASURES
      CART.event_ts AS cart_event_ts,
      CART.session_id AS cart_session_id
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO NEXT ROW
    PATTERN (VIEW {- X* -} CART {- Y* -} NO_PURCHASE) WITHIN INTERVAL '30' MINUTE
    DEFINE
      VIEW        AS VIEW.event_type = 'PRODUCT_VIEW',
      CART        AS CART.event_type = 'ADD_TO_CART',
      X           AS X.event_type NOT IN ('ADD_TO_CART'),
      Y           AS Y.event_type NOT IN ('PURCHASE'),
      NO_PURCHASE AS NO_PURCHASE.event_type = 'SESSION_END'
                     AND NO_PURCHASE.session_id = CART.session_id
  );

Trick: используем SESSION_END event как explicit terminator. Pattern matches “VIEW happened, then CART happened, then SESSION_END (in same session) — meaning user left without purchasing”.

X AS X.event_type NOT IN ('ADD_TO_CART') — intermediate events allowed но не другой ADD_TO_CART (это будет separate cart).

Result — list cart-abandonments. Can join с user table, send re-engagement campaign.


Use case: stock price patterns

Classic financial use case — detect “V-shape” pattern в stock prices (drop then recovery).

SELECT
  symbol,
  start_price,
  bottom_price,
  end_price,
  bottom_ts,
  end_ts
FROM stock_ticks
  MATCH_RECOGNIZE (
    PARTITION BY symbol
    ORDER BY ts
    MEASURES
      FIRST(price) AS start_price,
      LAST(DOWN.price) AS bottom_price,
      LAST(UP.price) AS end_price,
      LAST(DOWN.ts) AS bottom_ts,
      LAST(UP.ts) AS end_ts
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (START DOWN+ UP+) WITHIN INTERVAL '1' HOUR
    DEFINE
      DOWN AS DOWN.price < LAST(START.price)
             AND (LAST(DOWN.price) IS NULL OR DOWN.price < LAST(DOWN.price)),
      UP   AS UP.price > LAST(DOWN.price)
             AND (LAST(UP.price) IS NULL OR UP.price > LAST(UP.price))
  );

Pattern: START (anchor), DOWN+ (consecutive decreasing prices), UP+ (consecutive increasing prices). Within 1 hour. Recovery wenn UP price > all DOWN prices.

DEFINE uses LAST() and FIRST() references previous matches within current pattern — iterative condition equivalent.

This is non-trivial pattern, demonstrates power of MATCH_RECOGNIZE for time-series analysis.


ONE ROW PER MATCH vs ALL ROWS PER MATCH

ONE ROW PER MATCH    -- default: 1 row per match
ALL ROWS PER MATCH   -- per matched row, 1 output row

ONE ROW PER MATCH — collapse match to one summary row. MEASURES define columns.

ALL ROWS PER MATCH — emit each matched event with annotation (pattern variable name, match number). Useful для downstream processing that wants individual events but annotated.

SELECT * FROM ticks
MATCH_RECOGNIZE (
  PARTITION BY symbol
  ORDER BY ts
  MEASURES
    CLASSIFIER() AS pattern_var,  -- which variable matched this row
    MATCH_NUMBER() AS match_num
  ALL ROWS PER MATCH
  PATTERN (A B+ C)
  DEFINE ...
);

-- Output: rows like
-- symbol | ts | pattern_var | match_num
-- AAPL   | t1 | A           | 1
-- AAPL   | t2 | B           | 1
-- AAPL   | t3 | B           | 1
-- AAPL   | t4 | C           | 1

CLASSIFIER() — built-in function, returns name of pattern variable that matched current row.

MATCH_NUMBER() — sequential match number.


SKIP strategies

AFTER MATCH SKIP PAST LAST ROW    -- skip всех events этого match
AFTER MATCH SKIP TO NEXT ROW       -- skip только next event
AFTER MATCH SKIP TO FIRST var      -- skip к first event этой variable
AFTER MATCH SKIP TO LAST var       -- skip к last event этой variable

Same semantics as AfterMatchSkipStrategy в Pattern API.

Common patterns:

  • Fraud detection: SKIP PAST LAST ROW — don’t alert on overlapping matches.
  • Trend detection: SKIP TO NEXT ROW — allow overlapping, find all возможных trends.

Streaming vs batch SQL

MATCH_RECOGNIZE works в both modes:

-- Batch read
SELECT * FROM historical_data MATCH_RECOGNIZE (...);
-- Runs once, scans all data, emits matches

-- Streaming read
SELECT * FROM live_stream MATCH_RECOGNIZE (...);
-- Runs continuously, emits matches as patterns complete

Streaming имеет watermark-driven cleanup partial matches (как Pattern API). WITHIN INTERVAL critical для bounded state.

Output stream — append-only (matches added over time, никогда не deleted). Это work с downstream sinks that support append (Kafka, Paimon).


Production-перспектива: integration

Реальный production pipeline:

-- Source: Kafka events
CREATE TABLE events (
  user_id BIGINT,
  event_type STRING,
  amount DECIMAL(10,2),
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-events',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'avro-confluent'
);

-- Sink: Paimon alerts table
CREATE TABLE fraud_alerts (
  card_id BIGINT,
  first_test_amount DECIMAL(10,2),
  fraud_amount DECIMAL(10,2),
  detected_at TIMESTAMP(3)
) WITH (
  'connector' = 'paimon',
  'path' = 's3://lakehouse/alerts'
);

-- Pattern detection pipeline
INSERT INTO fraud_alerts
SELECT *
FROM events
  MATCH_RECOGNIZE (
    PARTITION BY card_id
    ORDER BY ts
    MEASURES
      FIRST(SMALL1.amount) AS first_test_amount,
      BIG.amount AS fraud_amount,
      BIG.ts AS detected_at
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (SMALL1 {- X* -} SMALL2 {- X* -} BIG) WITHIN INTERVAL '1' HOUR
    DEFINE
      SMALL1 AS SMALL1.amount BETWEEN 1 AND 10,
      SMALL2 AS SMALL2.amount BETWEEN 1 AND 10,
      BIG AS BIG.amount >= 500,
      X AS true
  );

Это complete pipeline: Kafka -> MATCH_RECOGNIZE -> Paimon. No Java code. SQL-only.

Operations:

  • Monitor через Flink SQL Gateway metrics.
  • Iterate query через SQL editor.
  • Version control patterns just как code.
  • Test patterns на batch data (same syntax!).

Comparison: MATCH_RECOGNIZE vs window aggregations

Common mistake — using window aggregations для pattern detection. Это не equivalent:

-- WRONG: window aggregation for pattern
SELECT user_id, count(*) AS click_count
FROM events
WHERE event_type = 'CLICK'
GROUP BY user_id, TUMBLE(ts, INTERVAL '5' MINUTE);
-- This counts clicks, doesn't detect "click -> no purchase" pattern

-- CORRECT: MATCH_RECOGNIZE for pattern
SELECT user_id, click_ts
FROM events
  MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY ts
    MEASURES CLICK.ts AS click_ts
    ONE ROW PER MATCH
    PATTERN (CLICK {- X* -} ABANDONMENT) WITHIN INTERVAL '5' MINUTE
    DEFINE
      CLICK AS CLICK.event_type = 'CLICK',
      X AS X.event_type NOT IN ('PURCHASE'),
      ABANDONMENT AS ABANDONMENT.event_type = 'SESSION_END'
  );
-- Detects pattern: click happened, then no purchase, then session ended

Window aggregations — for counting/summing/grouping. MATCH_RECOGNIZE — for sequences and patterns.

WARNING

MATCH_RECOGNIZE complexity grows fast. Pattern with greedy quantifier (B+) + selective DEFINE conditions может explode state так же как Pattern API. Same rules apply: WITHIN required, selective conditions, monitor pending partial matches metric. SQL syntax doesn’t make it cheap — it just makes it accessible.


Проверка знанийKnowledge check
SQL analyst пишет MATCH_RECOGNIZE: PATTERN (A B+ C) DEFINE A AS A.event='start', B AS true, C AS C.event='end'. Без WITHIN. На production stream с 10K events/sec, через час state grew to 200GB и Flink job начал throttle. Что произошло и как fix?
ОтветAnswer
Проблема: PATTERN B+ without selective DEFINE + no WITHIN. B AS true matches любой event. Без WITHIN partial match never expires. Так как B+ greedy, каждый event entering partial match добавляется в SharedBuffer. Через час: 10K events/sec * 3600 sec = 36M events accumulated в SharedBuffer для each in-progress match. Multiple partial matches per partition (если PARTITION BY использован — per key). State explosion. Real cause: pattern logically incorrect — wants 'A followed by some B events followed by C', but expressed without constraints. Fixes: (1) ADD WITHIN INTERVAL '5' MINUTE (or appropriate window). Bounds partial match lifetime. After 5 min, expire. (2) Make B more selective. B AS true is anti-pattern. Should be specific event types relevant to pattern (e.g., B AS B.event IN ('CLICK', 'SCROLL')). (3) Use bounded quantifier B{1,100} instead of B+ unbounded. Caps state per partial match. (4) Consider если pattern truly needs all B events. Если just 'A then eventually C, ignore between' — pattern (A {- X* -} C) without B at all. Output measures from A, C only. (5) Verify PARTITION BY appropriate. Without PARTITION BY (or with low-cardinality partition), all events into one stream — scaling impossible. Lesson: MATCH_RECOGNIZE convenience doesn't waive CEP fundamentals. WITHIN required, selective conditions required, quantifiers bounded.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что такое MATCH_RECOGNIZE в SQL и как Flink его реализует?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4