Learning Platform
Глоссарий Troubleshooting
Урок 17.01 · 24 мин
Продвинутый
Flink AIML_PREDICTVECTOR_SEARCHCREATE MODELReal-time inferenceRAGAgentic streaming

AI features в Flink 2.x: ML_PREDICT, VECTOR_SEARCH, CREATE MODEL

В Flink 2.1 (Q4 2025) появилась первая wave AI-features: CREATE MODEL DDL, ML_PREDICT table-valued function (TVF). В Flink 2.2 добавилось VECTOR_SEARCH — native SQL function для embedding search. Это reflects broader industry trend — integration ML inference прямо в streaming pipelines.

Этот урок: что Flink AI делает, зачем real-time inference нужно (vs batch scoring), use cases от fraud detection до RAG (Retrieval-Augmented Generation), и где это fit в общей architecture.

MLlib Pipeline API в Spark: batch ML serving

Почему inference в streaming

Classical ML deployment:

  1. Batch scoring: nightly job runs predictions на all users, results stored в DB. Production reads pre-computed scores.

  2. API microservice: model deployed как REST service, app calls для inference при необходимости.

  3. Edge inference: model embedded в client (mobile, browser), runs locally.

Все три имеют limitations для real-time use cases:

  • Batch: stale (last update yesterday). Не useful для fraud detection — fraud happens сейчас.
  • API microservice: works, но requires synchronous orchestration. App -> API -> DB -> API -> App. Latency, complexity.
  • Edge: limited по model size, не secure для proprietary models.

Real-time streaming inference — fourth pattern. Model runs внутри Flink pipeline, processes events as they flow. Predictions emitted в real-time, downstream consumers see fresh results.

Use cases:

  • Fraud detection: каждая transaction scored через ML model в-stream. Suspicious -> blocked в milliseconds.
  • Recommendation systems: events -> recompute user embeddings -> search similar items -> emit recommendations. Low-latency loop.
  • RAG (Retrieval-Augmented Generation): question event -> embed -> vector search -> top-k results -> LLM answer. All in streaming.
  • Agentic streaming: agent receives events, takes actions based on LLM reasoning. Real-time AI loops.

Flink выбрана как platform для этого потому что:

  • Already handles streaming infrastructure (sources, sinks, state, exactly-once).
  • SQL interface — accessible non-Java teams.
  • Scales horizontally — same patterns for low/high throughput.
  • Already integrated с lakehouse (Paimon, Iceberg) для feature stores.

CREATE MODEL DDL

CREATE MODEL — Flink 2.1 DDL для register external models в catalog.

CREATE MODEL customer_churn_model
INPUT (
  age INT,
  tenure INT,
  monthly_charges DECIMAL(10,2),
  total_charges DECIMAL(10,2),
  contract_type STRING
)
OUTPUT (
  churn_probability DOUBLE
)
WITH (
  'provider' = 'openai',
  'task' = 'text-generation',
  'api_key' = '{{ secret.openai_api_key }}',
  'model_name' = 'gpt-4-turbo',
  'temperature' = '0',
  'max_tokens' = '10'
);

Sections:

  • MODEL name: register-name в catalog. Можно reference в SQL queries.
  • INPUT: schema input features. Должно match what model expects.
  • OUTPUT: schema output. Что model returns.
  • WITH: provider-specific config. provider name (openai, anthropic, local, custom), credentials, model id, parameters.

Поддерживаемые providers (Flink 2.1):

  • openai: GPT models через OpenAI API.
  • anthropic: Claude через Anthropic API.
  • azure_openai: Azure-hosted OpenAI.
  • google_vertex: Google Vertex AI.
  • bedrock: AWS Bedrock.
  • local: model hosted на same JVM (PyTorch via DJL, ONNX runtime).
  • custom: implement your own ModelProvider Java class.

Model object — stored в catalog (Hive Metastore, etc.). Available для use в queries through ML_PREDICT.


ML_PREDICT TVF

ML_PREDICT — table-valued function для invoke model на rows:

SELECT *
FROM TABLE(
  ML_PREDICT(
    TABLE customer_features,
    MODEL customer_churn_model,
    DESCRIPTOR(age, tenure, monthly_charges, total_charges, contract_type)
  )
);

Args:

  • TABLE source: input table (или query result).
  • MODEL name: registered model в catalog.
  • DESCRIPTOR(cols): which columns на feed в model (must match INPUT schema).

Output: original columns + output columns от model. В нашем случае добавляется churn_probability к каждой строке.

Working example:

-- Setup: streaming customer events
CREATE TABLE customer_events (
  customer_id BIGINT,
  age INT,
  tenure INT,
  monthly_charges DECIMAL(10,2),
  total_charges DECIMAL(10,2),
  contract_type STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer-events',
  ...
);

-- Predict on stream
INSERT INTO churn_predictions
SELECT
  customer_id,
  churn_probability,
  event_time
FROM TABLE(
  ML_PREDICT(
    TABLE customer_events,
    MODEL customer_churn_model,
    DESCRIPTOR(age, tenure, monthly_charges, total_charges, contract_type)
  )
)
WHERE churn_probability > 0.7;

Every customer event -> model inference -> output только если churn_probability > 0.7. Streams в churn_predictions table (probably Paimon для downstream consume).

Под капотом — ML_PREDICT operator. Для external models (API providers) — async lookup join semantics (next lesson detail). Для local models — synchronous inference в operator.


В Flink 2.2 добавлена native VECTOR_SEARCH function для embedding-based search.

SELECT *
FROM VECTOR_SEARCH(
  TABLE products,           -- table with embeddings
  'product_embedding',      -- column with vectors
  ARRAY[0.1, 0.2, ..., 0.5], -- query vector
  TOP_K => 10,
  DISTANCE => 'cosine'
);

Args:

  • TABLE source: table containing vectors.
  • column: which column holds vectors.
  • query vector: array of floats, vector для search.
  • TOP_K: how many nearest results.
  • DISTANCE: similarity metric (cosine, euclidean, dot_product).

Returns: top-k nearest neighbors к query vector. Output schema = source columns + similarity score.

Behind scenes — connector specific:

  • pgvector (Postgres): translates в SELECT ... ORDER BY embedding <-> '[...]' LIMIT k.
  • Pinecone / Weaviate / Milvus / Qdrant: API call к vector DB.
  • In-Flink (small datasets): brute-force scan, sort by distance.

Vector DB как source table:

CREATE TABLE products WITH (
  'connector' = 'pinecone',
  'index' = 'products',
  'api_key' = '...',
  'environment' = 'us-east-1-aws'
);

SELECT * FROM VECTOR_SEARCH(
  TABLE products,
  'embedding',
  cast_array(ML_EMBED(MODEL embedding_model, 'wireless headphones')),
  TOP_K => 5
);

ML_EMBED — companion function для embeddings (creates vector из text using embedding model).


RAG pipeline пример

Retrieval-Augmented Generation — combination embedding search + LLM answer.

-- Source: user questions stream
CREATE TABLE user_questions (
  user_id BIGINT,
  question STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-questions'
);

-- Models
CREATE MODEL embed_model WITH (
  'provider' = 'openai',
  'task' = 'text-embedding-3-small'
);

CREATE MODEL llm WITH (
  'provider' = 'openai',
  'task' = 'gpt-4-turbo'
);

-- Vector DB (knowledge base)
CREATE TABLE knowledge_base WITH (
  'connector' = 'pgvector',
  'url' = 'jdbc:postgresql://...',
  'table' = 'documents'
);

-- RAG pipeline в одном SQL query
INSERT INTO answers
SELECT
  q.user_id,
  q.question,
  ml.answer
FROM user_questions q
CROSS JOIN LATERAL (
  -- Step 1: embed question
  SELECT ML_EMBED(MODEL embed_model, q.question) AS query_vec
) emb
CROSS JOIN LATERAL (
  -- Step 2: vector search top-3 documents
  SELECT STRING_AGG(content, '\n\n') AS context
  FROM VECTOR_SEARCH(
    TABLE knowledge_base,
    'embedding',
    emb.query_vec,
    TOP_K => 3
  )
) ctx
CROSS JOIN LATERAL (
  -- Step 3: LLM generates answer с context
  SELECT ML_PREDICT(
    MODEL llm,
    'You are a helpful assistant. Use this context:\n' || ctx.context ||
    '\n\nQuestion: ' || q.question
  ) AS answer
) ml;

Это complete RAG pipeline в streaming SQL. Каждый user question:

  1. Embedded (text -> vector).
  2. Vector searched в knowledge base (find top-3 relevant docs).
  3. Context + question fed в LLM.
  4. LLM answer streamed в output.

Latency end-to-end: ~500ms-2s (embed ~100ms + vector search ~50ms + LLM ~500ms-2s). Acceptable для chatbot UX.


Agentic streaming

Beyond RAG — agentic patterns: LLM decides actions based на streaming events.

-- Customer service event
CREATE TABLE customer_interactions (
  customer_id BIGINT,
  interaction_type STRING,
  details STRING,
  ts TIMESTAMP(3)
);

CREATE MODEL agent WITH (
  'provider' = 'anthropic',
  'task' = 'claude-3-opus',
  'system_prompt' = 'You are a customer service agent. Analyze interaction, decide next action. Output JSON: {action: ..., params: {...}}'
);

INSERT INTO agent_actions
SELECT
  customer_id,
  ML_PREDICT(MODEL agent, interaction_type || '\n' || details) AS action_json,
  ts
FROM customer_interactions;

Output agent_actions consumed downstream operator that parses JSON, executes action (e.g., send email, create ticket, escalate). Closed-loop agentic system, fully streaming.


Где это fit в architecture

Flink AI в data architecture
Streaming SourcesSources: Kafka events, CDC из RDBMS, IoT sensors, click streams
Flink Streaming JobFlink Job: streaming pipeline. Includes CEP, joins, aggregations, AND ML_PREDICT / VECTOR_SEARCH / ML_EMBED для inference
AI ProvidersExternal AI services: OpenAI, Anthropic, Bedrock APIs (через async lookup join semantics). Local models: ONNX, PyTorch via DJL bundled в Flink TM
ML_PREDICT
Inference CallCaller is Flink — ML_PREDICT triggers calls через connector. Latency, rate limit considerations
Vector DBsVector DBs: pgvector, Pinecone, Milvus, Qdrant, Weaviate. VECTOR_SEARCH через connector translates SQL в provider API
VECTOR_SEARCH
SinksOutputs: enriched events (predictions + original data) в downstream — Paimon table для analytics, Kafka topic для apps, Redis для serving

Flink job becomes orchestrator that combines:

  • Streaming data movement.
  • Stateful transformations (aggregations, joins, CEP).
  • ML inference (через API providers или local models).
  • Vector search (через connectors).

Это powerful, но requires careful design для performance (next lessons cover).


Performance considerations

ML inference вводит new challenges streaming:

Latency variation: API calls к LLM могут take 100ms-2s, не milliseconds. Pipeline throughput limited by API latency.

Rate limits: External APIs (OpenAI, Anthropic) имеют RPM/TPM limits. Без throttling — 429 errors, retries.

Cost: API calls $$ per token. High-throughput pipelines может easily run thousands of $/day.

Cold start: Local models — first inference initializes runtime (load model в memory, JIT compile). Slow first call.

Backpressure: If downstream (sink) slow, pipeline stalls — including ongoing inference calls. Можно lose in-flight predictions.

Solutions (deep dive в 04-async-lookup-and-caching):

  • Async lookup join: non-blocking inference calls.
  • Caching: same inputs -> cached predictions (avoid redundant calls).
  • Rate limiting: throttle calls per second to respect API limits.
  • Batching: group multiple inputs into one API call (where supported).
  • Local fallback: try local model first, API only при miss.

Limitations и caveats

Flink AI новая (2025-2026), evolving fast:

1. Provider abstraction leaky.

Provider-specific behavior leaks через config options. Need understand each provider.

2. Determinism issues.

LLMs non-deterministic (temperature > 0). Same input -> different output. Exactly-once requires deterministic functions для replay correctness. Workarounds: temperature=0, idempotency keys, write output once при checkpoint.

3. Schema evolution.

If model output schema changes (new fields), pipelines break. Versioning critical.

4. Cost monitoring.

Easy to spike costs accidentally (loop in pipeline, retry storms). Monitor token consumption.

5. Vendor lock-in.

CREATE MODEL provider-specific. Switching providers — rewrite queries. Aim для abstractions if possible.

6. Vector DB ecosystem.

Connectors maturity varies. pgvector mature (just SQL), Pinecone/Milvus newer. Test before commit.

WARNING

ML_PREDICT и VECTOR_SEARCH прибавляют complexity в pipeline. Если ты добавляешь LLM call в streaming job processing 100K events/sec, expect: 100K calls/sec to OpenAI = $$$, rate limits hit immediately, latency variation makes checkpoint behave erratically. Pilot с small subset before full deployment. Caching, batching, rate limiting не optional для production.


Flink AI не silver bullet. Better alternatives:

  • Pure ML model serving (no streaming): use dedicated serving (TensorFlow Serving, TorchServe, KServe). Lower latency, optimized для inference.
  • Batch scoring (all users nightly): use Spark или dedicated batch ML pipeline. Cheaper, simpler than streaming.
  • Search (without streaming): use search engine (Elasticsearch, OpenSearch) directly. More mature features.
  • Heavy preprocessing: model preprocessing complex — use Python ML pipeline (sklearn, pandas) directly.

Flink AI sweet spot: streaming + ML inference + ETL combined. Если хотя бы одно слагаемое отсутствует — probably другой tool лучше.


Проверка знанийKnowledge check
Команда хочет добавить fraud detection: ML model scores каждую transaction в streaming pipeline. Throughput 5000 txn/sec. Текущая идея: CREATE MODEL pointing к OpenAI GPT-4, в ML_PREDICT во время каждой transaction. Какие 3 major issues с этим планом и какие альтернативы?
ОтветAnswer
Issues: (1) LLM (GPT-4) для structured prediction — wrong tool. GPT-4 для text generation, not numerical scoring. Latency 500-2000ms per call, expensive ($0.03/1K input tokens + $0.06/1K output). 5000 calls/sec * $0.05 average = $250/sec = $21M/day. Insane cost. (2) Rate limits — OpenAI has TPM (tokens/min) limits — default ~150K TPM for GPT-4, может максимум ~2500 calls/min. 5000 calls/sec FAR exceeds. Constant 429 errors. (3) Latency variability — GPT-4 latency может spike (1-5 sec), pipeline backpressure, checkpoint alignment time grows, downstream stalls. Alternatives: (a) Use specialized model: gradient boosted tree (XGBoost) trained на historical fraud — 100x faster inference (~10μs), no API cost. Embed model in Flink через DJL/ONNX runtime — local inference. (b) If LLM truly needed (e.g., reasoning о transaction descriptions): batch multiple transactions per call (OpenAI supports). Cache by similar inputs. Async lookup join — non-blocking. Still expensive. (c) Hybrid: local fast model scores all, only flagged cases sent to LLM (e.g., 1% of traffic = 50 calls/sec, manageable). (d) Real best practice for fraud в streaming: deploy ONNX/TensorFlow model embedded в Flink TaskManager. Inference inline, sub-millisecond latency, no API cost, no rate limits, deterministic for replay. Use LLM only для unstructured analysis (e.g., free-text customer service routing), не для structured prediction. Lesson: ML_PREDICT с API providers powerful but expensive — match tool to problem.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Какие AI features добавлены в Flink 2.1 и 2.2?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4