AI features in Flink 2.x: ML_PREDICT, VECTOR_SEARCH, CREATE MODEL
Flink 2.1 (Q3 2025) brought the first wave of AI features: the CREATE MODEL DDL and the ML_PREDICT table-valued function (TVF). Flink 2.2 added VECTOR_SEARCH, a native SQL function for embedding search. This reflects a broader industry trend: integrating ML inference directly into streaming pipelines.
This lesson covers what Flink AI does, why real-time inference is needed (vs. batch scoring), use cases from fraud detection to RAG (Retrieval-Augmented Generation), and where it fits in the overall architecture.
Why inference in streaming
Classical ML deployment:
- Batch scoring: a nightly job runs predictions for all users and stores the results in a DB. Production reads pre-computed scores.
- API microservice: the model is deployed as a REST service, and the app calls it for inference when needed.
- Edge inference: the model is embedded in the client (mobile, browser) and runs locally.
All three have limitations for real-time use cases:
- Batch: stale (last updated yesterday). Not useful for fraud detection, because fraud happens now.
- API microservice: works, but requires synchronous orchestration (App -> API -> DB -> API -> App), which adds latency and complexity.
- Edge: limited by model size, and not secure for proprietary models.
Real-time streaming inference is a fourth pattern. The model runs inside the Flink pipeline and processes events as they flow. Predictions are emitted in real time, so downstream consumers see fresh results.
Use cases:
- Fraud detection: every transaction is scored by an ML model in-stream. Suspicious -> blocked in milliseconds.
- Recommendation systems: events -> recompute user embeddings -> search similar items -> emit recommendations. Low-latency loop.
- RAG (Retrieval-Augmented Generation): question event -> embed -> vector search -> top-k results -> LLM answer. All in streaming.
- Agentic streaming: agent receives events, takes actions based on LLM reasoning. Real-time AI loops.
Flink was chosen as the platform for this because it:
- Already handles streaming infrastructure (sources, sinks, state, exactly-once).
- Offers a SQL interface that is accessible to non-Java teams.
- Scales horizontally, with the same patterns for low and high throughput.
- Is already integrated with the lakehouse (Paimon, Iceberg) for feature stores.
CREATE MODEL DDL
CREATE MODEL is the Flink 2.1 DDL for registering external models in the catalog.
CREATE MODEL customer_churn_model
INPUT (
  age INT,
  tenure INT,
  monthly_charges DECIMAL(10,2),
  total_charges DECIMAL(10,2),
  contract_type STRING
)
OUTPUT (
  churn_probability DOUBLE
)
WITH (
  'provider' = 'openai',
  'task' = 'text-generation',
  'api_key' = '{{ secret.openai_api_key }}',
  'model_name' = 'gpt-4-turbo',
  'temperature' = '0',
  'max_tokens' = '10'
);
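Sections:
- MODEL name: the name registered in the catalog. It can be referenced in SQL queries.
- INPUT: the schema of the input features. It must match what the model expects.
- OUTPUT: the output schema, i.e. what the model returns.
- WITH: provider-specific config: provider name (openai, anthropic, local, custom), credentials, model id, parameters. Exact option keys differ by provider and Flink version, so check the provider documentation before copying them.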
Providers (availability depends on the Flink version and distribution; verify what your build actually bundles):
- openai: GPT models via the OpenAI API.
- anthropic: Claude via the Anthropic API.
- azure_openai: Azure-hosted OpenAI.
- google_vertex: Google Vertex AI.
- bedrock: AWS Bedrock.
- local: a model hosted in the same JVM (PyTorch via DJL, ONNX Runtime).
- custom: implement your own ModelProvider Java class.
The model object is stored in the catalog (Hive Metastore, etc.) and is available to queries through ML_PREDICT.
ML_PREDICT TVF
ML_PREDICT is a table-valued function that invokes a model on rows:
SELECT *
FROM TABLE(
  ML_PREDICT(
    TABLE customer_features,
    MODEL customer_churn_model,
    DESCRIPTOR(age, tenure, monthly_charges, total_charges, contract_type)
  )
);
Args:
- TABLE source: the input table (or a query result).
- MODEL name: a model registered in the catalog.
- DESCRIPTOR(cols): which columns to feed into the model (must match the INPUT schema).
Output: the original columns plus the model's output columns. In our case, churn_probability is added to every row.
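To make the row-level behavior concrete, here is a minimal Python sketch of the semantics described above. It is not Flink code: ml_predict and toy_churn_model are hypothetical stand-ins, and the toy scoring rule is invented purely for illustration.

```python
from typing import Callable, Dict, Iterable, Iterator, List

Row = Dict[str, object]


def ml_predict(rows: Iterable[Row],
               model: Callable[[Row], Row],
               descriptor: List[str]) -> Iterator[Row]:
    """Yield each input row extended with the model's output columns."""
    for row in rows:
        # Only the DESCRIPTOR columns are passed to the model.
        features = {name: row[name] for name in descriptor}
        outputs = model(features)
        # Result = original columns + model output columns.
        yield {**row, **outputs}


def toy_churn_model(features: Row) -> Row:
    # Hypothetical rule standing in for a real model.
    risky = features["contract_type"] == "monthly" and features["tenure"] < 12
    return {"churn_probability": 0.9 if risky else 0.1}


customer_features = [
    {"customer_id": 1, "tenure": 3, "contract_type": "monthly"},
    {"customer_id": 2, "tenure": 40, "contract_type": "yearly"},
]

scored = list(ml_predict(customer_features, toy_churn_model,
                         ["tenure", "contract_type"]))
for row in scored:
    print(row)
```

Note that customer_id is not in the descriptor, so the model never sees it, yet it still appears in the output row.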
Working example:
-- Setup: streaming customer events
CREATE TABLE customer_events (
  customer_id BIGINT,
  age INT,
  tenure INT,
  monthly_charges DECIMAL(10,2),
  total_charges DECIMAL(10,2),
  contract_type STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'customer-events',
  ...
);
-- Predict on stream
INSERT INTO churn_predictions
SELECT
  customer_id,
  churn_probability,
  event_time
FROM TABLE(
  ML_PREDICT(
    TABLE customer_events,
    MODEL customer_churn_model,
    DESCRIPTOR(age, tenure, monthly_charges, total_charges, contract_type)
  )
)
WHERE churn_probability > 0.7;
Every customer event goes through model inference, and a row is emitted only if churn_probability > 0.7. The results stream into the churn_predictions table (probably Paimon, for downstream consumption).
Under the hood this is an ML_PREDICT operator. For external models (API providers) it uses async lookup join semantics (detailed in the next lesson). For local models, inference runs synchronously in the operator.
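The idea behind asynchronous inference can be sketched in plain Python with asyncio. This is only an analogy for non-blocking calls with a cap on in-flight requests, not Flink's operator; call_model, score_all, and max_in_flight are hypothetical names, and the sleep simulates remote latency.

```python
import asyncio
import random


async def call_model(event_id: int) -> dict:
    # Hypothetical stand-in for a remote inference call with variable latency.
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return {"event_id": event_id, "score": (event_id % 10) / 10}


async def score_all(event_ids, max_in_flight: int = 4):
    semaphore = asyncio.Semaphore(max_in_flight)  # caps concurrent requests

    async def guarded(event_id: int) -> dict:
        async with semaphore:
            return await call_model(event_id)

    # gather() returns results in input order even if calls finish out of order.
    return await asyncio.gather(*(guarded(e) for e in event_ids))


results = asyncio.run(score_all(range(8)))
print([r["event_id"] for r in results])
```

With a synchronous loop, eight calls would take the sum of their latencies; here up to four overlap, so the slow call does not block the others.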
VECTOR_SEARCH function (Flink 2.2)
Flink 2.2 added a native VECTOR_SEARCH function for embedding-based search.
SELECT *
FROM VECTOR_SEARCH(
  TABLE products,             -- table with embeddings
  'product_embedding',        -- column with vectors
  ARRAY[0.1, 0.2, ..., 0.5],  -- query vector
  TOP_K => 10,
  DISTANCE => 'cosine'
);
Args:
- TABLE source: table containing vectors.
- column: which column holds vectors.
- query vector: an array of floats, the vector to search for.
- TOP_K: how many nearest results.
- DISTANCE: similarity metric (cosine, euclidean, dot_product).
Returns: the top-k nearest neighbors of the query vector. Output schema = source columns + similarity score.
Behind the scenes, behavior is connector-specific:
- pgvector (Postgres): translates into SELECT ... ORDER BY embedding <-> '[...]' LIMIT k. (In pgvector, <-> is the Euclidean-distance operator; cosine distance uses <=>.)
- Pinecone / Weaviate / Milvus / Qdrant: an API call to the vector DB.
- In-Flink (small datasets): brute-force scan, sorted by distance.
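The brute-force variant in the last bullet is simple enough to sketch directly. The following Python is an illustration of the technique (score every row, sort, keep k) using cosine similarity. The function names and the sample products are hypothetical, and treating zero vectors as similarity 0.0 is a convention chosen here.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0  # convention chosen here for zero vectors
    return dot / (norm_a * norm_b)


def top_k(rows: List[Tuple[str, Sequence[float]]],
          query: Sequence[float],
          k: int) -> List[Tuple[str, float]]:
    """Score every row against the query, then keep the k most similar."""
    scored = [(cosine_similarity(vec, query), key) for key, vec in rows]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [(key, score) for score, key in scored[:k]]


products = [
    ("headphones", [1.0, 0.0]),
    ("earbuds", [0.9, 0.1]),
    ("toaster", [0.0, 1.0]),
]

nearest = top_k(products, [1.0, 0.0], k=2)
print(nearest)
```

The scan is O(n) per query, which is why it only suits small datasets; dedicated vector databases use approximate indexes to avoid touching every row.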
Vector DB as a source table:
CREATE TABLE products WITH (
  'connector' = 'pinecone',
  'index' = 'products',
  'api_key' = '...',
  'environment' = 'us-east-1-aws'
);
SELECT * FROM VECTOR_SEARCH(
  TABLE products,
  'embedding',
  cast_array(ML_EMBED(MODEL embedding_model, 'wireless headphones')),
  TOP_K => 5
);
ML_EMBED is a companion function for embeddings (it creates a vector from text using an embedding model).
RAG pipeline example
Retrieval-Augmented Generation is the combination of embedding search and an LLM answer.
-- Source: user questions stream
CREATE TABLE user_questions (
  user_id BIGINT,
  question STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-questions'
);
-- Models (model id goes under 'model_name', as in the CREATE MODEL example earlier)
CREATE MODEL embed_model WITH (
  'provider' = 'openai',
  'model_name' = 'text-embedding-3-small'
);
CREATE MODEL llm WITH (
  'provider' = 'openai',
  'model_name' = 'gpt-4-turbo'
);
-- Vector DB (knowledge base)
CREATE TABLE knowledge_base WITH (
  'connector' = 'pgvector',
  'url' = 'jdbc:postgresql://...',
  'table' = 'documents'
);
-- RAG pipeline in a single SQL query
INSERT INTO answers
SELECT
  q.user_id,
  q.question,
  ml.answer
FROM user_questions q
CROSS JOIN LATERAL (
  -- Step 1: embed the question
  SELECT ML_EMBED(MODEL embed_model, q.question) AS query_vec
) emb
CROSS JOIN LATERAL (
  -- Step 2: vector search for the top-3 documents
  -- (LISTAGG is Flink SQL's string aggregation function)
  SELECT LISTAGG(content, '\n\n') AS context
  FROM VECTOR_SEARCH(
    TABLE knowledge_base,
    'embedding',
    emb.query_vec,
    TOP_K => 3
  )
) ctx
CROSS JOIN LATERAL (
  -- Step 3: the LLM generates an answer using the context
  SELECT ML_PREDICT(
    MODEL llm,
    'You are a helpful assistant. Use this context:\n' || ctx.context ||
    '\n\nQuestion: ' || q.question
  ) AS answer
) ml;
This is a complete RAG pipeline in streaming SQL. For each user question:
- The question is embedded (text -> vector).
- The vector is searched against the knowledge base (find the top-3 relevant docs).
- Context + question are fed into the LLM.
- The LLM answer is streamed to the output.
End-to-end latency: roughly 0.65-2.2 s (embed ~100ms + vector search ~50ms + LLM ~500ms-2s). Acceptable for chatbot UX.
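The latency estimate is a sum of per-stage figures, which a few lines of Python make explicit. The stage numbers are the approximate ones quoted in the text; the dictionary layout and variable names are assumptions made for this sketch.

```python
# (best case, worst case) per stage, in milliseconds, as quoted in the text.
stages_ms = {
    "embed": (100, 100),
    "vector_search": (50, 50),
    "llm": (500, 2000),
}

best_case_ms = sum(low for low, _ in stages_ms.values())
worst_case_ms = sum(high for _, high in stages_ms.values())

# Share of the worst-case budget spent in the LLM call.
llm_share = stages_ms["llm"][1] / worst_case_ms

print(f"end-to-end: {best_case_ms} ms to {worst_case_ms} ms")
print(f"LLM share of worst case: {llm_share:.0%}")
```

The stages run sequentially, so the budget is additive, and the LLM call dominates it. Optimizing the embedding or search step barely moves the total.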
Agentic streaming
Beyond RAG there are agentic patterns: the LLM decides on actions based on streaming events.
-- Customer service event
CREATE TABLE customer_interactions (
  customer_id BIGINT,
  interaction_type STRING,
  details STRING,
  ts TIMESTAMP(3)
);
-- Model id goes under 'model_name', as in the CREATE MODEL example earlier
CREATE MODEL agent WITH (
  'provider' = 'anthropic',
  'model_name' = 'claude-3-opus',
  'system_prompt' = 'You are a customer service agent. Analyze interaction, decide next action. Output JSON: {action: ..., params: {...}}'
);
INSERT INTO agent_actions
SELECT
  customer_id,
  ML_PREDICT(MODEL agent, interaction_type || '\n' || details) AS action_json,
  ts
FROM customer_interactions;
The agent_actions output is consumed by a downstream operator that parses the JSON and executes the action (e.g., send email, create ticket, escalate). The result is a closed-loop agentic system, fully streaming.
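A downstream consumer of this kind has to treat the model's JSON as untrusted input. The sketch below shows one possible shape in Python: parse, validate against an allow-list, then dispatch. The handler names, their parameters, and the return strings are all hypothetical; real handlers would call an email or ticketing system.

```python
import json


def send_email(params: dict) -> str:
    return f"email sent to {params.get('to', 'unknown')}"


def create_ticket(params: dict) -> str:
    return f"ticket created (priority={params.get('priority', 'normal')})"


def escalate(params: dict) -> str:
    return f"escalated to {params.get('team', 'support')}"


# Allow-list: the model can only trigger actions registered here.
HANDLERS = {
    "send_email": send_email,
    "create_ticket": create_ticket,
    "escalate": escalate,
}


def execute(action_json: str) -> str:
    """Parse one model response and run the matching handler, if any."""
    try:
        decision = json.loads(action_json)
    except json.JSONDecodeError:
        return "rejected: not valid JSON"
    if not isinstance(decision, dict):
        return "rejected: expected a JSON object"
    action = decision.get("action")
    if not isinstance(action, str) or action not in HANDLERS:
        return f"rejected: unknown action {action!r}"
    params = decision.get("params")
    if not isinstance(params, dict):
        params = {}
    return HANDLERS[action](params)


print(execute('{"action": "create_ticket", "params": {"priority": "high"}}'))
print(execute("I think we should escalate"))
```

Rejecting anything outside the allow-list matters because LLM output is not guaranteed to follow the requested format, and an unvalidated action name should never reach a system with side effects.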
Where this fits in the architecture
The Flink job becomes an orchestrator that combines:
- Streaming data movement.
- Stateful transformations (aggregations, joins, CEP).
- ML inference (via API providers or local models).
- Vector search (via connectors).
This is powerful, but it requires careful design for performance (covered in the next lessons).
Performance considerations
ML inference introduces new challenges for streaming:
Latency variation: API calls to an LLM can take 100ms-2s, not milliseconds. Pipeline throughput is limited by API latency.
Rate limits: external APIs (OpenAI, Anthropic) have RPM/TPM limits. Without throttling you get 429 errors and retries.
Cost: API calls are billed per token. High-throughput pipelines can easily run to thousands of dollars per day.
Cold start: for local models, the first inference initializes the runtime (loading the model into memory, JIT compilation), so the first call is slow.
Backpressure: if the downstream (sink) is slow, the pipeline stalls, including ongoing inference calls. In-flight predictions can be lost.
Solutions (deep dive in 04-async-lookup-and-caching):
- Async lookup join: non-blocking inference calls.
- Caching: same inputs -> cached predictions (avoid redundant calls).
- Rate limiting: throttle calls per second to respect API limits.
- Batching: group multiple inputs into one API call (where supported).
- Local fallback: try the local model first, and call the API only on a miss.
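Two of these techniques, caching and rate limiting, combine naturally: a cache hit costs no API quota, so only misses need a token. The Python below is a minimal sketch under that assumption. TokenBucket and CachedModel are hypothetical helper classes, not Flink APIs, and the injected clock exists only to keep the example deterministic.

```python
import time


class TokenBucket:
    """Allows bursts up to `capacity`, refilled at `rate_per_sec`."""

    def __init__(self, rate_per_sec: float, capacity: int, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.tokens = float(capacity)
        self.clock = clock
        self.last = clock()

    def try_acquire(self) -> bool:
        now = self.clock()
        # Refill proportionally to elapsed time, never above capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


class CachedModel:
    """Serves repeated inputs from a cache; only misses consume quota."""

    def __init__(self, model, bucket: TokenBucket):
        self.model = model
        self.bucket = bucket
        self.cache = {}
        self.remote_calls = 0

    def predict(self, text: str):
        if text in self.cache:
            return self.cache[text]
        if not self.bucket.try_acquire():
            raise RuntimeError("rate limit reached; back off and retry")
        self.remote_calls += 1
        result = self.model(text)
        self.cache[text] = result
        return result


fake_now = [0.0]  # controllable clock for a deterministic demo
bucket = TokenBucket(rate_per_sec=1.0, capacity=2, clock=lambda: fake_now[0])
model = CachedModel(lambda text: len(text), bucket)  # toy "model": text length

model.predict("a")   # miss: uses a token
model.predict("a")   # hit: served from cache
model.predict("bb")  # miss: uses the second token
print(model.remote_calls)
```

In a real pipeline the cache would need a size bound and an expiry policy, and a rejected call would be retried with backoff rather than raised to the caller.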
Limitations and caveats
Flink AI is new (2025-2026) and evolving fast:
1. The provider abstraction is leaky.
Provider-specific behavior leaks through config options. You need to understand each provider.
2. Determinism issues.
LLMs are non-deterministic (temperature > 0): the same input can produce different output. Exactly-once requires deterministic functions for replay correctness. Workarounds: temperature=0 (which reduces but does not fully eliminate variation), idempotency keys, writing the output once at checkpoint.
3. Schema evolution.
If the model's output schema changes (new fields), pipelines break. Versioning is critical.
4. Cost monitoring.
It is easy to spike costs accidentally (a loop in the pipeline, retry storms). Monitor token consumption.
5. Vendor lock-in.
CREATE MODEL is provider-specific. Switching providers means rewriting queries. Aim for abstractions where possible.
6. Vector DB ecosystem.
Connector maturity varies. pgvector is mature (it is just SQL); Pinecone and Milvus are newer. Test before you commit.
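For the determinism issue in item 2, the idempotency-key workaround can be sketched as follows: derive a stable key from the model version and the input, and let the sink keep only the first prediction written under that key. This is an illustrative Python sketch; idempotency_key and IdempotentSink are hypothetical names, and an in-memory dict stands in for a real keyed store.

```python
import hashlib
import json


def idempotency_key(model_version: str, features: dict) -> str:
    """Stable key: same model version + same input -> same key."""
    payload = json.dumps({"model": model_version, "features": features},
                         sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class IdempotentSink:
    """First write for a key wins; replayed writes are ignored."""

    def __init__(self):
        self.rows = {}

    def write(self, key: str, prediction: float) -> bool:
        if key in self.rows:
            return False  # replay: keep the prediction already stored
        self.rows[key] = prediction
        return True


features = {"age": 42, "tenure": 3, "contract_type": "monthly"}
key = idempotency_key("churn-v1", features)

sink = IdempotentSink()
sink.write(key, 0.81)  # original attempt

# Replay after a restart: same input, but the model answered differently.
replayed = dict(reversed(list(features.items())))  # key order must not matter
accepted = sink.write(idempotency_key("churn-v1", replayed), 0.79)

print(accepted, sink.rows[key])
```

Including the model version in the key is a deliberate choice here: a new model version should be allowed to produce a new prediction for the same input.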
ML_PREDICT and VECTOR_SEARCH add complexity to the pipeline. If you add an LLM call to a streaming job processing 100K events/sec, expect 100K calls/sec to OpenAI (a very large bill), rate limits hit immediately, and latency variation that makes checkpointing behave erratically. Pilot on a small subset before full deployment. Caching, batching, and rate limiting are not optional in production.
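A back-of-the-envelope calculation shows why the 100K events/sec scenario breaks down. Only the event rate comes from the text. The tokens per call, the price, and the rate limit below are hypothetical placeholders, so substitute your provider's real figures.

```python
events_per_sec = 100_000          # from the scenario in the text

# Hypothetical assumptions, for illustration only:
tokens_per_call = 500             # prompt + completion
usd_per_million_tokens = 1.00     # placeholder price
provider_limit_rpm = 10_000       # placeholder requests-per-minute limit

requests_per_minute = events_per_sec * 60
calls_per_day = events_per_sec * 86_400
tokens_per_day = calls_per_day * tokens_per_call
cost_per_day_usd = tokens_per_day / 1_000_000 * usd_per_million_tokens

# How far the naive pipeline overshoots the assumed rate limit.
overshoot = requests_per_minute / provider_limit_rpm

print(f"requests/minute: {requests_per_minute:,}")
print(f"cost/day: ${cost_per_day_usd:,.0f}")
print(f"rate limit exceeded {overshoot:.0f}x")
```

Under these placeholder numbers the pipeline would need hundreds of times the assumed quota, which is why filtering events before inference and caching repeated inputs come before any other tuning.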
When NOT to use Flink AI
Flink AI is not a silver bullet. Better alternatives:
- Pure ML model serving (no streaming): use dedicated serving (TensorFlow Serving, TorchServe, KServe). Lower latency, optimized for inference.
- Batch scoring (all users nightly): use Spark or a dedicated batch ML pipeline. Cheaper and simpler than streaming.
- Search (without streaming): use a search engine (Elasticsearch, OpenSearch) directly. More mature features.
- Heavy preprocessing: if the model's preprocessing is complex, use a Python ML pipeline (sklearn, pandas) directly.
Flink AI's sweet spot is streaming + ML inference + ETL combined. If even one of those ingredients is missing, another tool is probably a better fit.