Async lookup и caching: production strategies для AI
In previous lessons we looked at Flink's AI features. Now the focus shifts to production engineering: how to make AI pipelines reliable, cost-effective, and observable. External API calls (LLM, embedding) add new failure modes (rate limits, timeouts, hangs), new costs (per-token billing), and new variance (LLM latency from 100 ms to 5 s).
This lesson covers: async lookup in depth, caching strategies, backpressure handling, rate limiting, and cost monitoring.
[Figure: Async I/O operator in Flink]
Async lookup: the foundation
Async lookup join is the Flink primitive that ML_PREDICT and VECTOR_SEARCH build on when they call external providers.
A sync lookup blocks the operator thread on every call:
Input row -> call API (500ms wait) -> emit output -> next row -> call API -> ...
Throughput: 1/latency = 2 rows/sec per operator
An async lookup dispatches calls in parallel and blocks only when the number of in-flight calls reaches capacity:
Input row 1 -> dispatch API (start callback)
Input row 2 -> dispatch API
Input row 3 -> dispatch API
... (up to maxConcurrency in flight)
Row 1 callback -> emit
Row 2 callback -> emit
...
Throughput: maxConcurrency / latency
With maxConcurrency = 100 and 500 ms latency, throughput is 200 rows/sec per operator instance.
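The arithmetic above is Little's law (in-flight calls = throughput × latency). A minimal pure-Java sketch that makes it reusable for sizing; the class and method names are illustrative, not a Flink API, and the results are upper bounds that ignore rate limits and retries:

```java
/** Back-of-the-envelope sizing for sync vs async lookups (Little's law). */
final class AsyncThroughput {
    private AsyncThroughput() {}

    /** Rows/sec for one operator instance that blocks on every call. */
    static double syncRowsPerSec(double latencyMs) {
        return 1000.0 / latencyMs;
    }

    /** Upper bound on rows/sec when up to {@code capacity} calls are in flight at once. */
    static double asyncRowsPerSec(int capacity, double latencyMs) {
        return capacity * 1000.0 / latencyMs;
    }

    /** Smallest capacity that can sustain {@code targetRowsPerSec} at the given latency. */
    static int capacityFor(double targetRowsPerSec, double latencyMs) {
        return (int) Math.ceil(targetRowsPerSec * latencyMs / 1000.0);
    }
}
```

With 500 ms latency, syncRowsPerSec gives 2.0, asyncRowsPerSec(100, 500) gives 200.0, and sustaining 1,000 rows/sec would need capacityFor(1000, 500) = 500 in-flight calls.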
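Implementation in Flink via RichAsyncFunction (PredictionClient and enrich(...) are placeholders for a provider client and the output mapping):
import java.util.Collections;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.table.data.RowData;

public class AsyncInferenceFunction extends RichAsyncFunction<RowData, RowData> {
    // Placeholder: non-blocking client whose predict(...) returns CompletableFuture<String>
    private transient PredictionClient client;
    // true: fail the job on timeout; false: pass the row through without a prediction
    private boolean failOnTimeout = false;

    @Override
    public void open(OpenContext openContext) { // Flink 1.19+; older versions use open(Configuration)
        client = PredictionClient.create(); // one client per subtask, not serialized with the job
    }

    @Override
    public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) {
        // Never block here: dispatch the call and complete resultFuture exactly once in the callback
        client.predict(input).whenComplete((response, ex) -> {
            if (ex == null) {
                resultFuture.complete(Collections.singletonList(enrich(input, response)));
            } else {
                // For retries, prefer Flink's AsyncRetryStrategy (AsyncDataStream.unorderedWaitWithRetry)
                // over hand-rolled rescheduling; completing exceptionally fails the task
                resultFuture.completeExceptionally(ex);
            }
        });
    }

    @Override
    public void timeout(RowData input, ResultFuture<RowData> resultFuture) {
        // Called when the future doesn't complete within the configured timeout
        if (failOnTimeout) {
            resultFuture.completeExceptionally(new TimeoutException("prediction timed out"));
        } else {
            resultFuture.complete(Collections.singletonList(input)); // emit the row without a prediction
        }
    }
}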
Configuration in the DataStream API:
DataStream<RowData> enriched = AsyncDataStream.unorderedWait(
    input,
    new AsyncInferenceFunction(),
    30, TimeUnit.SECONDS, // timeout per request
    100                   // capacity: max in-flight requests per subtask
);
Configuration in SQL:
CREATE MODEL m WITH (
...,
'async' = 'true',
'max_concurrent_requests' = '100',
'request_timeout' = '30s',
'ordered_output' = 'true'
);
Ordered vs unordered output
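Ordered: output preserves input order. If row 1 takes 500 ms and row 2 takes 100 ms, row 2's result waits for row 1, so latency is driven by the slowest rows.
Unordered: results are emitted as soon as they complete, so row 2 goes out first if it finishes first. Higher throughput, but the stream is reordered. In event-time pipelines Flink's unordered mode still does not let records overtake watermarks, so reordering stays within the span between two watermarks.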
When choose:
- Ordered for: downstream logic that depends on arrival order (e.g., stateful processing that assumes a per-key sequence), or when order matters semantically.
- Unordered for: independent enrichment, stateless downstream, throughput-critical pipelines.
Most ML enrichment tolerates unordered output: downstream usually filters or aggregates and doesn't care about order.
'ordered_output' = 'false' -- unordered (faster)
'ordered_output' = 'true' -- ordered (default, safer)
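To see the latency cost of ordered output, the sketch below replays the example above with all rows dispatched at t = 0 (an assumption) and computes when each row can leave the operator. Pure Java with illustrative names; it models only the emission rule, not Flink's internal queue:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Emission timing of ordered vs unordered async output (all calls dispatched at t = 0). */
final class EmitOrder {
    private EmitOrder() {}

    /** Ordered mode: row i leaves at max(its own completion, the time row i-1 left). */
    static long[] orderedEmitTimes(long[] completionMs) {
        long[] emit = new long[completionMs.length];
        long previous = 0;
        for (int i = 0; i < completionMs.length; i++) {
            emit[i] = Math.max(completionMs[i], previous);
            previous = emit[i];
        }
        return emit;
    }

    /** Unordered mode: rows leave as soon as their calls complete, i.e. sorted by completion. */
    static List<Integer> unorderedSequence(long[] completionMs) {
        List<Integer> rows = new ArrayList<>();
        for (int r = 0; r < completionMs.length; r++) {
            rows.add(r);
        }
        rows.sort(Comparator.comparingLong(r -> completionMs[r])); // List.sort is stable for ties
        return rows;
    }
}
```

For completion times {500, 100, 120} ms, orderedEmitTimes returns [500, 500, 500] (both fast rows wait for the slow one), while unorderedSequence returns [1, 2, 0].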
Backpressure dynamics
The ML_PREDICT operator has a capacity (max_concurrent_requests). When capacity is full:
- Operator buffer fills.
- Upstream operator blocked (can’t push more rows).
- Backpressure propagates upstream.
- Source stops fetching new events.
This is Flink's natural backpressure mechanism. It works fine in steady state: the pipeline settles at an effective throughput of maxConcurrency / latency.
Problems:
- Latency variance: a single hanging call (slow provider) holds a capacity slot until it times out; in ordered mode it also holds back every result queued behind it.
- Bursts: input spike beyond steady-state, backpressure builds, lag grows.
- Rate limits: API responds 429 -> must back off -> effective throughput drops.
Mitigations:
CREATE MODEL m WITH (
'request_timeout' = '10s', -- bound per-call latency
'max_retries' = '3',
'retry.initial_delay' = '100ms',
'retry.max_delay' = '5s',
'retry.exponential_backoff' = 'true',
'rate_limit.requests_per_second' = '50', -- explicit rate limit
'circuit_breaker.failure_threshold' = '50%', -- if 50% calls fail in window
'circuit_breaker.window' = '1m',
'circuit_breaker.open_duration' = '30s' -- pause calls for 30s if circuit open
);
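The retry.* settings above describe capped exponential backoff. A minimal sketch of the delay schedule using the values from that config (100 ms initial delay, 5 s cap); the jittered variant is an addition not present in the config, a common way to keep many subtasks from retrying in lockstep:

```java
import java.util.concurrent.ThreadLocalRandom;

/** Capped exponential backoff: delay(n) = initial * 2^(n-1), never above max. */
final class Backoff {
    private final long initialDelayMs;
    private final long maxDelayMs;

    Backoff(long initialDelayMs, long maxDelayMs) {
        if (initialDelayMs <= 0 || maxDelayMs < initialDelayMs) {
            throw new IllegalArgumentException("need 0 < initialDelayMs <= maxDelayMs");
        }
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
    }

    /** Delay before retry number {@code attempt} (1-based). */
    long delayMs(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // Stop doubling once the cap is reached; this also avoids overflow for large attempts.
        long delay = initialDelayMs;
        for (int i = 1; i < attempt && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }

    /** "Full jitter": uniform in [0, delayMs(attempt)]. */
    long jitteredDelayMs(int attempt) {
        return ThreadLocalRandom.current().nextLong(delayMs(attempt) + 1);
    }
}
```

With new Backoff(100, 5000), the three retries allowed by max_retries = 3 wait 100, 200, and 400 ms; from the seventh attempt on the delay stays at the 5000 ms cap.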
Circuit breaker: if the provider degrades, ML_PREDICT temporarily stops calling it so the provider can recover. Calls fail fast (or use a fallback) instead of building a backlog.
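The circuit_breaker.* options map onto a small state machine. A minimal sketch under stated assumptions: a tumbling window instead of a sliding one, a minimum call count before tripping, and a single trial call after the open period. Names are illustrative, not the ML_PREDICT implementation:

```java
import java.util.function.LongSupplier;

/** CLOSED -> OPEN on a high failure ratio; OPEN -> HALF_OPEN after a cool-down; a trial call decides. */
final class CircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final double failureThreshold; // 0.5 for "50%"
    private final long windowMs;           // 60_000 for "1m" (tumbling window, a simplification)
    private final long openDurationMs;     // 30_000 for "30s"
    private final int minCalls;            // don't trip on a handful of calls
    private final LongSupplier clockMs;    // injectable clock, e.g. System::currentTimeMillis

    private State state = State.CLOSED;
    private long windowStart;
    private int calls;
    private int failures;
    private long openedAt;
    private boolean trialInFlight;

    CircuitBreaker(double failureThreshold, long windowMs, long openDurationMs,
                   int minCalls, LongSupplier clockMs) {
        this.failureThreshold = failureThreshold;
        this.windowMs = windowMs;
        this.openDurationMs = openDurationMs;
        this.minCalls = minCalls;
        this.clockMs = clockMs;
        this.windowStart = clockMs.getAsLong();
    }

    /** False means: fail fast (or use a fallback) without calling the provider. */
    synchronized boolean allowRequest() {
        if (state == State.OPEN) {
            if (clockMs.getAsLong() - openedAt < openDurationMs) {
                return false;
            }
            state = State.HALF_OPEN; // cool-down over: probe the provider once
            trialInFlight = false;
        }
        if (state == State.HALF_OPEN) {
            if (trialInFlight) {
                return false;
            }
            trialInFlight = true;
        }
        return true;
    }

    synchronized void recordSuccess() { record(false); }

    synchronized void recordFailure() { record(true); }

    synchronized State state() { return state; }

    private void record(boolean failed) {
        long now = clockMs.getAsLong();
        if (state == State.HALF_OPEN) { // the trial call decides
            if (failed) {
                state = State.OPEN;
                openedAt = now;
            } else {
                state = State.CLOSED;
                resetWindow(now);
            }
            return;
        }
        if (state == State.OPEN) {
            return; // late results from calls started before the breaker tripped
        }
        if (now - windowStart >= windowMs) {
            resetWindow(now);
        }
        calls++;
        if (failed) {
            failures++;
        }
        if (calls >= minCalls && (double) failures / calls >= failureThreshold) {
            state = State.OPEN;
            openedAt = now;
        }
    }

    private void resetWindow(long now) {
        windowStart = now;
        calls = 0;
        failures = 0;
    }
}
```

The caller checks allowRequest() before dispatching and reports each result with recordSuccess() or recordFailure(); a false answer routes the row to the fallback or dead letter path immediately.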
Caching strategies
Caching is critical for ML cost and latency. There are several cache levels:
1. Result caching
Cache: input -> output. Same input -> cached result, no provider call.
CREATE MODEL embed_model WITH (
...,
'cache.enabled' = 'true',
'cache.ttl' = '1d',
'cache.max_entries' = '100000',
'cache.storage' = 'rocksdb', -- 'heap' for fast small cache
'cache.key' = 'hash(input)' -- default: hash of all input cols
);
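A minimal sketch of what a heap result cache governed by cache.ttl and cache.max_entries does: LRU eviction plus per-entry expiry, with a hit-rate counter. It is a single-process illustration under assumptions (generic keys, an injectable clock); the real option handling and RocksDB storage are not modeled:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Bounded in-memory result cache: LRU eviction plus a per-entry TTL. */
final class TtlLruCache<K, V> {
    private static final class CacheEntry<T> {
        final T value;
        final long expiresAtMs;

        CacheEntry(T value, long expiresAtMs) {
            this.value = value;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final long ttlMs;
    private final LongSupplier clockMs;
    private final LinkedHashMap<K, CacheEntry<V>> entries;
    private long hits;
    private long misses;

    TtlLruCache(int maxEntries, long ttlMs, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
        // accessOrder = true: iteration order is least-recently-used first
        this.entries = new LinkedHashMap<K, CacheEntry<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CacheEntry<V>> eldest) {
                return size() > maxEntries; // evict the LRU entry once over capacity
            }
        };
    }

    /** Returns the cached value, or null on a miss or an expired entry. */
    synchronized V get(K key) {
        CacheEntry<V> e = entries.get(key);
        if (e == null || clockMs.getAsLong() >= e.expiresAtMs) {
            if (e != null) {
                entries.remove(key); // expired: drop it so the next call refreshes it
            }
            misses++;
            return null;
        }
        hits++;
        return e.value;
    }

    synchronized void put(K key, V value) {
        entries.put(key, new CacheEntry<>(value, clockMs.getAsLong() + ttlMs));
    }

    synchronized double hitRate() {
        long total = hits + misses;
        return total == 0 ? 0.0 : (double) hits / total;
    }
}
```

The lookup path is get first; on null, call the provider and put the result. hitRate() is the number to compare with the ranges below.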
Cache hit benefits:
- No API call -> no cost.
- Latency: microseconds instead of hundreds of milliseconds.
- No rate limit pressure.
Hit rate depends on the data:
- Embedding (text repeats often): 50-90%.
- LLM prediction (unique inputs): 5-30%.
- Vector search (queries repeat): 30-70%.
Trade-offs:
- Memory / storage for the cache.
- Staleness (TTL): outputs can change if the model is updated.
- Determinism — same input -> same cached output (good for exactly-once).
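Staleness and determinism both come down to what goes into the cache key. A minimal sketch, assuming the key should cover model name and version plus generation parameters (temperature, max_tokens) alongside the input, so a model upgrade or parameter change misses old entries instead of serving stale results. The function name and field list are illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Builds a SHA-256 cache key from everything that can change a model's output. */
final class CacheKeys {
    private CacheKeys() {}

    static String resultKey(String modelName, String modelVersion, double temperature,
                            int maxTokens, String input) {
        MessageDigest sha;
        try {
            sha = MessageDigest.getInstance("SHA-256"); // required on every Java SE runtime
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        String[] fields = {modelName, modelVersion, Double.toString(temperature),
                Integer.toString(maxTokens), input};
        for (String field : fields) {
            byte[] bytes = field.getBytes(StandardCharsets.UTF_8);
            // Length prefix keeps ("ab", "c") and ("a", "bc") from hashing identically.
            sha.update(ByteBuffer.allocate(Integer.BYTES).putInt(bytes.length).array());
            sha.update(bytes);
        }
        StringBuilder hex = new StringBuilder(64);
        for (byte b : sha.digest()) {
            hex.append(Character.forDigit((b >> 4) & 0xF, 16)).append(Character.forDigit(b & 0xF, 16));
        }
        return hex.toString();
    }
}
```

Bumping modelVersion after a model update gives every input a new key, so old entries simply age out under the TTL.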
2. Embedding cache
For text embeddings caching is especially effective: the same text run through the same model version gives the same embedding (deterministic):
CREATE MODEL embed_model WITH (
'cache.enabled' = 'true',
'cache.ttl' = '7d', -- embeddings stay valid until the model changes
'cache.storage' = 'rocksdb' -- many entries, fit on disk
);
Main use case: in RAG ingestion, documents get re-embedded when the pipeline restarts; the cache avoids paying for them twice.
3. Vector search cache
Cache: query embedding -> top-K results. Repeated similar queries hit cache.
'vector_search.cache.enabled' = 'true',
'vector_search.cache.ttl' = '1h'
Tricky: similar but not identical embeddings shouldn't share a cache entry (their results may differ). Solution: key the cache on the exact embedding vector (hash for lookup, full comparison for equality). The hit rate is lower, but correctness is maintained.
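A minimal sketch of an exact-vector cache key, assuming the query embedding arrives as a float[] and that top-K is part of the request (different K, different results). The class is illustrative; it works as a HashMap key because the hash only selects a bucket while equals compares every component:

```java
import java.util.Arrays;

/** HashMap key for a vector-search cache: exact embedding contents plus top-K. */
final class VectorKey {
    private final float[] embedding;
    private final int topK;
    private final int hash;

    VectorKey(float[] embedding, int topK) {
        this.embedding = embedding.clone(); // defensive copy: later mutation can't corrupt the key
        this.topK = topK;
        this.hash = 31 * Arrays.hashCode(this.embedding) + topK;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof VectorKey)) {
            return false;
        }
        VectorKey other = (VectorKey) o;
        // Exact comparison: two different vectors never share an entry, even on a hash collision.
        return topK == other.topK && Arrays.equals(embedding, other.embedding);
    }

    @Override
    public int hashCode() {
        return hash;
    }
}
```

A vector that differs in a single float component produces a different key, which is exactly the "lower hit rate, preserved correctness" trade-off.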
4. Negative result caching
Cache "doesn't work" results too (404, validation errors). A repeated bad input then fails fast instead of triggering a retry storm.
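A minimal sketch of negative caching, assuming failures are classified by HTTP status: client errors are remembered for a short TTL, while 408, 429 and 5xx are left to retry/backoff. The class, the status policy, and the unbounded map (a real one needs a size bound) are assumptions for illustration:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Remembers inputs that failed permanently so repeats fail fast instead of re-calling the API. */
final class NegativeCache {
    private final ConcurrentHashMap<String, Long> failedUntilMs = new ConcurrentHashMap<>();
    private final long ttlMs;
    private final LongSupplier clockMs;

    NegativeCache(long ttlMs, LongSupplier clockMs) {
        this.ttlMs = ttlMs;
        this.clockMs = clockMs;
    }

    /** Client errors are worth caching; 408, 429 and 5xx are transient and belong to retry/backoff. */
    static boolean isPermanent(int httpStatus) {
        return httpStatus >= 400 && httpStatus < 500 && httpStatus != 408 && httpStatus != 429;
    }

    void recordPermanentFailure(String inputKey) {
        failedUntilMs.put(inputKey, clockMs.getAsLong() + ttlMs);
    }

    /** True while a recent permanent failure is remembered for this input. */
    boolean shouldSkip(String inputKey) {
        Long until = failedUntilMs.get(inputKey);
        if (until == null) {
            return false;
        }
        if (clockMs.getAsLong() >= until) {
            failedUntilMs.remove(inputKey, until); // expired: allow one fresh attempt
            return false;
        }
        return true;
    }
}
```

Keeping the TTL short matters: the "bad" input may become valid once upstream data or the model is fixed.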
Rate limiting
API providers enforce rate limits; exceeding them causes 429 errors and slowdowns.
OpenAI:
- TPM (tokens per minute): depends on the model and the account's usage tier (on the order of 150K for GPT-4); increases can be requested.
- RPM (requests per minute): a few thousand for GPT-4, also tier-dependent.
- TPD (tokens per day) for some models.
Anthropic:
- Similar TPM / RPM model.
The pipeline must be throttled to stay under these limits. ML_PREDICT supports:
'rate_limit.requests_per_second' = '50',
'rate_limit.tokens_per_minute' = '100000',
'rate_limit.algorithm' = 'token_bucket'
Strategy:
- Token bucket: tokens refill at a constant rate, each request spends one, and requests can burst up to the bucket size.
- Sliding window: count requests in the last 60 s window and block when over the limit.
- Per-key rate limit: a separate limit per API key (useful with multiple keys).
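A minimal single-instance token bucket, assuming an injectable nanosecond clock. Passing an estimated token count as the cost instead of 1 turns the same class into a tokens-per-minute budget (with the rate expressed per second). Illustrative names, not the ML_PREDICT internals:

```java
import java.util.function.LongSupplier;

/** Token bucket: refills at ratePerSecond, holds at most burstCapacity tokens. */
final class TokenBucket {
    private final double ratePerSecond;
    private final double burstCapacity;
    private final LongSupplier clockNanos; // e.g. System::nanoTime
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(double ratePerSecond, double burstCapacity, LongSupplier clockNanos) {
        this.ratePerSecond = ratePerSecond;
        this.burstCapacity = burstCapacity;
        this.clockNanos = clockNanos;
        this.tokens = burstCapacity; // start full so a fresh pipeline can burst
        this.lastRefillNanos = clockNanos.getAsLong();
    }

    /** Takes {@code cost} tokens if available: 1 per request, or estimated tokens for a TPM budget. */
    synchronized boolean tryAcquire(double cost) {
        refill();
        if (tokens < cost) {
            return false;
        }
        tokens -= cost;
        return true;
    }

    /** Nanoseconds to wait before {@code cost} tokens are available (0 if available now). */
    synchronized long nanosUntilAvailable(double cost) {
        refill();
        return tokens >= cost ? 0L : (long) Math.ceil((cost - tokens) * 1e9 / ratePerSecond);
    }

    private void refill() {
        long now = clockNanos.getAsLong();
        tokens = Math.min(burstCapacity, tokens + (now - lastRefillNanos) * ratePerSecond / 1e9);
        lastRefillNanos = now;
    }
}
```

new TokenBucket(50, 50, System::nanoTime) corresponds to rate_limit.requests_per_second = 50 with a one-second burst; a caller that gets false can wait nanosUntilAvailable(1) rather than pushing the provider into 429s.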
In distributed pipelines the rate limit applies per operator instance, not globally. With 16 subtasks and rate_limit = 50 RPS, the total is 16 * 50 = 800 RPS, so set the per-instance limit to the global limit divided by the parallelism.
A coordinated (cross-instance) rate limit needs a distributed counter (Redis, etc.). It is complex and adds a network round trip to every call, so it is worth it only for tight limits.
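Splitting a global provider limit across subtasks is integer arithmetic, but naive division silently drops the remainder. A minimal sketch that spreads the remainder so the per-subtask limits add up exactly to the global limit; the helper is hypothetical, and in Flink the subtask index and parallelism would come from the function's runtime context. It assumes keys are spread evenly (a skewed subtask hits its share first) and that limits are recomputed after rescaling:

```java
/** Per-subtask share of a global requests-per-second budget. */
final class RateBudget {
    private RateBudget() {}

    static int perSubtaskLimit(int globalLimit, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid parallelism or subtask index");
        }
        int base = globalLimit / parallelism;
        int remainder = globalLimit % parallelism;
        // The first `remainder` subtasks get one extra request/sec so the shares sum to the global limit.
        return base + (subtaskIndex < remainder ? 1 : 0);
    }
}
```

An 800 RPS account limit over 16 subtasks gives exactly 50 each; a 100 RPS limit gives 7 to subtasks 0 through 3 and 6 to the rest.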
Cost monitoring
ML API costs can spike easily. Monitor them:
-- Track token consumption via a metrics table (ml_metrics: one row per call; ts is a time attribute)
SELECT
model_name,
TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start,
SUM(input_tokens) AS total_input_tokens,
SUM(output_tokens) AS total_output_tokens,
COUNT(*) AS total_calls,
AVG(latency_ms) AS avg_latency
FROM ml_metrics
GROUP BY model_name, TUMBLE(ts, INTERVAL '1' MINUTE);
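Turning the per-minute token counts into money is a per-model price lookup. A minimal sketch, assuming prices are expressed in USD per million input and output tokens; the prices are parameters, and any numbers passed in are placeholders rather than quotes:

```java
/** Converts token counts from the per-minute query into dollars. */
final class CostEstimate {
    private CostEstimate() {}

    /** Cost of one window; prices are USD per million tokens, taken from the provider's price list. */
    static double windowCostUsd(long inputTokens, long outputTokens,
                                double usdPerMillionInput, double usdPerMillionOutput) {
        return inputTokens / 1_000_000.0 * usdPerMillionInput
                + outputTokens / 1_000_000.0 * usdPerMillionOutput;
    }

    /** Projects a one-minute window's cost to an hourly rate for budget alerts. */
    static double hourlyRateUsd(double oneMinuteCostUsd) {
        return oneMinuteCostUsd * 60;
    }
}
```

Output tokens are usually priced higher than input tokens, which is why the query above keeps the two sums separate.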
Flink ML operators expose metrics:
- flink_taskmanager_job_task_operator_ml_input_tokens_total
- flink_taskmanager_job_task_operator_ml_output_tokens_total
- flink_taskmanager_job_task_operator_ml_cache_hits_total
- flink_taskmanager_job_task_operator_ml_cache_misses_total
- flink_taskmanager_job_task_operator_ml_calls_failed_total
- flink_taskmanager_job_task_operator_ml_latency_milliseconds
Alerts:
- Cost per hour spike > threshold.
- Cache hit rate drops below 50%.
- Error rate > 5%.
- Latency P99 > 5s.
Cost optimization checklist:
- Caching: the biggest win, often a 50%+ reduction.
- Use the cheapest model that meets the quality bar (Haiku vs Opus, gpt-4o-mini vs gpt-4).
- Batch where supported: OpenAI embeddings accept multiple texts in a single call.
- Limit max_tokens: prevents long completions.
- Reduce input context: feed only relevant context, not full documents.
- Pre-filter: score with a cheap model first and send only selected cases to the expensive one.
Hybrid pipelines: cheap + expensive
Common production pattern:
INSERT INTO alerts
WITH initial_score AS (
-- Step 1: cheap local model scores all events
-- (quick_score is assumed to be the OUTPUT column of local_classifier)
SELECT * FROM ML_PREDICT(TABLE events, MODEL local_classifier, DESCRIPTOR(features))
),
suspicious AS (
-- Step 2: only flagged events go to the expensive LLM
SELECT *
FROM initial_score
WHERE quick_score > 0.7
),
deep_analysis AS (
-- Step 3: expensive LLM analysis of suspicious events only
-- (confidence is assumed to be an OUTPUT column of llm)
SELECT * FROM ML_PREDICT(TABLE suspicious, MODEL llm, DESCRIPTOR(transaction_description))
)
SELECT * FROM deep_analysis WHERE confidence > 0.8;
Economics (assuming about $0.001 per LLM call):
- Local model: 1M events / sec, $0.
- Filter: 99% of events skipped (quick_score at or below 0.7).
- LLM calls: 10K events / sec * $0.001 = $10/sec, about $864K/day.
vs. LLM on everything: 1M events / sec * $0.001 = $1,000/sec, about $86.4M/day, i.e. 100x more.
The hybrid pattern is essential for high-throughput pipelines with an LLM in the loop.
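The cascade arithmetic as a reusable helper, assuming a flat price per LLM call (the $0.001 above is an illustrative figure, not a quoted price). The second method inverts it: given a daily budget, how selective the cheap filter has to be:

```java
/** Daily spend of a cheap-filter + LLM cascade (illustrative helper). */
final class CascadeEconomics {
    static final double SECONDS_PER_DAY = 86_400;

    private CascadeEconomics() {}

    /** LLM cost per day when a fraction {@code escalationRate} of events reaches the LLM. */
    static double llmCostPerDay(double eventsPerSec, double escalationRate, double usdPerCall) {
        return eventsPerSec * escalationRate * usdPerCall * SECONDS_PER_DAY;
    }

    /** Largest escalation rate that keeps LLM spend within a daily budget (capped at 1.0). */
    static double maxEscalationRate(double dailyBudgetUsd, double eventsPerSec, double usdPerCall) {
        return Math.min(1.0, dailyBudgetUsd / (eventsPerSec * usdPerCall * SECONDS_PER_DAY));
    }
}
```

llmCostPerDay(1_000_000, 0.01, 0.001) comes to about $864,000 and the all-LLM case to about $86.4M per day; maxEscalationRate(86_400, 1_000_000, 0.001) is 0.001, so for an $86.4K/day budget only 1 in 1,000 events may reach the LLM.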
Side outputs для failures
ML calls fail (network, rate limit, bad input). Don’t drop events silently:
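import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// The trailing {} creates an anonymous subclass so Flink can capture the generic type
final OutputTag<RowData> failedTag = new OutputTag<RowData>("failed-predictions") {};
DataStream<RowData> input = ...;
SingleOutputStreamOperator<RowData> predicted = input
    .process(new ProcessFunction<RowData, RowData>() {
        @Override
        public void processElement(RowData row, Context ctx, Collector<RowData> out) {
            try {
                // predict(...) is a placeholder for a synchronous model call (it blocks this thread)
                RowData enriched = predict(row);
                out.collect(enriched);
            } catch (Exception e) {
                ctx.output(failedTag, row); // side output: keep the original row for the dead letter sink
            }
        }
    });
DataStream<RowData> failed = predicted.getSideOutput(failedTag);
failed.sinkTo(deadLetterSink); // deadLetterSink, mainSink: any Sink<RowData>, e.g. Kafka topics
// Continue with successful predictions
predicted.sinkTo(mainSink);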
Failed events accumulate in the dead letter sink. Inspect them periodically, retry them, and alert ops if the failure rate is high.
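When the model call lives in an async stage instead of the synchronous ProcessFunction above, the async function only receives a ResultFuture (no Context, so no ctx.output), and completing it exceptionally fails the task. A common workaround is to complete every request successfully with an outcome wrapper and split successes from failures in a downstream ProcessFunction. A minimal, Flink-free sketch of such a wrapper and its routing rule; all names are hypothetical:

```java
import java.util.List;
import java.util.function.Consumer;

/** Either a prediction or the error that prevented it, always paired with the original input. */
final class PredictionOutcome<I, O> {
    final I input;
    final O prediction; // null when the call failed
    final String error; // null when the call succeeded

    private PredictionOutcome(I input, O prediction, String error) {
        this.input = input;
        this.prediction = prediction;
        this.error = error;
    }

    static <I, O> PredictionOutcome<I, O> success(I input, O prediction) {
        return new PredictionOutcome<>(input, prediction, null);
    }

    static <I, O> PredictionOutcome<I, O> failure(I input, Throwable cause) {
        return new PredictionOutcome<>(input, null,
                cause.getClass().getSimpleName() + ": " + cause.getMessage());
    }

    boolean failed() {
        return error != null;
    }

    /** What the downstream ProcessFunction does: main output vs dead-letter side output. */
    static <I, O> void route(List<PredictionOutcome<I, O>> outcomes,
                             Consumer<O> mainOutput,
                             Consumer<PredictionOutcome<I, O>> deadLetter) {
        for (PredictionOutcome<I, O> o : outcomes) {
            if (o.failed()) {
                deadLetter.accept(o);
            } else {
                mainOutput.accept(o.prediction);
            }
        }
    }
}
```

Carrying the original input and the error text into the dead letter sink is what makes the later "inspect, retry" step possible.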
Observability dashboard
Critical metrics for a production ML pipeline:
| Metric | What it tells you |
|---|---|
| numRecordsInPerSecond | input throughput |
| numRecordsOutPerSecond | output throughput |
| Difference between in/out | dropped / failed records |
| ml_latency_p99 | API latency tail |
| ml_calls_failed_total (rate) | error rate |
| ml_cache_hit_rate | cost optimization indicator |
| ml_tokens_total (cost = tokens * price) | actual spend ($) |
| backpressure per operator | bottleneck location |
| checkpoint_duration | health (long checkpoints point to stalled ML calls) |
Dashboard:
- Time series: throughput, latency, errors, cost.
- Histograms: latency P50/P95/P99.
- Heatmap: errors by type.
- Alerts: anomalies on any metric.
Failure scenarios and recovery
Common production incidents and how to handle them:
Incident: API provider outage (503 spike).
- Symptoms: high error rate, latency spike.
- Auto: circuit breaker opens, calls fail fast, dead letter sink fills.
- Manual: page on-call and check the provider's status. If the outage is short, wait; if it is long, switch to a fallback provider.
Incident: Rate limit hit (429 spike).
- Symptoms: throughput drops, 429 errors.
- Auto: rate limiter throttles, exponential backoff kicks in.
- Manual: investigate cause (input surge? key issue?). Request rate limit increase or adjust pipeline.
Incident: Latency spike (P99 multi-second).
- Symptoms: throughput drops, checkpoint duration grows.
- Auto: timeout cancels long calls, pipeline continues.
- Manual: investigate provider, consider switching, scale concurrency.
Incident: Cost spike (10x normal).
- Symptoms: cost dashboard alert.
- Investigate: input volume increase? Cache misses? Retry storm?
- Action: stop the pipeline if it is an accidental loop. Adjust the pipeline, add caching, reduce model usage.
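The manual "switch to a fallback provider" step can also be automated per request. A minimal sketch, assuming both providers are exposed as hypothetical functions returning CompletableFuture (Java 9+ for orTimeout and failedFuture): the primary gets its own timeout, and any error or timeout sends the same request to the fallback. Since the two models can answer differently, their results should stay distinguishable in cache keys and metrics:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Per-request failover from a primary model provider to a fallback provider. */
final class Fallback {
    private Fallback() {}

    static <I, O> CompletableFuture<O> withFallback(I request,
                                                   Function<I, CompletableFuture<O>> primary,
                                                   Function<I, CompletableFuture<O>> fallback,
                                                   long primaryTimeoutMs) {
        CompletableFuture<O> first;
        try {
            // orTimeout completes the primary future exceptionally if it hangs past the deadline
            first = primary.apply(request).orTimeout(primaryTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            first = CompletableFuture.failedFuture(e); // a synchronous throw counts as a failure too
        }
        return first
                .handle((result, error) -> error == null
                        ? CompletableFuture.completedFuture(result)
                        : fallback.apply(request))
                .thenCompose(Function.identity()); // flatten the nested future
    }
}
```

Paired with a circuit breaker on the primary, a dead provider stops adding its timeout to every request: once the breaker opens, calls go straight to the fallback.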
Cost optimization checklist (detailed)
Practical playbook:
- Cache embeddings (hit rate 50-90% typical). RocksDB cache, 7-day TTL.
- Cache LLM responses for repeatable inputs (e.g., FAQ answers). Hit rate 10-30%.
- Filter before predict: drop irrelevant events early. Don't predict on noise.
- Use a smaller model: at list prices, Claude Haiku is about 60x cheaper than Opus and GPT-4o-mini about 100x cheaper than GPT-4. Quality is often sufficient.
- Batch where supported: embedding APIs accept batches (1 call for 100 texts), i.e. 100x fewer requests against the RPM limit.
- Hybrid pipelines: cheap + expensive. 99% cheap, 1% expensive.
- Truncate inputs: cap input length. Don't feed entire documents if most of the content is irrelevant.
- Limit output: set max_tokens. Don't pay for unbounded generations.
- Determinism (temperature=0, seed): consistent outputs make cached and recomputed results agree.
- Monitor cost continuously: alert when hourly spend exceeds the budget.
ML costs can spike orders of magnitude faster than infrastructure costs. A bad query can cost … 0.5/hour). Treat ML calls as an expensive external service: rate limit aggressively, always cache, monitor cost continuously, and alert on anomalies. Cost is an operational concern, just like CPU and memory.
Real-world: monitoring setup
Production deployment includes:
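# Prometheus rules
groups:
  - name: ml_pipeline
    rules:
      - alert: HighMLErrorRate
        # failed calls / all calls (rate of the failure counter alone is failures per second)
        expr: sum(rate(flink_ml_calls_failed_total[5m])) / sum(rate(flink_ml_calls_total[5m])) > 0.05
        annotations:
          summary: "ML pipeline error rate > 5%"
      - alert: LowCacheHitRate
        # hits / (hits + misses)
        expr: sum(rate(flink_ml_cache_hits_total[10m])) / (sum(rate(flink_ml_cache_hits_total[10m])) + sum(rate(flink_ml_cache_misses_total[10m]))) < 0.3
        annotations:
          summary: "Cache hit rate below 30%, review cache config"
      - alert: HighMLCost
        # tokens used in the last hour * assumed USD per token (0.001 is a placeholder price)
        expr: sum(increase(flink_ml_tokens_total[1h])) * 0.001 > 100
        annotations:
          summary: "ML cost > $100/hour"
      - alert: HighMLLatency
        expr: flink_ml_latency_p99 > 5000
        annotations:
          summary: "ML P99 latency > 5s"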
Grafana dashboard:
- Throughput timeline.
- Latency P50/P95/P99 timeline.
- Cost per hour timeline.
- Cache hit rate gauge.
- Error breakdown by type.
- Per-model breakdown (if multiple models).
This setup catches issues early: incident response takes about 5 minutes instead of hours.