Lesson 17.04 · 24 min · Advanced

Async lookup and caching: production strategies for AI

In the previous lessons we looked at Flink's AI features. The focus now shifts to production engineering: how to make AI pipelines reliable, cost-effective, and observable. External API calls (LLM, embedding) add new failure modes (rate limits, timeouts, hangs), new costs (per-token billing), and new variance (LLM latency ranging from 100 ms to 5 s).

This lesson covers a deep dive into async lookup, caching strategies, backpressure handling, rate limiting, and cost monitoring.

The Async I/O operator in Flink

Async lookup: the foundation

Async lookup join is the Flink primitive on which ML_PREDICT and VECTOR_SEARCH are built for external providers.

Sync lookup blocks operator thread on each call:

Input row -> call API (500ms wait) -> emit output -> next row -> call API -> ...
Throughput: 1/latency = 2 rows/sec per operator

Async lookup dispatches calls in parallel and stalls only when the maximum number of in-flight requests is reached:

Input row 1 -> dispatch API (start callback)
Input row 2 -> dispatch API
Input row 3 -> dispatch API
... (up to maxConcurrency in flight)
Row 1 callback -> emit
Row 2 callback -> emit
...
Throughput: maxConcurrency / latency

With maxConcurrency = 100 and a latency of 500 ms, throughput is 100 / 0.5 = 200 rows/sec per operator instance.
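The two throughput formulas above can be checked with a few lines of arithmetic. This is a minimal sketch, not a Flink API: the class and method names are hypothetical, and it assumes every call takes the same latency.

```java
// Back-of-the-envelope throughput model; names are illustrative, not a Flink API.
final class LookupThroughput {
    private LookupThroughput() {}

    /** Sync lookup: one call at a time, so throughput is 1 / latency. */
    static double syncRowsPerSec(double latencySeconds) {
        return 1.0 / latencySeconds;
    }

    /** Async lookup: up to maxConcurrency calls in flight, so throughput is maxConcurrency / latency. */
    static double asyncRowsPerSec(int maxConcurrency, double latencySeconds) {
        return maxConcurrency / latencySeconds;
    }

    /** Total across parallel operator instances (subtasks). */
    static double totalRowsPerSec(int subtasks, int maxConcurrency, double latencySeconds) {
        return subtasks * asyncRowsPerSec(maxConcurrency, latencySeconds);
    }
}
```

For a 500 ms latency this gives 2 rows/sec in sync mode and 200 rows/sec with 100 concurrent requests. Real latencies vary, so treat the result as an upper bound for capacity planning.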

Implementation in Flink via RichAsyncFunction:

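import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.data.RowData;

public class AsyncInferenceFunction extends RichAsyncFunction<RowData, RowData> {

    // Assumed to be defined elsewhere in this class: the fields client,
    // retriesRemaining and failOnTimeout, and the helpers enrich(),
    // isRetriable() and scheduleRetry().
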
    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) {
        CompletableFuture<String> apiCall = client.predict(input);

        apiCall
            .thenAccept(response -> {
                RowData enriched = enrich(input, response);
                resultFuture.complete(Collections.singletonList(enriched));
            })
            .exceptionally(ex -> {
                if (isRetriable(ex) && retriesRemaining > 0) {
                    scheduleRetry(...);
                } else {
                    resultFuture.completeExceptionally(ex);
                }
                return null;
            });
    }

    @Override
    public void timeout(RowData input, ResultFuture<RowData> resultFuture) {
        // Called when async future doesn't complete within configured timeout
        if (failOnTimeout) {
            resultFuture.completeExceptionally(new TimeoutException());
        } else {
            // Emit with default value
            resultFuture.complete(Collections.singletonList(input)); // skip prediction
        }
    }
}

Configuration in the DataStream API:

AsyncDataStream.unorderedWait(
    input,
    new AsyncInferenceFunction(),
    30, TimeUnit.SECONDS,  // timeout
    100  // capacity (max concurrent)
);

Configuration in SQL:

CREATE MODEL m WITH (
  ...,
  'async' = 'true',
  'max_concurrent_requests' = '100',
  'request_timeout' = '30s',
  'ordered_output' = 'true'
);

Ordered vs unordered output

[Figure: Async lookup throughput, sync vs async with max_concurrency. Sync mode (2 rows/sec): the operator blocks on each call; at 500 ms per row, throughput is 1/0.5 = 2 rows/sec per operator. Async with max = 100 (200 rows/sec): 100 requests are in flight concurrently, so throughput is 100/0.5 = 200 rows/sec per operator, or 6400 rows/sec across 32 subtasks, a 100x improvement. The API processes concurrent requests subject to TPM/RPM rate limits; callbacks emit into the output stream.]

Ordered: output preserves input order. If row 1 takes 500 ms and row 2 takes 100 ms, the result for row 2 waits for row 1. Latency is dominated by slow rows.

Unordered: output is emitted as soon as it is ready, so row 2 is emitted first if it completes first. Throughput is higher, but the stream is reordered.

When to choose which:

  • Ordered for: stateful downstream (e.g., aggregations with timestamps), or when order matters semantically.
  • Unordered for: independent enrichment, stateless downstream, throughput-critical pipelines.

For most ML enrichment, unordered output is acceptable: downstream operators usually filter or aggregate and do not care about order.

'ordered_output' = 'false'  -- unordered (faster)
'ordered_output' = 'true'   -- ordered (default, safer)
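The latency difference between the two modes can be illustrated with a toy model. This is a minimal sketch with hypothetical names, assuming all rows are dispatched at t = 0 and each call completes after its own latency; it models emission times only and is not how Flink implements its queues.

```java
import java.util.Arrays;

// Toy model of emission order; all rows are assumed to be dispatched at t = 0.
final class EmitOrderModel {
    private EmitOrderModel() {}

    /** Unordered: each row is emitted as soon as its own call completes. */
    static long[] unorderedEmitTimesMs(long[] latenciesMs) {
        return Arrays.copyOf(latenciesMs, latenciesMs.length);
    }

    /** Ordered: a row is emitted only after every earlier row has completed. */
    static long[] orderedEmitTimesMs(long[] latenciesMs) {
        long[] emit = new long[latenciesMs.length];
        long slowestSoFar = 0;
        for (int i = 0; i < latenciesMs.length; i++) {
            // A row cannot overtake any earlier row, so it inherits the slowest latency so far.
            slowestSoFar = Math.max(slowestSoFar, latenciesMs[i]);
            emit[i] = slowestSoFar;
        }
        return emit;
    }
}
```

With latencies of 500 ms and 100 ms, ordered mode emits both rows at 500 ms, while unordered mode emits the second row at 100 ms.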

Backpressure dynamics

The ML_PREDICT operator has a capacity (max_concurrent_requests). When that capacity is full:

  1. Operator buffer fills.
  2. Upstream operator blocked (can’t push more rows).
  3. Backpressure propagates upstream.
  4. Source stops fetching new events.

This is Flink's natural backpressure mechanism. It works well in steady state: the pipeline stabilizes at an effective throughput of maxConcurrency / latency.

Problems:

  • Latency variance: if a single call hangs (slow provider), it occupies a capacity slot until it times out.
  • Bursts: input spike beyond steady-state, backpressure builds, lag grows.
  • Rate limits: API responds 429 -> must back off -> effective throughput drops.

Mitigations:

CREATE MODEL m WITH (
  'request_timeout' = '10s',           -- bound per-call latency
  'max_retries' = '3',
  'retry.initial_delay' = '100ms',
  'retry.max_delay' = '5s',
  'retry.exponential_backoff' = 'true',
  'rate_limit.requests_per_second' = '50',  -- explicit rate limit
  'circuit_breaker.failure_threshold' = '50%',  -- if 50% calls fail in window
  'circuit_breaker.window' = '1m',
  'circuit_breaker.open_duration' = '30s'  -- pause calls for 30s if circuit open
);
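The retry options above describe capped exponential backoff. A minimal sketch of the delay schedule follows, assuming the delay doubles on each attempt; the class and method names are hypothetical and only mirror the retry.* options by analogy.

```java
// Sketch of capped exponential backoff; not the ML_PREDICT implementation.
final class Backoff {
    private Backoff() {}

    /**
     * Delay before retry number `attempt` (0-based):
     * initialDelayMs * 2^attempt, capped at maxDelayMs.
     */
    static long delayMs(int attempt, long initialDelayMs, long maxDelayMs) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // Cap the shift so that 2^attempt stays well within the range of a long.
        int shift = Math.min(attempt, 30);
        long delay = initialDelayMs * (1L << shift);
        return Math.min(delay, maxDelayMs);
    }
}
```

With an initial delay of 100 ms and a cap of 5 s, the schedule is 100, 200, 400, 800, 1600, 3200 ms and then 5000 ms for every later attempt. Production retry policies usually add random jitter so that many subtasks do not retry at the same instant.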

Circuit breaker: if the provider degrades, ML_PREDICT temporarily stops calling it so the provider can recover. Calls fail fast (or fall back) instead of building a backlog.
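The circuit breaker logic can be sketched as follows. This is a simplified illustration with hypothetical names: it has only closed and open states (real breakers usually add a half-open probing state), it is not thread-safe, and the minCalls guard is an assumption added so that a single failure cannot open the circuit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal time-window circuit breaker sketch. Time is passed in explicitly
// so the behaviour is easy to test.
final class CircuitBreaker {
    private final double failureThreshold; // e.g. 0.5 for 50%
    private final long windowMs;           // e.g. 60_000 for 1m
    private final long openDurationMs;     // e.g. 30_000 for 30s
    private final int minCalls;            // hypothetical guard against tiny samples

    // Each element is {timestampMs, 1 if the call failed else 0}.
    private final Deque<long[]> outcomes = new ArrayDeque<>();
    private long openedAtMs = -1;          // -1 means the circuit is closed

    CircuitBreaker(double failureThreshold, long windowMs, long openDurationMs, int minCalls) {
        this.failureThreshold = failureThreshold;
        this.windowMs = windowMs;
        this.openDurationMs = openDurationMs;
        this.minCalls = minCalls;
    }

    /** True if a call may be sent to the provider at time nowMs. */
    boolean allowRequest(long nowMs) {
        if (openedAtMs >= 0 && nowMs - openedAtMs >= openDurationMs) {
            // Open period elapsed: close the circuit and start with a clean window.
            openedAtMs = -1;
            outcomes.clear();
        }
        return openedAtMs < 0;
    }

    /** Records the outcome of a finished call and opens the circuit if the failure rate is too high. */
    void record(long nowMs, boolean failed) {
        outcomes.addLast(new long[] {nowMs, failed ? 1 : 0});
        // Drop outcomes that fell out of the sliding window.
        while (!outcomes.isEmpty() && nowMs - outcomes.peekFirst()[0] > windowMs) {
            outcomes.removeFirst();
        }
        long failures = 0;
        for (long[] outcome : outcomes) {
            failures += outcome[1];
        }
        boolean enoughData = outcomes.size() >= minCalls;
        double failureRate = (double) failures / outcomes.size();
        if (openedAtMs < 0 && enoughData && failureRate >= failureThreshold) {
            openedAtMs = nowMs;
        }
    }
}
```

A caller would check allowRequest before each provider call and route the row to a fallback or a dead letter output when it returns false.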


Caching strategies

Caching is critical for ML cost and latency. There are several cache levels:

1. Result caching

Cache: input -> output. Same input -> cached result, no provider call.

CREATE MODEL embed_model WITH (
  ...,
  'cache.enabled' = 'true',
  'cache.ttl' = '1d',
  'cache.max_entries' = '100000',
  'cache.storage' = 'rocksdb',  -- 'heap' for fast small cache
  'cache.key' = 'hash(input)'  -- default: hash of all input cols
);

Cache hit benefits:

  • No API call -> no cost.
  • Latency of microseconds instead of hundreds of milliseconds.
  • No rate limit pressure.

Hit rate depends on the data:

  • Embedding (text repeats often): 50-90%.
  • LLM prediction (unique inputs): 5-30%.
  • Vector search (queries repeat): 30-70%.

Trade-offs:

  • Memory / storage for the cache.
  • Staleness (bounded by the TTL: outputs may change if the model is updated).
  • Determinism — same input -> same cached output (good for exactly-once).
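To make the mechanics of a result cache concrete, here is a minimal in-memory sketch with a maximum size (LRU eviction) and a TTL, plus hit-rate counters. The names are hypothetical, the class is not thread-safe, and it does not reflect how the cache.* options are implemented; a production cache would more likely sit in RocksDB or a caching library.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// In-memory result cache sketch: LRU eviction by entry count plus per-entry TTL.
final class ResultCache<K, V> {
    private static final class Slot<T> {
        final T value;
        final long expiresAtMs;

        Slot(T value, long expiresAtMs) {
            this.value = value;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final long ttlMs;
    private final Map<K, Slot<V>> entries;
    private long hits;
    private long misses;

    ResultCache(final int maxEntries, long ttlMs) {
        this.ttlMs = ttlMs;
        // accessOrder = true keeps the least recently used entry first in iteration order.
        this.entries = new LinkedHashMap<K, Slot<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Slot<V>> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value, or null on a miss or an expired entry. */
    V get(K key, long nowMs) {
        Slot<V> slot = entries.get(key);
        if (slot == null || nowMs >= slot.expiresAtMs) {
            if (slot != null) {
                entries.remove(key); // expired: drop it so it stops occupying a slot
            }
            misses++;
            return null;
        }
        hits++;
        return slot.value;
    }

    /** Stores a provider result; it expires ttlMs after nowMs. */
    void put(K key, V value, long nowMs) {
        entries.put(key, new Slot<>(value, nowMs + ttlMs));
    }

    /** Fraction of lookups served from the cache so far. */
    double hitRate() {
        long total = hits + misses;
        return total == 0 ? 0.0 : (double) hits / total;
    }
}
```

The caller looks up the input key first and calls the provider only on a null result, then stores the response with put. The hit-rate counter corresponds to the cache hit metric discussed in this lesson.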

2. Embedding cache

Caching is especially effective for text embeddings, because the same text yields the same embedding (deterministic):

CREATE MODEL embed_model WITH (
  'cache.enabled' = 'true',
  'cache.ttl' = '7d',  -- embeddings stable
  'cache.storage' = 'rocksdb'  -- many entries, fit on disk
);

Heavy usage: documents in RAG ingestion get re-embedded if the pipeline restarts; the cache prevents paying for them again.

3. Vector search cache

Cache: query embedding -> top-K results. Repeated similar queries hit cache.

'vector_search.cache.enabled' = 'true',
'vector_search.cache.ttl' = '1h'

Tricky: similar but not identical embeddings shouldn’t share cache (results may differ). Solution: cache by exact embedding vector hash. Hit rate lower but correctness maintained.

4. Negative result caching

Cache known failures (404, validation error). A repeated bad input then fails fast instead of triggering a retry storm.


Rate limiting

API providers enforce rate limits; exceeding them causes 429s and slowdowns.

OpenAI:

  • TPM (tokens per minute): GPT-4 ~150K (default), can request increase.
  • RPM (requests per minute): a few thousand for GPT-4.
  • TPD (tokens per day) for some models.

Anthropic:

  • Similar TPM / RPM model.

The pipeline must be throttled to stay under these limits. ML_PREDICT supports:

'rate_limit.requests_per_second' = '50',
'rate_limit.tokens_per_minute' = '100000',
'rate_limit.algorithm' = 'token_bucket'

Strategies:

  • Token bucket: tokens are refilled at a constant rate; requests can burst up to the bucket size.
  • Sliding window: count requests in the last 60 s window, block if over the limit.
  • Per-key rate limit: a separate limit per API key (useful with multiple keys).

Distributed pipelines: the rate limit applies per operator instance, not globally. With 16 subtasks and rate_limit = 50 RPS, the total is 16 * 50 = 800 RPS. Set the per-instance limit accordingly (global limit divided by parallelism).

Coordinated rate limit (cross-instance): implemented via a distributed counter (Redis, etc.). It is complex and adds network overhead, so it is worth it only for tight limits.
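The token bucket strategy and the per-instance arithmetic can be sketched as below. The names are hypothetical and the class is not thread-safe; timestamps are passed in explicitly to keep the example testable. It illustrates the algorithm, not the rate limiter inside ML_PREDICT.

```java
// Token-bucket sketch with explicit timestamps.
final class TokenBucket {
    private final double refillPerSec;
    private final double capacity;
    private double tokens;
    private long lastRefillMs;

    TokenBucket(double refillPerSec, double capacity, long nowMs) {
        this.refillPerSec = refillPerSec;
        this.capacity = capacity;
        this.tokens = capacity; // start full so an initial burst is allowed
        this.lastRefillMs = nowMs;
    }

    /** Tries to take `cost` tokens (1 per request, or the token count of a prompt). */
    boolean tryAcquire(double cost, long nowMs) {
        // Refill in proportion to elapsed time, never exceeding the bucket size.
        double elapsedSec = (nowMs - lastRefillMs) / 1000.0;
        tokens = Math.min(capacity, tokens + elapsedSec * refillPerSec);
        lastRefillMs = nowMs;
        if (tokens < cost) {
            return false;
        }
        tokens -= cost;
        return true;
    }

    /** Per-subtask share of a global limit, since each operator instance limits independently. */
    static double perInstanceLimit(double globalLimitPerSec, int parallelism) {
        return globalLimitPerSec / parallelism;
    }
}
```

For example, to stay under a global limit of 800 requests per second with 16 subtasks, each subtask would get a bucket refilled at 50 requests per second.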


Cost monitoring

ML API costs can spike easily. Monitor:

-- Track token consumption via metrics
SELECT
  model_name,
  SUM(input_tokens) AS total_input_tokens,
  SUM(output_tokens) AS total_output_tokens,
  COUNT(*) AS total_calls,
  AVG(latency_ms) AS avg_latency
FROM ml_metrics
GROUP BY model_name, TUMBLE(ts, INTERVAL '1' MINUTE);

Flink ML operators expose metrics:

  • flink_taskmanager_job_task_operator_ml_input_tokens_total
  • flink_taskmanager_job_task_operator_ml_output_tokens_total
  • flink_taskmanager_job_task_operator_ml_cache_hits_total
  • flink_taskmanager_job_task_operator_ml_cache_misses_total
  • flink_taskmanager_job_task_operator_ml_calls_failed_total
  • flink_taskmanager_job_task_operator_ml_latency_milliseconds

Alerts:

  • Cost per hour spike > threshold.
  • Cache hit rate drops below 50%.
  • Error rate > 5%.
  • Latency P99 > 5s.

Cost optimization checklist:

  1. Caching — biggest win, often 50%+ reduction.
  2. Use cheapest model that meets quality bar (Haiku vs Opus, gpt-4o-mini vs gpt-4).
  3. Batch where supported: OpenAI embeddings support batching (a single call for multiple texts).
  4. Limit max_tokens to prevent long completions.
  5. Reduce input context: feed only relevant context, not full docs.
  6. Pre-filter: score with a cheap model first, and use the expensive one only for selected cases.

Hybrid pipelines: cheap + expensive

Common production pattern:

WITH initial_score AS (
  -- Step 1: cheap local model scores all
  SELECT *, ML_PREDICT(MODEL local_classifier, features) AS quick_score
  FROM events
),
suspicious AS (
  -- Step 2: only flagged events go to expensive LLM
  SELECT *
  FROM initial_score
  WHERE quick_score > 0.7
),
deep_analysis AS (
  -- Step 3: expensive LLM analysis on suspicious
  SELECT *, ML_PREDICT(MODEL llm, transaction_description) AS llm_analysis
  FROM suspicious
)
INSERT INTO alerts
SELECT * FROM deep_analysis WHERE llm_analysis.confidence > 0.8;

Economics:

  • Local model: 1M events / sec, $0.
  • Filter: 99% of events skipped (quick_score not above 0.7).
  • LLM call: 10K events / sec * $0.001 average = $10/sec = $864K/day.

vs LLM on everything: 1M events / sec * $0.001 = $1000/sec = $86M/day. 100x more.

The hybrid pattern is essential for high-throughput pipelines with an LLM in the loop.
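The economics can be re-checked with a few lines of arithmetic. This is a minimal sketch with hypothetical names; the $0.001 average price per call is the figure assumed in the economics above, and the pipeline is assumed to run at a constant rate all day.

```java
// Daily cost arithmetic for the hybrid pipeline.
final class HybridCost {
    private HybridCost() {}

    static final long SECONDS_PER_DAY = 86_400L;

    /** Daily LLM spend in dollars for a given call rate and average price per call. */
    static double dailyCostUsd(double callsPerSec, double pricePerCallUsd) {
        return callsPerSec * pricePerCallUsd * SECONDS_PER_DAY;
    }

    /** How many times more expensive it is to send every event to the LLM. */
    static double savingsFactor(double allEventsPerSec, double filteredEventsPerSec) {
        return allEventsPerSec / filteredEventsPerSec;
    }
}
```

At 10K calls/sec this comes to about $864K/day, against about $86.4M/day when all 1M events/sec go to the LLM. The ratio is 100x, matching the 1% of events that pass the filter.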


Side outputs for failures

ML calls fail (network, rate limit, bad input). Don’t drop events silently:

// Explicit type argument: diamond on an anonymous class does not compile before Java 9
final OutputTag<RowData> failedTag = new OutputTag<RowData>("failed-predictions") {};

DataStream<RowData> input = ...;

SingleOutputStreamOperator<RowData> predicted = input
    .process(new ProcessFunction<RowData, RowData>() {
        @Override
        public void processElement(RowData row, Context ctx, Collector<RowData> out) {
            try {
                RowData enriched = predict(row);
                out.collect(enriched);
            } catch (Exception e) {
                ctx.output(failedTag, row); // side output
            }
        }
    });

DataStream<RowData> failed = predicted.getSideOutput(failedTag);
failed.sinkTo(deadLetterSink);

// Continue with successful predictions
predicted.sinkTo(mainSink);

Failed events accumulate in the dead letter sink. Inspect them periodically, retry them, and alert ops if the failure rate is high.


Observability dashboard

Critical metrics for a production ML pipeline:

Metric                                     What it tells
numRecordsInPerSecond                      input throughput
numRecordsOutPerSecond                     output throughput
Difference between in/out                  dropped / failed
ml_latency_p99                             API latency tail
ml_calls_failed_total rate                 error rate
ml_cache_hit_rate                          cost optimization indicator
ml_tokens_total (cost = tokens * price)    actual spend in $
backpressure per operator                  bottleneck location
checkpoint_duration                        health (long -> ML stalls)

Dashboard:

  • Time series: throughput, latency, errors, cost.
  • Histograms: latency P50/P95/P99.
  • Heatmap: errors by type.
  • Alerts: anomalies on any metric.

Failure scenarios and recovery

Common production incidents and how to handle them:

Incident: API provider outage (503 spike).

  • Symptoms: high error rate, latency spike.
  • Auto: circuit breaker opens, calls fail fast, dead letter sink fills.
  • Manual: page on-call, investigate provider status. If the outage is short, wait. If it is long, switch to a fallback provider.

Incident: Rate limit hit (429 spike).

  • Symptoms: throughput drops, 429 errors.
  • Auto: rate limiter throttles, exponential backoff kicks in.
  • Manual: investigate cause (input surge? key issue?). Request rate limit increase or adjust pipeline.

Incident: Latency spike (P99 multi-second).

  • Symptoms: throughput drops, checkpoint duration grows.
  • Auto: timeout cancels long calls, pipeline continues.
  • Manual: investigate provider, consider switching, scale concurrency.

Incident: Cost spike (10x normal).

  • Symptoms: cost dashboard alert.
  • Investigate: input volume increase? Cache misses? Retry storm?
  • Action: stop pipeline if accidental loop. Adjust pipeline. Add cache. Reduce model usage.

Cost optimization checklist (detailed)

Practical playbook:

  1. Cache embeddings (hit rate 50-90% typical). RocksDB cache, 7-day TTL.

  2. Cache LLM responses for repeatable inputs (e.g., FAQ answers). Hit rate 10-30%.

  3. Filter before predict: drop irrelevant events early. Don’t predict on noise.

  4. Use a smaller model: Claude Haiku is roughly 60x cheaper than Opus, and GPT-4o-mini roughly 100x cheaper than GPT-4. Quality is often sufficient.

  5. Batch where supported: embedding APIs support batching (1 call for 100 texts), which cuts the number of requests by up to 100x.

  6. Hybrid pipelines: cheap + expensive. 99% cheap, 1% expensive.

  7. Truncate inputs: set a max input length. Don’t feed entire documents if most of the content is irrelevant.

  8. Limit output: set the max_tokens parameter. Don’t pay for unbounded generations.

  9. Determinism (temperature=0, seed): consistent outputs cache better.

  10. Monitor cost continuously: alert on hourly spend exceeding budget.
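Item 5 of the playbook (batching) amounts to grouping inputs before calling the provider. A minimal sketch of the grouping step follows; the names are hypothetical, and the actual embedding call and its batch size limit depend on the provider.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a list of inputs into batches so that one API call can carry many texts.
final class Batching {
    private Batching() {}

    /** Splits inputs into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> inputs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < inputs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, inputs.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(inputs.subList(start, end)));
        }
        return batches;
    }
}
```

With a batch size of 100, 250 texts become 3 calls instead of 250. In a streaming job the batch is usually also flushed on a timer, so that a partially filled batch does not wait indefinitely.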

WARNING

ML cost can spike orders of magnitude faster than infrastructure cost. A bad query can cost $10K/hour; compare that to the cost of a Flink TM ($0.5/hour). Treat ML calls as an expensive external service: rate limit aggressively, cache always, monitor cost continuously, and alert on anomalies. Cost is an operational concern just like CPU and memory.


Real-world: monitoring setup

Production deployment includes:

# Prometheus rules
groups:
  - name: ml_pipeline
    rules:
      - alert: HighMLErrorRate
        expr: rate(flink_ml_calls_failed_total[5m]) / rate(flink_ml_calls_total[5m]) > 0.05
        annotations:
          summary: "ML pipeline error rate > 5%"

      - alert: LowCacheHitRate
        expr: rate(flink_ml_cache_hits_total[10m]) / rate(flink_ml_calls_total[10m]) < 0.3
        annotations:
          summary: "Cache hit rate below 30% — review cache config"

      - alert: HighMLCost
        expr: increase(flink_ml_tokens_total[1h]) * 0.001 > 100
        annotations:
          summary: "ML cost > $100/hour"

      - alert: HighMLLatency
        expr: flink_ml_latency_p99 > 5000
        annotations:
          summary: "ML P99 latency > 5s"

Grafana dashboard:

  • Throughput timeline.
  • Latency P50/P95/P99 timeline.
  • Cost per hour timeline.
  • Cache hit rate gauge.
  • Error breakdown by type.
  • Per-model breakdown (if multiple models).

This setup catches issues early. Incident response cost: 5 min vs hours.


Knowledge check
Production RAG pipeline: 1000 questions/sec, FRESHNESS = 30 sec. Currently: ML_EMBED (OpenAI) -> VECTOR_SEARCH (pgvector) -> ML_PREDICT (Claude Haiku). It costs $30K/day, sometimes gets 429 errors from OpenAI, and has a P99 latency of 3s. Optimize cost + reliability + latency. Top 5 changes?
Answer
Multi-faceted optimization:

  1. Aggressive embedding cache: cache text-embedding-3-small results with a 7-day TTL in RocksDB storage sized for 1M entries. The same questions repeat, so expect a hit rate of 40-70%. This saves 40-70% of embedding API cost and cuts latency (cached lookups take microseconds vs ~100 ms). Set max_entries large (1M) and the TTL long (a week or more), since embeddings are stable.
  2. Local embedding model: switch to sentence-transformers (all-MiniLM-L6-v2) via ONNX. It is free, has ~5 ms latency, and produces 384-dim vectors instead of 1536-dim (smaller vector storage). Quality is somewhat lower (~5% recall drop) but still good when combined with reranking. This eliminates the OpenAI embedding cost entirely; combined with (1), it is a ~99% cost reduction for the embedding step.
  3. Batch embedding: with a local model, batch 32-64 inputs per inference call (small overhead compared with single-input calls). Throughput rises by up to 32x and concurrent operator load drops.
  4. LLM cache: cache Claude Haiku responses keyed by (top-3 doc ids, normalized question). The same question with the same retrieved docs returns the cached answer. Hit rate is ~20-30% for FAQs. This helps both cost and P99 latency.
  5. Explicit rate limiting: configure rate_limit.requests_per_second on the OpenAI call to match reserved capacity. This prevents 429 errors and the retry load they cause; with the cache in place, only cache misses count against the limit.
  6. Circuit breaker: on OpenAI provider degradation, fall back to cached responses or a 'please try again later' message, with automatic recovery.
  7. Batch RAG: precompute answers to the most common questions offline (the top 1000 popular questions) and store them as a cache. This front-line cache catches the majority of traffic.

After optimization: $30K/day -> ~$3-5K/day (up to 10x). P99 under 1 s (cache + local model). Error rate under 0.5%. Lesson: ML pipeline cost optimization is layered; caching, smaller models, batching, and hybrid pipelines all compose.
