Lesson 17.02 · 25 min
Advanced
CREATE MODEL · ML_PREDICT · Model providers · Async lookup · Local inference · OpenAI · Anthropic

CREATE MODEL DDL and ML_PREDICT TVF: details

In the previous lesson we looked at Flink AI features at a high level. Now the details: how exactly CREATE MODEL works for different providers, how ML_PREDICT is implemented under the hood (async vs sync), and production-level considerations.

CREATE MODEL: provider config

The CREATE MODEL syntax is extensible: each provider defines its own config keys.

OpenAI

CREATE MODEL customer_service_agent
INPUT (prompt STRING)
OUTPUT (response STRING)
WITH (
  'provider' = 'openai',
  'task' = 'chat-completion',
  'api_key' = '{{ secret.openai_api_key }}',
  'base_url' = 'https://api.openai.com/v1',  -- override for Azure / proxy
  'model_name' = 'gpt-4-turbo',
  'temperature' = '0.0',
  'max_tokens' = '500',
  'top_p' = '0.95',
  'presence_penalty' = '0',
  'frequency_penalty' = '0',
  'response_format' = 'json_object',  -- json mode
  'seed' = '12345',  -- for deterministic output (best effort)
  'system_prompt' = 'You are a helpful assistant.',
  'request_timeout' = '30s',
  'max_retries' = '3'
);

Critical params for production:

  • temperature = 0.0 + seed: for (best-effort) deterministic output, which helps exactly-once compatibility.
  • response_format = json_object: JSON mode; the response is constrained to syntactically valid JSON, unless it is cut off by max_tokens.
  • request_timeout: bounds per-request latency. The default is usually 60s, which is too long for streaming.
  • max_retries: retries on 429/5xx.

Anthropic

CREATE MODEL legal_analyzer
INPUT (text STRING)
OUTPUT (analysis STRING)
WITH (
  'provider' = 'anthropic',
  'task' = 'messages',
  'api_key' = '{{ secret.anthropic_api_key }}',
  'model_name' = 'claude-3-opus-20240229',
  'max_tokens' = '1000',
  'temperature' = '0',
  'system_prompt' = 'You are a legal expert...',
  'request_timeout' = '60s'
);

Similar pattern, with different model names and slight config differences.

Local model (ONNX/PyTorch)

CREATE MODEL fraud_classifier
INPUT (
  amount DOUBLE,
  hour_of_day INT,
  merchant_category INT,
  user_age INT,
  user_country STRING,
  txn_velocity_24h INT
)
OUTPUT (
  fraud_score DOUBLE
)
WITH (
  'provider' = 'local',
  'task' = 'classification',
  'model_path' = 's3://models/fraud_v3.onnx',
  'runtime' = 'onnx',
  'batch_size' = '32',
  'inference_threads' = '4',
  'gpu' = 'false'
);

Configs:

  • model_path: location of the model file. Loaded into TaskManager memory at startup.
  • runtime: onnx, pytorch, tensorflow.
  • batch_size: collect inputs up to batch_size events, then run batch inference (faster than per-event for many models).
  • inference_threads: per-TaskManager threads for inference.
  • gpu: CUDA support if the model is GPU-compatible.

Trade-offs local vs API:

Aspect      | Local                   | API
Latency     | sub-ms (with batching)  | 100ms-2s
Throughput  | depends on CPU/GPU      | rate-limited by API
Cost        | infrastructure          | per-call $$$
Model size  | limited by TM memory    | unlimited (API-side)
Updates     | re-deploy job           | rotate API model
Determinism | yes                     | usually no (LLM)

Local is for high-throughput structured prediction. API is for unstructured tasks (text generation, complex reasoning) where an LLM is needed.


ML_PREDICT semantics

ML_PREDICT — table-valued function (TVF):

SELECT *
FROM TABLE(
  ML_PREDICT(
    TABLE input_data,
    MODEL my_model,
    DESCRIPTOR(col1, col2, col3)
  )
);

Returns input table + output columns. Per-row inference.

Under the hood, Flink creates an MLPredictOperator in the job graph. This operator:

  1. Reads an input row.
  2. Constructs the model input (from the DESCRIPTOR columns).
  3. Calls the model (sync for local, async for API).
  4. Receives the output.
  5. Emits row = input + output.
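
The per-row steps above can be sketched in plain Java. This is a minimal illustration, not Flink's operator code: the class PredictStep, the map-based row, and the model passed as a function are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the per-row logic; not Flink's actual operator.
final class PredictStep {
    // Steps 2-5: build the model input from the DESCRIPTOR columns,
    // call the model, and emit the input columns plus the output column.
    static Map<String, Object> predictRow(
            Map<String, Object> inputRow,
            List<String> descriptorColumns,
            Function<List<Object>, Object> model,
            String outputColumn) {
        List<Object> features = new ArrayList<>();
        for (String column : descriptorColumns) {
            features.add(inputRow.get(column));
        }
        Object prediction = model.apply(features);
        // Copy the input so the emitted row is input + output.
        Map<String, Object> outputRow = new LinkedHashMap<>(inputRow);
        outputRow.put(outputColumn, prediction);
        return outputRow;
    }
}
```

Columns that are not listed in the descriptor (an id, for example) pass through untouched, which matches the "input table + output columns" contract described above.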

Figure: ML_PREDICT operator: sync (local) vs async (API)

Local model (sync, provider=local): the ONNX/PyTorch model is loaded into TM memory and inference runs inline on the operator thread. Latency μs-ms. Backpressure works naturally.
Flow: input row -> extract features -> local inference (matrix ops on CPU/GPU; deterministic, no network, typically sub-millisecond) -> output row.

API model (async, provider=openai/anthropic): HTTP call, latency 100ms-2s. Async lookup join semantics: multiple concurrent, non-blocking calls.
Flow: input row -> dispatch async (the AsyncFunction dispatches the HTTP call and returns a CompletableFuture without blocking; up to max_concurrent_requests requests are in flight in parallel) -> API server (OpenAI etc.; subject to rate limits and latency variance; returns a response or an error) -> callback (fires when the response arrives, constructs the output row and emits it) -> output row. With max_concurrent_requests = 100, effective throughput is about 100/latency rows per second per subtask.

Sync vs async inference

For local models: sync. The operator blocks on inference, then proceeds. Latency is low (μs-ms) and backpressure works normally.

For API models: async (default in Flink). The operator dispatches the request and doesn't block. When the response arrives, it emits the row. Multiple requests are in flight in parallel.

Async benefit: with a per-row latency of 500ms, throughput is no longer bounded by 1/latency. If an operator has 100 concurrent requests in flight, effective throughput = 100/0.5s = 200 rows/sec per operator instance. With 32 subtasks: 6400 rows/sec.

Without async: throughput = 1/latency = 2 rows/sec per subtask. With 32 subtasks: 64 rows/sec, 100x worse.
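
The arithmetic above is Little's law (throughput = requests in flight / latency). A small sketch that reproduces the numbers; the class and method names are illustrative, and the model ignores rate limits and latency variance.

```java
// Back-of-the-envelope throughput model; illustrative only.
final class InferenceThroughput {
    // rows/sec = in-flight requests per subtask / latency, summed over subtasks.
    static double rowsPerSecond(int inFlightPerSubtask, double latencySeconds, int subtasks) {
        return inFlightPerSubtask / latencySeconds * subtasks;
    }
}
```

With the figures from the text, rowsPerSecond(100, 0.5, 32) models the async case and rowsPerSecond(1, 0.5, 32) the sync case, where a blocking operator has exactly one request in flight.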

Configure async behavior:

CREATE MODEL my_model WITH (
  ...,
  'async' = 'true',           -- enable async
  'max_concurrent_requests' = '100',  -- max in-flight per operator instance
  'ordered_output' = 'true'   -- preserve order (default true)
);

ordered_output: if true, output is emitted in input order (preserves stream ordering), so one slow request delays every row behind it. If false, output is emitted as soon as it is ready (faster, but reorders).

max_concurrent_requests: bound on concurrency per operator instance. Higher = more throughput, but more API rate-limit pressure.
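
To make the ordered_output trade-off concrete, here is a small simulation, assuming each request's completion time is known up front. The class EmitOrder is hypothetical and only models emission order; it is not how Flink implements its result queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class EmitOrder {
    // Ordered mode: row i is emitted only after rows 0..i have all completed,
    // so its emit time is the running maximum of the completion times.
    static long[] orderedEmitTimes(long[] completionMillis) {
        long[] emit = new long[completionMillis.length];
        long latestSoFar = 0;
        for (int i = 0; i < completionMillis.length; i++) {
            latestSoFar = Math.max(latestSoFar, completionMillis[i]);
            emit[i] = latestSoFar;
        }
        return emit;
    }

    // Unordered mode: rows are emitted as they complete.
    // Returns the input indexes in the order they are emitted.
    static List<Integer> unorderedEmitOrder(long[] completionMillis) {
        List<Integer> indexes = new ArrayList<>();
        for (int i = 0; i < completionMillis.length; i++) {
            indexes.add(i);
        }
        indexes.sort(Comparator.comparingLong(i -> completionMillis[i]));
        return indexes;
    }
}
```

For completion times of 300, 100 and 200 ms, ordered mode holds the two fast rows until the slow first row finishes, while unordered mode emits them first and changes the row order.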


ML_PREDICT with aggregations and joins

ML_PREDICT can be combined with other SQL constructs:

-- Window aggregation + predict
-- Stage the model inputs in a view so that ML_PREDICT receives a table
CREATE TEMPORARY VIEW churn_inputs AS
SELECT
  hourly.customer_id,
  hourly.window_start,
  hourly.total_amount,
  hourly.txn_count,
  hourly.avg_amount,
  customer_features.age,
  customer_features.tenure
FROM (
  SELECT
    customer_id,
    TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
    SUM(amount) AS total_amount,
    COUNT(*) AS txn_count,
    AVG(amount) AS avg_amount
  FROM transactions
  GROUP BY customer_id, TUMBLE(event_time, INTERVAL '1' HOUR)
) hourly
JOIN customer_features ON hourly.customer_id = customer_features.id;

INSERT INTO predicted_churn
SELECT customer_id, window_start, churn_probability
FROM TABLE(
  ML_PREDICT(
    TABLE churn_inputs,
    MODEL customer_churn_model,
    DESCRIPTOR(total_amount, txn_count, avg_amount, age, tenure)
  )
);

Pipeline:

  1. Tumbling window aggregation on transactions.
  2. Join with the customer features table.
  3. ML_PREDICT on the joined data.
  4. Output churn predictions per hour.

This combination is natural in SQL and complex in imperative code, which is why SQL-first AI is valuable.


ML_PREDICT with join: feature enrichment

Common pattern: augment events with features from a lookup table, then predict:

-- Lookup table (e.g., dimension)
CREATE TABLE customer_dim (
  customer_id BIGINT PRIMARY KEY NOT ENFORCED,
  age INT,
  tenure INT,
  segment STRING,
  ...
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://...',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '1 h'
);

-- Streaming events
CREATE TABLE event_stream (
  customer_id BIGINT,
  txn_amount DOUBLE,
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  proc_time AS PROCTIME()  -- processing-time attribute used by the lookup join
) WITH (
  'connector' = 'kafka',
  ...
);

-- Enrich: the lookup join stages the model inputs in a view
CREATE TEMPORARY VIEW enriched_events AS
SELECT
  e.customer_id,
  e.txn_amount AS amount,
  c.age,
  c.tenure,
  c.segment
FROM event_stream AS e
JOIN customer_dim FOR SYSTEM_TIME AS OF e.proc_time AS c
  ON e.customer_id = c.customer_id;

-- Predict + filter
INSERT INTO scored_events
SELECT customer_id, amount AS txn_amount, fraud_score
FROM TABLE(
  ML_PREDICT(
    TABLE enriched_events,
    MODEL fraud_model,
    DESCRIPTOR(amount, age, tenure, segment)
  )
)
WHERE fraud_score > 0.8;

Steps:

  1. Event arrives in the stream.
  2. JOIN with customer_dim for feature enrichment (cached lookup).
  3. ML_PREDICT scores the enriched event.
  4. Filter to keep only high-fraud rows.
  5. Sink.

All of it end-to-end in SQL.


Performance considerations: under the hood

For local models, MLPredictOperator works like this:

  1. Loads the model at operator open(), once per operator instance.
  2. For each input row: constructs tensor inputs, calls model.predict(), gets the output tensor, and deserializes it into a row.
  3. Optionally batches: accumulates inputs up to batch_size, runs a single batch inference, and distributes the outputs.
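
The optional batching in step 3 can be sketched as a small buffer in front of a batch-capable model. A minimal sketch, assuming the model is exposed as a function from a list of feature vectors to a list of scores; MicroBatcher is a hypothetical name, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

final class MicroBatcher {
    private final int batchSize;
    private final Function<List<double[]>, List<Double>> batchModel;
    private final List<double[]> buffer = new ArrayList<>();

    MicroBatcher(int batchSize, Function<List<double[]>, List<Double>> batchModel) {
        this.batchSize = batchSize;
        this.batchModel = batchModel;
    }

    // Buffers one input. Returns the scores of a full batch,
    // or an empty list while the batch is still filling up.
    List<Double> add(double[] features) {
        buffer.add(features);
        return buffer.size() >= batchSize ? flush() : List.of();
    }

    // Runs one batch inference over whatever is buffered, in arrival order.
    List<Double> flush() {
        if (buffer.isEmpty()) {
            return List.of();
        }
        List<Double> scores = batchModel.apply(new ArrayList<>(buffer));
        buffer.clear();
        return scores;
    }
}
```

A real operator would also call flush() on a timer and before a checkpoint, so that a partially filled batch does not hold rows back indefinitely on a quiet stream.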

For API models, inference is async, built on Flink's AsyncFunction:

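// Conceptual sketch. OpenAIClient and the helpers createClient, constructPrompt,
// parseResponse, appendOutput and shouldRetry are placeholders, not Flink APIs.
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.data.RowData;

public class MLPredictAsyncFunction extends RichAsyncFunction<RowData, RowData> {

    // transient: created in open(), not serialized with the function.
    // The concurrency limit is the async operator's capacity, not a field here.
    private transient OpenAIClient client;

    @Override
    public void open(Configuration parameters) {
        // Flink 1.x signature; Flink 2.x uses open(OpenContext)
        client = createClient();
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) {
        String prompt = constructPrompt(input);

        // Returns immediately; the operator thread is not blocked on the HTTP call
        CompletableFuture<String> future = client.completeAsync(prompt);

        future.thenAccept(response -> {
            RowData output = appendOutput(input, parseResponse(response));
            resultFuture.complete(Collections.singletonList(output));
        }).exceptionally(ex -> {
            if (shouldRetry(ex)) {
                // Retry logic (must eventually complete resultFuture,
                // otherwise the row stays in flight until timeout() fires)
            } else {
                resultFuture.completeExceptionally(ex);
            }
            return null;
        });
    }

    @Override
    public void timeout(RowData input, ResultFuture<RowData> resultFuture) {
        // Called when the request takes longer than the configured timeout
        resultFuture.complete(Collections.emptyList()); // or completeExceptionally(...)
    }
}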

RichAsyncFunction is Flink's built-in base class for async I/O patterns. The function issues the call and handles its errors; the surrounding async operator manages capacity (max concurrent requests), timeouts, and output ordering.


Backpressure and error handling

ML_PREDICT is an I/O operator. Backpressure scenarios:

1. API slow or rate-limited.

The API responds slowly or returns 429. The async function's buffer fills up, new rows from upstream are blocked (backpressure), and the pipeline slows.

Mitigation:

  • Retries with exponential backoff: built in via max_retries.
  • Dead letter: requests that still fail after retries go to a side output for inspection.
  • Adaptive concurrency: reduce max_concurrent_requests when 429s are detected.

CREATE MODEL my_model WITH (
  ...,
  'on_error' = 'dead_letter',  -- 'fail' | 'skip' | 'dead_letter'
  'dead_letter.topic' = 'kafka:failed-predictions'
);

2. Cost spike from retries.

If the error is permanent (bad prompt, etc.), retries waste $$$. Retriable and non-retriable errors need to be told apart.

Retriable: 429 (rate limit), 503 (service unavailable), 504 (gateway timeout), network timeout. Non-retriable: 400 (bad input), 401 (auth), 404 (model not found).

'retry.on_codes' = '429,503,504'
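
A retry policy along these lines needs two decisions: whether to retry at all, and how long to wait. A minimal sketch, assuming the status-code list from the config key above; the class RetryPolicy and the base/cap parameters are illustrative choices, not part of any provider SDK.

```java
import java.util.Set;

final class RetryPolicy {
    // Mirrors 'retry.on_codes' = '429,503,504' from the text.
    private static final Set<Integer> RETRIABLE = Set.of(429, 503, 504);

    // Permanent errors (400, 401, 404, ...) are not retried: repeating them only costs money.
    static boolean isRetriable(int httpStatus) {
        return RETRIABLE.contains(httpStatus);
    }

    // Exponential backoff: baseMillis * 2^attempt, capped at maxMillis (attempt starts at 0).
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }
}
```

In practice a random jitter is usually added to each delay so that many subtasks hitting the same 429 do not all retry at the same instant.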

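3. Checkpoint alignment.

Async operations interact with checkpoint barriers. Flink's async I/O operator stores in-flight requests in the checkpoint and re-triggers them on recovery, so it does not have to wait for them to finish. But when the API is slow, the operator's request queue fills up, backpressure builds, and barriers queue behind buffered rows. Slow API -> long alignment time.

Mitigation:

  • Bound request_timeout: after it expires, the async future completes (with an error), the queue drains, and the barrier proceeds.
  • Monitor the checkpoint alignment duration (the checkpointAlignmentTime task metric).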

Determinism for exactly-once

LLMs (temperature > 0) are non-deterministic: same input -> different output. This breaks exactly-once semantics: on restore (replay from checkpoint), the same input can produce a different output -> duplicate/inconsistent rows.

Mitigations:

  1. temperature=0: most providers honor it (mostly). Same prompt -> same output (in theory).
  2. seed parameter: explicit seed for determinism (best effort where supported).
  3. JSON mode: structured output reduces variability.
  4. Cache (details in the next lesson): once an input is scored, cache the result. Replay uses the cache, not an API call.
  5. Side-effect-free check: ML_PREDICT MUST be side-effect-free (no DB writes, no mutating API calls) for exactly-once. Validate this.

Even with these, perfect determinism with LLMs is not guaranteed. Trade-off: real-time inference vs strict exactly-once.
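
The cache idea from mitigation 4 can be sketched as a map keyed by a hash of the model input, so that a replayed row gets the stored response instead of a fresh, possibly different one. This is an in-memory illustration only: ReplayCache is a hypothetical class, and a real job would need the cache in checkpointed state or an external store for it to survive a restart.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class ReplayCache {
    private final Map<String, String> responses = new ConcurrentHashMap<>();

    // Returns the cached response for this input; calls the model only on a miss.
    String predict(String input, Function<String, String> model) {
        return responses.computeIfAbsent(key(input), k -> model.apply(input));
    }

    // SHA-256 of the input as lowercase hex, used as a fixed-size cache key.
    static String key(String input) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is available on every JVM
        }
    }
}
```

The key should cover everything that influences the output (prompt, model name, parameters); here only the input string is hashed to keep the example short.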


Provider-specific quirks

OpenAI:

  • Strict rate limits (TPM = tokens-per-minute, RPM = requests-per-minute).
  • JSON mode reliable.
  • Streaming responses are not supported in ML_PREDICT (would need a different design).

Anthropic:

  • Latency can be slightly higher than OpenAI.
  • Tool use / function calling well-supported.
  • Prompt caching is available, which is useful for repeated system prompts.

Azure OpenAI:

  • Same models as OpenAI but different rate limits (often more generous for enterprise).
  • Deployment-specific (model_name = your Azure deployment name).

AWS Bedrock:

  • Multi-vendor (Claude, Llama, Titan, etc.).
  • IAM-based auth (no API key).
  • Regional considerations.

Local (DJL, ONNX):

  • No network cost / latency.
  • Memory hungry (model held in TaskManager memory).
  • Update = redeploy job.

Production perspective: deployment patterns

Pattern 1: Hybrid local + API.

Use a local model for the majority of events (fast, cheap) and the API only for special cases (e.g., unusual events flagged by the local model).

-- Two-step pipeline (conceptual shorthand: ML_PREDICT is written as a scalar
-- call for brevity; each stage would use the TVF form shown earlier)
INSERT INTO final_scored
WITH local_scored AS (
  SELECT *, ML_PREDICT(MODEL local_fraud_model, ...) AS local_score
  FROM transactions
)
SELECT
  *,
  CASE
    WHEN local_score > 0.5  -- suspicious, get LLM analysis
    THEN ML_PREDICT(MODEL llm_analyzer, transaction_description)
    ELSE NULL
  END AS llm_analysis
FROM local_scored;

Cost: if only ~5% of events exceed the threshold, only those trigger the LLM, so API spend drops by roughly 20x.
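
The routing decision in this pattern is a simple gate. A minimal sketch, with both models passed in as functions; HybridScorer and its fields are hypothetical names chosen for the example.

```java
import java.util.function.Function;
import java.util.function.ToDoubleFunction;

final class HybridScorer {
    private final ToDoubleFunction<String> localModel; // cheap, runs on every event
    private final Function<String, String> llm;        // expensive, flagged events only
    private final double threshold;
    int llmCalls = 0;                                   // useful for cost monitoring

    HybridScorer(ToDoubleFunction<String> localModel,
                 Function<String, String> llm,
                 double threshold) {
        this.localModel = localModel;
        this.llm = llm;
        this.threshold = threshold;
    }

    // Returns the LLM analysis for suspicious events, or null when the LLM was skipped.
    String analyze(String description) {
        double localScore = localModel.applyAsDouble(description);
        if (localScore <= threshold) {
            return null;
        }
        llmCalls++;
        return llm.apply(description);
    }
}
```

The threshold directly controls cost: lowering it sends more events to the LLM, so the llmCalls counter (or an equivalent metric) is worth tracking next to the fraud metrics.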

Pattern 2: Async pipeline.

Multiple ML_PREDICT stages in a pipeline:

-- Conceptual shorthand: ML_EMBED, VECTOR_SEARCH and the scalar-style
-- ML_PREDICT call illustrate the stages, not exact syntax
INSERT INTO output
-- Step 1: embed
WITH embedded AS (
  SELECT *, ML_EMBED(MODEL embed_model, description) AS embedding
  FROM events
),
-- Step 2: vector search
enriched AS (
  SELECT e.*, similar_items
  FROM embedded e
  CROSS APPLY (
    SELECT ARRAY_AGG(item) AS similar_items
    FROM VECTOR_SEARCH(TABLE items, 'embedding', e.embedding, TOP_K => 5)
  )
),
-- Step 3: recommend
recommendations AS (
  SELECT e.user_id, ML_PREDICT(MODEL recommender, e.user_id, e.similar_items) AS recommendation
  FROM enriched e
)
SELECT * FROM recommendations;

Each step adds latency. Optimize parallelism, async, batching at each step.

Pattern 3: Snapshot for offline replay.

First write raw events + features to Paimon. Then run batch ML inference in a separate pipeline. Use streaming inference only for use cases that require real-time results.

-- Streaming: just write events to lakehouse
INSERT INTO event_features
SELECT *, computed_features
FROM event_stream JOIN feature_table ...;

-- Batch (scheduled): predict on stored events (scalar-style ML_PREDICT is shorthand)
INSERT INTO predictions
SELECT *, ML_PREDICT(MODEL m, features)
FROM event_features
WHERE date = CURRENT_DATE;

Cheaper and simpler, but latency is multi-hour.

WARNING

ML_PREDICT in streaming with an external API is powerful but easy to misuse. Top mistakes: (1) Not respecting rate limits: the pipeline hits 429s constantly and retries spike costs. (2) No timeouts: a single hung request can stall the pipeline and its checkpoints indefinitely. (3) Determinism issues silently breaking exactly-once. Always: bound timeouts, configure retries explicitly, cache aggressively, monitor cost/throughput.


Knowledge check
Production: streaming fraud detection. CREATE MODEL fraud_model (provider=openai, gpt-4) + ML_PREDICT on every transaction. Target throughput: 1000 txn/sec. After deployment: throughput is only 30 txn/sec, the OpenAI bill is $5000/hour, and 30% of requests fail (429). What is wrong, and how would you redesign it?
Answer
Multi-cause analysis.

(1) GPT-4 is the wrong tool for structured fraud scoring. It is a reasoning model for complex text, not for numeric prediction. Latency is 1-2 sec per call and it is expensive ($0.03-0.06 per 1K tokens). At the target rate, 1000 txn/sec * ~$0.05 per call = $50/sec = $180K/hour. Even at the observed 30 txn/sec, 30 * $0.05 * 3600 = $5,400/hour, which matches the ~$5000/hour bill.

(2) Rate limits: the OpenAI GPT-4 default is ~150K TPM. 1000 txn/sec at even 1K tokens/call = 1M tokens/sec = 60M TPM, about 400x over the limit. A 30% failure rate (429) makes sense.

(3) Throughput of 30 txn/sec is latency-limited (1-2 s/call, even with async concurrency) and bottlenecked by the API.

Redesign:

(a) Replace GPT-4 with a specialized model: a gradient boosted tree (XGBoost) or a small neural net (TabNet) trained on historical fraud. Convert it to ONNX and deploy with CREATE MODEL provider=local. Inference is under 1ms per call, with no API cost, no rate limits, and deterministic output. 1000 txn/sec is easy on a single TM.

(b) If LLM-style reasoning is needed (e.g., to understand a free-text transaction description), use a hybrid: the local model scores everything (fast filter), and only the 1-2% flagged are sent to the LLM for analysis. 1000 * 0.01 = 10 LLM calls/sec. At $0.05 per call that is about $1,800/hour, a 100x reduction, and at 1K tokens/call about 600K TPM, so it still needs a raised rate limit, shorter prompts, or a cheaper model.

(c) Add caching: cache ML_PREDICT results by transaction characteristics, so repeated similar transactions hit the cache.

(d) Async + batching where supported.

(e) Monitoring: alerts on cost per hour, error rate, throughput.

Key takeaway: ML_PREDICT with an LLM API is powerful, but for narrow use cases. Match the model to the task: structured ML for structured prediction, LLM for unstructured reasoning. Don't use a Ferrari for grocery shopping.
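
Cost and rate-limit estimates like these are easy to get wrong by a factor of 60 or 3600, so it helps to write the unit conversions down. A minimal sketch; the class and method names are illustrative, and the inputs are the figures quoted in the question and answer (1000 and 30 txn/sec, about $0.05 and 1K tokens per call).

```java
final class LlmBudget {
    // Hourly spend: calls per second * price per call * 3600 seconds.
    static double dollarsPerHour(double callsPerSecond, double dollarsPerCall) {
        return callsPerSecond * dollarsPerCall * 3600;
    }

    // Token demand in TPM: calls per second * tokens per call * 60 seconds.
    static double tokensPerMinute(double callsPerSecond, double tokensPerCall) {
        return callsPerSecond * tokensPerCall * 60;
    }
}
```

For example, dollarsPerHour(1000, 0.05) gives the cost of sending every transaction to the LLM at the target rate, and tokensPerMinute(1000, 1000) gives the matching TPM demand to compare against the provider's limit.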

Check your understanding

Question 1 of 4. In the CREATE MODEL DDL, what is the difference between provider=openai (API) and provider=local (ONNX model in the TaskManager)?
