CREATE MODEL DDL and ML_PREDICT TVF: details
In the previous lesson we looked at Flink AI features at a high level. Now the details: how exactly CREATE MODEL works for different providers, how ML_PREDICT is implemented under the hood (async vs sync), and production-level considerations.
CREATE MODEL: provider config
CREATE MODEL syntax is extensible: each provider defines its own config keys.
OpenAI
CREATE MODEL customer_service_agent
INPUT (prompt STRING)
OUTPUT (response STRING)
WITH (
'provider' = 'openai',
'task' = 'chat-completion',
'api_key' = '{{ secret.openai_api_key }}',
'base_url' = 'https://api.openai.com/v1', -- override for Azure / proxy
'model_name' = 'gpt-4-turbo',
'temperature' = '0.0',
'max_tokens' = '500',
'top_p' = '0.95',
'presence_penalty' = '0',
'frequency_penalty' = '0',
'response_format' = 'json_object', -- json mode
'seed' = '12345', -- for deterministic output
'system_prompt' = 'You are a helpful assistant. Respond in JSON.', -- JSON mode requires the prompt to mention JSON
'request_timeout' = '30s',
'max_retries' = '3'
);
Critical params for production:
- temperature = 0.0 + seed: for reproducible output (best effort; needed for exactly-once compatibility).
- response_format = json_object: JSON mode. The response is syntactically valid JSON (parses cleanly) unless it is cut off by max_tokens.
- request_timeout: bounds per-request latency. The default is usually 60s (too long for streaming).
- max_retries: retries on 429/5xx.
Anthropic
CREATE MODEL legal_analyzer
INPUT (text STRING)
OUTPUT (analysis STRING)
WITH (
'provider' = 'anthropic',
'task' = 'messages',
'api_key' = '{{ secret.anthropic_api_key }}',
'model_name' = 'claude-3-opus-20240229',
'max_tokens' = '1000',
'temperature' = '0',
'system_prompt' = 'You are a legal expert...',
'request_timeout' = '60s'
);
Same pattern, with different model names and slight config differences.
Local model (ONNX/PyTorch)
CREATE MODEL fraud_classifier
INPUT (
amount DOUBLE,
hour_of_day INT,
merchant_category INT,
user_age INT,
user_country STRING,
txn_velocity_24h INT
)
OUTPUT (
fraud_score DOUBLE
)
WITH (
'provider' = 'local',
'task' = 'classification',
'model_path' = 's3://models/fraud_v3.onnx',
'runtime' = 'onnx',
'batch_size' = '32',
'inference_threads' = '4',
'gpu' = 'false'
);
Configs:
- model_path: location of the model file. Loaded into TaskManager memory at startup.
- runtime: onnx, pytorch, tensorflow.
- batch_size: collect up to batch_size events, then run batch inference (faster than per-event inference for many models).
- inference_threads: per-TaskManager threads for inference.
- gpu: CUDA support if the model is GPU-compatible.
Trade-offs local vs API:
| Aspect | Local | API |
|---|---|---|
| Latency | sub-ms (with batching) | 100ms-2s |
| Throughput | depends on CPU/GPU | limited by API rate limits |
| Cost | infrastructure | per-call $$$ |
| Model size | limited by TM memory | unlimited (API-side) |
| Updates | re-deploy job | rotate API model |
| Determinism | yes | usually no (LLM) |
Local: for high-throughput structured prediction. API: for unstructured tasks (text generation, complex reasoning) where an LLM is needed.
ML_PREDICT semantics
ML_PREDICT — table-valued function (TVF):
SELECT *
FROM TABLE(
ML_PREDICT(
TABLE input_data,
MODEL my_model,
DESCRIPTOR(col1, col2, col3)
)
);
Returns input table + output columns. Per-row inference.
Under the hood, Flink creates an MLPredictOperator in the job graph. This operator:
- Reads input row.
- Constructs model input (from DESCRIPTOR columns).
- Calls model (sync for local, async for API).
- Receives output.
- Emits row = input + output.
Sync vs async inference
For local models: sync. The operator blocks on inference, then proceeds. Latency is low (μs-ms), and backpressure works normally.
For API models: async (the default in Flink). The operator dispatches a request without blocking and emits the row when the response arrives. Multiple requests are in flight in parallel.
Async benefit: with a per-row latency of 500ms, throughput is not bounded by latency. If an operator has 100 concurrent requests in flight, effective throughput = 100 / 0.5s = 200 rows/sec per operator instance. With 32 subtasks: 6400 rows/sec.
Without async: throughput = 1 / latency = 2 rows/sec per subtask. With 32 subtasks: 64 rows/sec. 100x worse.
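The arithmetic above is Little's law (throughput = requests in flight / latency). A minimal sketch for plugging in your own numbers; the class and method names are illustrative, not part of Flink, and it assumes the API sustains the load without rate limiting:

```java
/** Little's law estimate for an async operator (illustrative helper, not a Flink class). */
final class AsyncThroughput {
    private AsyncThroughput() {}

    /** Rows per second for one operator instance: in-flight requests / latency. */
    static double perSubtask(int inFlightRequests, double latencySeconds) {
        if (inFlightRequests < 1 || latencySeconds <= 0) {
            throw new IllegalArgumentException("need >= 1 request and a positive latency");
        }
        return inFlightRequests / latencySeconds;
    }

    /** Rows per second across all subtasks, assuming load is spread evenly. */
    static double total(int subtasks, int inFlightRequests, double latencySeconds) {
        return subtasks * perSubtask(inFlightRequests, latencySeconds);
    }

    public static void main(String[] args) {
        System.out.println(total(32, 100, 0.5)); // async, 100 in flight: 6400.0
        System.out.println(total(32, 1, 0.5));   // sync, 1 in flight: 64.0
    }
}
```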
Configure async behavior:
CREATE MODEL my_model WITH (
...,
'async' = 'true', -- enable async
'max_concurrent_requests' = '100', -- max in-flight per operator instance
'ordered_output' = 'true' -- preserve order (default true)
);
ordered_output: if true, output is emitted in input order (preserves stream ordering). If false, output is emitted as soon as it is ready (faster, but reorders rows).
max_concurrent_requests: bound on concurrency per operator instance. Higher = more throughput, but more API rate-limit pressure.
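To see what ordered_output trades away, here is a small simulation (plain Java, no Flink; the EmissionOrder class is hypothetical). Given per-row completion times, unordered mode emits rows in completion order, while ordered mode holds each row until every earlier row has completed:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Simulates emission order for ordered vs unordered async output (hypothetical helper). */
final class EmissionOrder {
    private EmissionOrder() {}

    /** Unordered mode: returns input row indices in the order they are emitted. */
    static List<Integer> unordered(double[] completionTimes) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < completionTimes.length; i++) {
            indices.add(i);
        }
        indices.sort(Comparator.comparingDouble((Integer i) -> completionTimes[i]));
        return indices;
    }

    /** Ordered mode: row i is emitted only once rows 0..i have all completed. */
    static double[] orderedEmitTimes(double[] completionTimes) {
        double[] emitTimes = new double[completionTimes.length];
        double latestSoFar = 0.0;
        for (int i = 0; i < completionTimes.length; i++) {
            latestSoFar = Math.max(latestSoFar, completionTimes[i]);
            emitTimes[i] = latestSoFar;
        }
        return emitTimes;
    }

    public static void main(String[] args) {
        double[] times = {0.9, 0.2, 0.4}; // seconds until each request completes
        System.out.println(unordered(times));                        // [1, 2, 0]
        System.out.println(Arrays.toString(orderedEmitTimes(times))); // [0.9, 0.9, 0.9]
    }
}
```

In ordered mode one slow request delays every row queued behind it, which is the latency cost of preserving stream order.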
ML_PREDICT with aggregations and joins
ML_PREDICT can be combined with other SQL constructs:
-- Window aggregation + predict
INSERT INTO predicted_churn
SELECT
hourly.customer_id,
hourly.window_start, -- already computed in the subquery; event_time is not visible here
predicted.churn_probability
FROM (
SELECT
customer_id,
TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
SUM(amount) AS total_amount,
COUNT(*) AS txn_count,
AVG(amount) AS avg_amount
FROM transactions
GROUP BY customer_id, TUMBLE(event_time, INTERVAL '1' HOUR)
) hourly
JOIN customer_features ON hourly.customer_id = customer_features.id
CROSS APPLY (
SELECT churn_probability
FROM TABLE(
ML_PREDICT(
VALUES (hourly.total_amount, hourly.txn_count, hourly.avg_amount,
customer_features.age, customer_features.tenure),
MODEL customer_churn_model,
DESCRIPTOR(total_amount, txn_count, avg_amount, age, tenure)
)
)
) predicted;
Pipeline:
- Tumbling window aggregation on transactions.
- Join with the customer features table.
- ML_PREDICT on the joined data.
- Output churn predictions per hour.
The combination is natural in SQL and complex in imperative code. This is the reason SQL-first AI is valuable.
ML_PREDICT with join: feature enrichment
Common pattern: augment events with features from a lookup table, then predict:
-- Lookup table (e.g., dimension)
CREATE TABLE customer_dim (
customer_id BIGINT PRIMARY KEY NOT ENFORCED,
age INT,
tenure INT,
segment STRING,
...
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://...',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '1 h'
);
-- Streaming events
CREATE TABLE event_stream (
customer_id BIGINT,
txn_amount DOUBLE,
event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
proc_time AS PROCTIME() -- processing-time attribute required by the lookup join
) WITH (
'connector' = 'kafka',
...
);
-- Enrich + predict
INSERT INTO scored_events
SELECT
e.customer_id,
e.txn_amount,
p.fraud_score
FROM event_stream e
JOIN customer_dim FOR SYSTEM_TIME AS OF e.proc_time AS c
ON e.customer_id = c.customer_id
CROSS APPLY (
SELECT *
FROM TABLE(
ML_PREDICT(
VALUES (e.txn_amount, c.age, c.tenure, c.segment),
MODEL fraud_model,
DESCRIPTOR(amount, age, tenure, segment)
)
)
) p
WHERE p.fraud_score > 0.8;
Steps:
- Event arrives in stream.
- JOIN with customer_dim for feature enrichment (cached lookup).
- ML_PREDICT scores the enriched event.
- Keep only high-fraud rows.
- Sink.
End-to-end in a single query.
Performance considerations: under the hood
For local models, MLPredictOperator works like this:
- Loads the model in operator open(), once per operator instance.
- For each input row: constructs tensor inputs, calls model.predict(), gets the output tensor, deserializes it into a row.
- Optionally batches: accumulates up to batch_size inputs, runs a single batch inference, distributes the outputs.
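The optional batching step could look roughly like the following. This is a sketch, not the operator's actual code: MicroBatcher is a hypothetical helper, and the model is represented as a plain function from a list of inputs to a list of outputs. A real operator would also flush on a timer and before checkpoints so that buffered rows are not held indefinitely:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Buffers inputs and runs the model once per batch (hypothetical helper). */
final class MicroBatcher<I, O> {
    private final int batchSize;
    private final Function<List<I>, List<O>> model; // batch in, batch out (same order)
    private final List<I> buffer = new ArrayList<>();

    MicroBatcher(int batchSize, Function<List<I>, List<O>> model) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.model = model;
    }

    /** Buffers one input; returns the batch outputs once the batch is full, else an empty list. */
    List<O> add(I input) {
        buffer.add(input);
        if (buffer.size() >= batchSize) {
            return flush();
        }
        return new ArrayList<>();
    }

    /** Runs inference on whatever is buffered, e.g. on a timer or before a checkpoint. */
    List<O> flush() {
        if (buffer.isEmpty()) {
            return new ArrayList<>();
        }
        List<O> outputs = model.apply(new ArrayList<>(buffer));
        buffer.clear();
        return outputs;
    }

    public static void main(String[] args) {
        // Stand-in "model": doubles every input in the batch.
        MicroBatcher<Integer, Integer> batcher = new MicroBatcher<Integer, Integer>(3, batch -> {
            List<Integer> out = new ArrayList<>();
            for (int x : batch) {
                out.add(x * 2);
            }
            return out;
        });
        System.out.println(batcher.add(1)); // []
        System.out.println(batcher.add(2)); // []
        System.out.println(batcher.add(3)); // [2, 4, 6]
    }
}
```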
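For API models: async, via Flink's AsyncFunction:
// Conceptual sketch: OpenAIClient and the helper methods (createClient, constructPrompt,
// parseResponse, appendOutput, shouldRetry) are placeholders, not Flink APIs.
import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.data.RowData;

public class MLPredictAsyncFunction extends RichAsyncFunction<RowData, RowData> {
    private transient OpenAIClient client;
    private int maxConcurrency; // enforced by the async operator's capacity, not by this function
    private int maxRetries;

    @Override
    public void open(Configuration parameters) {
        client = createClient(); // one client per operator instance
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) {
        invokeWithRetry(input, constructPrompt(input), resultFuture, 0);
    }

    // Sends the request; on a retriable failure, re-issues it until maxRetries is reached.
    private void invokeWithRetry(
            RowData input, String prompt, ResultFuture<RowData> resultFuture, int attempt) {
        client.completeAsync(prompt) // returns CompletableFuture<String>
            .thenAccept(response -> {
                RowData output = appendOutput(input, parseResponse(response));
                resultFuture.complete(Collections.singletonList(output));
            })
            .exceptionally(ex -> {
                if (shouldRetry(ex) && attempt < maxRetries) {
                    invokeWithRetry(input, prompt, resultFuture, attempt + 1); // backoff omitted
                } else {
                    resultFuture.completeExceptionally(ex);
                }
                return null;
            });
    }

    @Override
    public void timeout(RowData input, ResultFuture<RowData> resultFuture) {
        // Called if the request takes longer than the configured timeout
        resultFuture.complete(Collections.emptyList()); // or complete with an error
    }
}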
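RichAsyncFunction is Flink's built-in base class for async I/O patterns. The async operator that wraps it (configured through AsyncDataStream) manages capacity (max concurrent requests), timeouts and output ordering; error handling lives in the function itself.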
Backpressure and error handling
ML_PREDICT is an IO operator. Backpressure scenarios:
1. API slow or rate-limited.
The API responds slowly or returns 429. The async function's buffer fills up, new rows from upstream are blocked (backpressure), and the pipeline slows down.
Mitigation:
- Retries with exponential backoff: built in via max_retries.
- Dead letter: requests that still fail after retries -> side output for inspection.
- Adaptive concurrency: reduce max_concurrent_requests when 429s are detected.
CREATE MODEL my_model WITH (
...,
'on_error' = 'dead_letter', -- 'fail' | 'skip' | 'dead_letter'
'dead_letter.topic' = 'kafka:failed-predictions'
);
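Adaptive concurrency is often implemented as additive-increase / multiplicative-decrease (AIMD). The sketch below is an assumption about how such a limiter could work, not a built-in feature of the config above; AdaptiveConcurrency and its method names are hypothetical:

```java
/** AIMD concurrency limit: grow by 1 on success, halve on a 429 (hypothetical helper). */
final class AdaptiveConcurrency {
    private final int minLimit;
    private final int maxLimit;
    private int limit; // not thread-safe; a real operator would guard this

    AdaptiveConcurrency(int minLimit, int maxLimit) {
        if (minLimit < 1 || maxLimit < minLimit) {
            throw new IllegalArgumentException("need 1 <= minLimit <= maxLimit");
        }
        this.minLimit = minLimit;
        this.maxLimit = maxLimit;
        this.limit = maxLimit; // start optimistic, back off when the API pushes back
    }

    /** Additive increase after a successful response. */
    void onSuccess() {
        limit = Math.min(maxLimit, limit + 1);
    }

    /** Multiplicative decrease after a 429 response. */
    void onRateLimited() {
        limit = Math.max(minLimit, limit / 2);
    }

    /** Current cap on in-flight requests. */
    int limit() {
        return limit;
    }

    public static void main(String[] args) {
        AdaptiveConcurrency concurrency = new AdaptiveConcurrency(1, 100);
        concurrency.onRateLimited();
        concurrency.onRateLimited();
        System.out.println(concurrency.limit()); // 25
        concurrency.onSuccess();
        System.out.println(concurrency.limit()); // 26
    }
}
```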
2. Cost spike from retries.
If the error is permanent (bad prompt, etc.), retries waste $$$. You need to distinguish retriable from non-retriable errors.
Retriable: 429 (rate limit), 503 (service unavailable), 504 (gateway timeout), network timeout. Non-retriable: 400 (bad input), 401 (auth), 404 (model not found).
'retry.on_codes' = '429,503,504'
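A sketch of how a setting like 'retry.on_codes' might be interpreted. RetryPolicy is a hypothetical helper, and the doubling backoff without jitter is a simplification:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

/** Decides whether a failed request is worth retrying (hypothetical helper). */
final class RetryPolicy {
    private final Set<Integer> retriableCodes;
    private final int maxRetries;
    private final long baseDelayMillis;

    RetryPolicy(Set<Integer> retriableCodes, int maxRetries, long baseDelayMillis) {
        this.retriableCodes = retriableCodes;
        this.maxRetries = maxRetries;
        this.baseDelayMillis = baseDelayMillis;
    }

    /** Parses a comma-separated list of HTTP status codes, e.g. "429,503,504". */
    static Set<Integer> parseCodes(String csv) {
        return Arrays.stream(csv.split(","))
                .map(String::trim)
                .map(Integer::valueOf)
                .collect(Collectors.toSet());
    }

    /** True only for retriable codes while retries remain; 400/401/404 fail fast. */
    boolean shouldRetry(int statusCode, int retriesSoFar) {
        return retriesSoFar < maxRetries && retriableCodes.contains(statusCode);
    }

    /** Exponential backoff: base * 2^retriesSoFar (shift capped to avoid overflow). */
    long backoffMillis(int retriesSoFar) {
        return baseDelayMillis << Math.min(retriesSoFar, 20);
    }

    public static void main(String[] args) {
        RetryPolicy policy = new RetryPolicy(parseCodes("429,503,504"), 3, 200);
        System.out.println(policy.shouldRetry(429, 0)); // true
        System.out.println(policy.shouldRetry(400, 0)); // false
        System.out.println(policy.backoffMillis(2));    // 800
    }
}
```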
3. Checkpoint alignment.
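Async operations interact with checkpoint barriers. Flink's async operator stores in-flight requests in its checkpointed state (and re-issues them on restore), but the barrier still has to pass through the operator: when the request buffer is full because the API is slow, the barrier queues behind the backpressured input. Slow API -> long alignment time.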
Mitigation:
- Bound request_timeout: once it expires, the async future completes (with an error) and the barrier proceeds.
- Monitor the alignmentDurationNanos metric.
Determinism for exactly-once
LLMs (temperature > 0) are non-deterministic: same input -> different output. This breaks exactly-once semantics: on restore (replay from checkpoint), same input -> different output -> duplicate/inconsistent rows.
Mitigations:
- temperature=0: most providers honor it (mostly). Same prompt -> same output (in theory).
- seed parameter: explicit seed for determinism.
- JSON mode: structured output reduces variability.
- Cache (detailed in the next lesson): once an input is scored, cache the result. Replay uses the cache, not an API call.
- Side-effect-free check: ML_PREDICT MUST be side-effect-free (no DB writes, no mutating API calls) for exactly-once. Validate this.
Even with these, perfect determinism with LLMs is not guaranteed. Trade-off: real-time inference vs strict exactly-once.
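The caching idea can be illustrated in a few lines. This is only a sketch: PredictionCache is hypothetical, and the in-memory map stands in for a store that survives restarts (Flink state or an external cache), which is what replay actually needs:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Memoizes model output per input so a replay sees the same answer (hypothetical helper). */
final class PredictionCache {
    private final Map<String, String> cache = new HashMap<>(); // stand-in for durable storage
    private final Function<String, String> model;
    private int modelCalls = 0;

    PredictionCache(Function<String, String> model) {
        this.model = model;
    }

    /** Returns the cached output if the input was scored before, else calls the model once. */
    String predict(String input) {
        String cached = cache.get(input);
        if (cached != null) {
            return cached;
        }
        modelCalls++;
        String output = model.apply(input);
        cache.put(input, output);
        return output;
    }

    /** Number of real model invocations so far. */
    int modelCalls() {
        return modelCalls;
    }

    public static void main(String[] args) {
        int[] counter = {0};
        // Stand-in for a non-deterministic LLM: every real call yields a different answer.
        PredictionCache scorer =
                new PredictionCache(prompt -> prompt + " -> answer #" + (++counter[0]));
        String first = scorer.predict("txn-42");
        String replayed = scorer.predict("txn-42"); // simulated replay of the same input
        System.out.println(first.equals(replayed)); // true
        System.out.println(scorer.modelCalls());    // 1
    }
}
```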
Provider-specific quirks
OpenAI:
- Strict rate limits (TPM = tokens-per-minute, RPM = requests-per-minute).
- JSON mode reliable.
- Streaming responses are not supported in ML_PREDICT (would need a different design).
Anthropic:
- Slightly higher latency than OpenAI.
- Tool use / function calling well-supported.
- Recently added prompt caching: useful for repeated system prompts.
Azure OpenAI:
- Same models as OpenAI but different rate limits (often more generous for enterprise).
- Deployment-specific (model_name = your Azure deployment name).
AWS Bedrock:
- Multi-vendor (Claude, Llama, Titan, etc.).
- IAM-based auth (no API key).
- Regional considerations.
Local (DJL, ONNX):
- No network cost / latency.
- Memory hungry (model in the TaskManager heap).
- Update = redeploy job.
Production perspective: deployment patterns
Pattern 1: Hybrid local + API.
Use a local model for the majority of events (fast, cheap) and the API only for special cases (e.g., unusual events flagged by the local model).
-- Two-step pipeline
WITH local_scored AS (
SELECT *, ML_PREDICT(MODEL local_fraud_model, ...) AS local_score
FROM transactions
)
INSERT INTO final_scored
SELECT
*,
CASE
WHEN local_score > 0.5 -- suspicious, get LLM analysis
THEN ML_PREDICT(MODEL llm_analyzer, transaction_description)
ELSE NULL
END AS llm_analysis
FROM local_scored;
Cost: only ~5% of events trigger the LLM. Massive savings.
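A back-of-the-envelope estimate of the saving. The event volume and per-call price below are made-up inputs, and HybridCost is a hypothetical helper:

```java
/** Estimates LLM spend when only a fraction of events is escalated (hypothetical helper). */
final class HybridCost {
    private HybridCost() {}

    /** Cost = events * fraction escalated to the LLM * price per call. */
    static double llmCost(long events, double escalationRate, double costPerCall) {
        if (escalationRate < 0.0 || escalationRate > 1.0) {
            throw new IllegalArgumentException("escalationRate must be in [0, 1]");
        }
        return events * escalationRate * costPerCall;
    }

    public static void main(String[] args) {
        long eventsPerDay = 1_000_000L; // assumed volume
        double costPerCall = 0.01;      // assumed price per LLM call, in dollars
        System.out.println("LLM on every event: " + llmCost(eventsPerDay, 1.0, costPerCall));
        System.out.println("LLM on 5% of events: " + llmCost(eventsPerDay, 0.05, costPerCall));
    }
}
```

With these assumed inputs, escalating 5% of events costs about $500 per day instead of about $10,000.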
Pattern 2: Async pipeline.
Multiple ML_PREDICT stages in one pipeline:
-- Step 1: embed
WITH embedded AS (
SELECT *, ML_EMBED(MODEL embed_model, description) AS embedding
FROM events
),
-- Step 2: vector search
enriched AS (
SELECT e.*, similar_items
FROM embedded e
CROSS APPLY (
SELECT ARRAY_AGG(item) AS similar_items
FROM VECTOR_SEARCH(TABLE items, 'embedding', e.embedding, TOP_K => 5)
)
),
-- Step 3: recommend
recommendations AS (
SELECT e.user_id, ML_PREDICT(MODEL recommender, e.user_id, e.similar_items) AS recommendation
FROM enriched e
)
INSERT INTO output SELECT * FROM recommendations;
Each step adds latency. Optimize parallelism, async, batching at each step.
Pattern 3: Snapshot for offline replay.
First write raw events + features to Paimon. Then run batch ML inference in a separate pipeline. Use streaming inference only for use cases that require real-time results.
-- Streaming: just write events to lakehouse
INSERT INTO event_features
SELECT *, computed_features
FROM event_stream JOIN feature_table ...;
-- Batch (scheduled): predict on stored
INSERT INTO predictions
SELECT *, ML_PREDICT(MODEL m, features)
FROM event_features
WHERE date = CURRENT_DATE;
Cheaper and simpler, but latency is multi-hour.
ML_PREDICT in streaming with an external API is powerful but easy to misuse. Top mistakes: (1) Not respecting rate limits: the pipeline hits 429s constantly and retries spike costs. (2) No timeouts: a single hung request can stall checkpoints indefinitely. (3) Determinism issues silently breaking exactly-once. Always: bound timeouts, configure retries explicitly, cache aggressively, monitor cost/throughput.