VECTOR_SEARCH and RAG pipelines in Flink
Vector search is the foundation of modern AI applications: semantic search, RAG (Retrieval-Augmented Generation), recommendation engines, content moderation. Flink 2.2 added a native VECTOR_SEARCH SQL function, giving first-class support for embedding-based retrieval in streaming jobs.
This lesson covers what embeddings are, how VECTOR_SEARCH works, how to build RAG pipelines in SQL, and production-grade examples with different vector DB backends.
Embeddings: 30-second refresher
Embedding: a representation of text (or image/audio) as a vector of floats. Trained models (OpenAI text-embedding-3, sentence-transformers, etc.) map semantic content into a high-dimensional space (typically 384, 768, or 1536 dimensions).
Properties:
- Similar content -> similar vectors (close in vector space).
- Distance metrics (cosine, dot product, Euclidean) approximate semantic similarity.
- Storage: one vector per item in a DB or index.
- Search: for a query, compute its embedding and find the nearest vectors.
Use cases:
- Semantic search: “find documents about climate change” -> embed query -> vector search -> return relevant docs. Better than keyword (handles synonyms, intent).
- Recommendations: embed the item a user viewed -> find similar items in the vector space -> recommend them.
- RAG: embed the question -> find relevant context docs -> the LLM uses them to answer.
- Anomaly detection: events embedded -> outliers in vector space -> flagged.
- Content moderation: content embedded -> similar to known violations -> flagged.
VECTOR_SEARCH syntax
SELECT *
FROM VECTOR_SEARCH(
TABLE products, -- table containing vectors
'embedding_column', -- column with vectors
ARRAY[0.1, 0.2, 0.3, ...], -- query vector
TOP_K => 10,
DISTANCE => 'cosine'
);
Returns: top-K rows from source table, sorted by similarity to query vector. Output schema = source columns + similarity score.
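A minimal Python sketch of what an exact (non-ANN) VECTOR_SEARCH returns, to make the contract concrete. Assumptions: rows are dicts, the vector column is addressed by name, similarity is cosine, and the score column is called `similarity_score` (a hypothetical name; the text above does not name it).

```python
import heapq
import math
from typing import Any


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """(a . b) / (|a| * |b|): 1.0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def vector_search(rows: list[dict[str, Any]], column: str,
                  query: list[float], top_k: int) -> list[dict[str, Any]]:
    """Exact top-K: score every row, keep the K most similar, best first.

    Each output row = the source columns + a hypothetical 'similarity_score' column.
    """
    scored = [(cosine_similarity(row[column], query), row) for row in rows]
    best = heapq.nlargest(top_k, scored, key=lambda pair: pair[0])
    return [{**row, "similarity_score": score} for score, row in best]


products = [
    {"id": 1, "name": "laptop stand", "embedding": [1.0, 0.0]},
    {"id": 2, "name": "monitor arm", "embedding": [0.7, 0.7]},
    {"id": 3, "name": "coffee mug", "embedding": [0.0, 1.0]},
]
hits = vector_search(products, "embedding", [1.0, 0.1], top_k=2)
print([(h["id"], round(h["similarity_score"], 3)) for h in hits])
```

A real backend replaces the full scan with an ANN index, but the output shape (source columns plus a score, sorted best first) is the same.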
Distance options:
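- cosine: 1 - (a · b) / (|a| * |b|). The most common choice; works on normalized or unnormalized vectors.
- euclidean: L2 distance, sqrt(sum((a_i - b_i)^2)).
- dot_product: a · b. A similarity rather than a distance (higher means closer). Faster, but requires normalized vectors for a meaningful comparison: on unit-length vectors it equals cosine similarity, while on raw vectors it also rewards vector magnitude.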
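The three metrics in plain Python, with illustrative vectors, showing why dot product needs normalized inputs: on raw vectors it rewards magnitude, while on unit vectors it matches cosine similarity.

```python
import math


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine_distance(a, b):
    # 1 - (a . b) / (|a| * |b|): 0 = same direction, 2 = opposite
    return 1 - dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


def euclidean_distance(a, b):
    # sqrt(sum((a_i - b_i)^2))
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def normalize(v):
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]


query = [1.0, 0.0]
near_doc = [0.9, 0.1]  # almost the same direction as the query, small magnitude
big_doc = [3.0, 3.0]   # 45 degrees away, large magnitude

print("dot:      ", dot(query, near_doc), dot(query, big_doc))  # big_doc scores higher
print("cosine:   ", cosine_distance(query, near_doc), cosine_distance(query, big_doc))  # near_doc is closer
print("euclidean:", euclidean_distance(query, near_doc), euclidean_distance(query, big_doc))
# On unit-length vectors, dot product equals cosine similarity (= 1 - cosine distance).
print(dot(normalize(query), normalize(big_doc)), 1 - cosine_distance(query, big_doc))
```

This is why pipelines that use dot_product usually L2-normalize embeddings once at write time.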
What happens under the hood depends on the source connector:
- pgvector: a SQL query such as SELECT * FROM products ORDER BY embedding <=> '[...]' LIMIT 10 (<=> is pgvector's cosine-distance operator; <-> is L2 distance).
- Pinecone: an HTTP API call, POST /query.
- Milvus: a gRPC call.
- Weaviate: a GraphQL query.
Flink VECTOR_SEARCH abstracts this away: the same SQL works regardless of backend.
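A hedged sketch of the pgvector translation a connector might perform. The operators are pgvector's own (`<->` L2 distance, `<=>` cosine distance, `<#>` negative inner product); the function names and the psycopg-style `%s` placeholder are assumptions for illustration, not part of Flink.

```python
# Map the VECTOR_SEARCH distance names used above to pgvector operators.
# All three return a value where smaller = closer, so ORDER BY ... LIMIT k yields the top-k.
PGVECTOR_OPERATORS = {
    "euclidean": "<->",    # L2 distance
    "cosine": "<=>",       # cosine distance
    "dot_product": "<#>",  # negative inner product
}


def vector_literal(vec: list[float]) -> str:
    """pgvector's text input format, e.g. '[0.1,0.2,0.3]'."""
    return "[" + ",".join(repr(float(x)) for x in vec) + "]"


def build_knn_sql(table: str, column: str, distance: str, top_k: int) -> str:
    """Build a k-NN query; the query vector is passed as a bind parameter (%s).

    table/column are interpolated directly, so they must come from trusted config.
    """
    op = PGVECTOR_OPERATORS[distance]
    return f"SELECT * FROM {table} ORDER BY {column} {op} %s::vector LIMIT {int(top_k)}"


sql = build_knn_sql("products", "embedding", "cosine", 10)
params = (vector_literal([0.1, 0.2, 0.3]),)
print(sql, params)
```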
Sourcing embeddings: ML_EMBED
The companion function to VECTOR_SEARCH is ML_EMBED:
-- Embed text using model
SELECT
product_id,
description,
ML_EMBED(MODEL embedding_model, description) AS vec
FROM products;
ML_EMBED is a scalar function that returns ARRAY<DOUBLE> (the vector).
Set up the embedding model:
CREATE MODEL embedding_model
INPUT (text STRING)
OUTPUT (embedding ARRAY<DOUBLE>)
WITH (
'provider' = 'openai',
'task' = 'text-embedding-3-small', -- OpenAI's 1536-dim embedding
'api_key' = '{{ secret.openai_api_key }}'
);
Or local:
CREATE MODEL local_embed
INPUT (text STRING)
OUTPUT (embedding ARRAY<DOUBLE>)
WITH (
'provider' = 'local',
'task' = 'embedding',
'model_path' = 's3://models/all-MiniLM-L6-v2.onnx',
'runtime' = 'onnx',
'dimensions' = '384'
);
Cost considerations: OpenAI embeddings are billed per token (~… 30 per million). A local embedding model (e.g., MiniLM) has no per-call API fee, latency under 10 ms, and decent quality for many tasks.
Vector DB connector setup
pgvector
CREATE TABLE knowledge_base (
id BIGINT PRIMARY KEY NOT ENFORCED,
title STRING,
content STRING,
embedding ARRAY<DOUBLE>,
metadata MAP<STRING, STRING>
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres:5432/kb',
'table-name' = 'documents',
'username' = 'flink',
'password' = '{{ secret.pgvector_password }}',
'vector-column' = 'embedding' -- marks the vector column for VECTOR_SEARCH
);
pgvector is a Postgres extension. Mature, with well-known SQL semantics; supports both exact search and approximate indexes (IVFFlat, HNSW). Best for moderate scale (under 100M vectors).
Pinecone
CREATE TABLE pinecone_index WITH (
'connector' = 'pinecone',
'index' = 'products',
'api_key' = '{{ secret.pinecone_api_key }}',
'environment' = 'us-east1-aws',
'namespace' = 'production'
);
Pinecone is a managed vector DB service. Scales to billions of vectors. API-first, no SQL semantics.
Milvus
CREATE TABLE milvus_collection WITH (
'connector' = 'milvus',
'uri' = 'milvus://milvus:19530',
'collection' = 'products',
'partition' = 'production'
);
Milvus is an open-source, self-hosted vector DB. It offers various index types (e.g., HNSW, IVF variants) and scales horizontally.
Choosing
| DB | Pros | Cons |
|---|---|---|
| pgvector | Familiar SQL, low ops, joins | Practical only up to ~100M vectors |
| Pinecone | Managed, scales to billions | $$$, vendor lock-in |
| Milvus | Self-hosted, OSS, fast | Ops overhead |
| Weaviate | Hybrid (vector + graph) | Less mature |
| Qdrant | Rust-based, fast | Smaller ecosystem |
For most use cases: pgvector if the scale is modest (most apps), Pinecone if you want a managed service, Milvus if you want OSS at scale.
RAG pipeline: complete example
Build a streaming chatbot: user questions arrive in Kafka, and RAG answers them using the knowledge base.
-- 1. Source: user questions
CREATE TABLE user_questions (
user_id BIGINT,
question STRING,
session_id STRING,
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-questions',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- 2. Knowledge base (pgvector)
CREATE TABLE knowledge_base (
doc_id STRING,
title STRING,
content STRING,
embedding ARRAY<DOUBLE>
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://pg:5432/kb',
'table-name' = 'documents',
'vector-column' = 'embedding'
);
-- 3. Models
CREATE MODEL embed_model WITH (
'provider' = 'openai',
'task' = 'text-embedding-3-small'
);
CREATE MODEL llm WITH (
'provider' = 'anthropic',
'task' = 'messages',
'model_name' = 'claude-3-haiku-20240307', -- fast, cheap
'max_tokens' = '500',
'temperature' = '0'
);
-- 4. Output: answers
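-- upsert-kafka: the non-windowed GROUP BY in step 5 emits updating results,
-- which the append-only 'kafka' connector rejects
CREATE TABLE answers (
user_id BIGINT,
session_id STRING,
question STRING,
answer STRING,
source_doc_ids ARRAY<STRING>,
generated_at TIMESTAMP(3),
PRIMARY KEY (session_id, question) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'answers',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);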
-- 5. RAG pipeline
INSERT INTO answers
WITH question_embeddings AS (
-- Step 1: embed question
SELECT
user_id,
session_id,
question,
event_time,
ML_EMBED(MODEL embed_model, question) AS query_vec
FROM user_questions
),
retrieved_context AS (
-- Step 2: vector search, top-3 documents for each question
SELECT
qe.*,
ARRAY_AGG(doc.doc_id) AS source_doc_ids,
LISTAGG(doc.content, '\n---\n') AS context -- LISTAGG is Flink's built-in string aggregate
FROM question_embeddings qe
CROSS JOIN LATERAL (
SELECT doc_id, content
FROM VECTOR_SEARCH(
TABLE knowledge_base,
'embedding',
qe.query_vec,
TOP_K => 3,
DISTANCE => 'cosine'
)
) doc
GROUP BY qe.user_id, qe.session_id, qe.question, qe.event_time, qe.query_vec
)
SELECT
user_id,
session_id,
question,
ML_PREDICT(
MODEL llm,
'You are a helpful assistant. Use the following context to answer the user question. ' ||
'If the context is not relevant, say "I do not know." ' ||
'Do not make up information.\n\n' ||
'Context:\n' || context || '\n\n' ||
'Question: ' || question
) AS answer,
source_doc_ids,
CURRENT_TIMESTAMP AS generated_at
FROM retrieved_context;
End-to-end RAG in a single INSERT INTO. Because the job is streaming, every question is processed automatically as it arrives.
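The last two SQL steps (aggregate the retrieved docs, then build the prompt) can be mirrored in Python to unit-test the prompt template outside Flink. `RetrievedDoc`, `build_rag_prompt`, and the sample documents are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class RetrievedDoc:
    doc_id: str
    content: str


# Same instructions as the ML_PREDICT prompt in the SQL pipeline.
PROMPT_PREFIX = (
    "You are a helpful assistant. Use the following context to answer the user question. "
    'If the context is not relevant, say "I do not know." '
    "Do not make up information.\n\n"
)


def build_rag_prompt(question: str, docs: list[RetrievedDoc]) -> tuple[str, list[str]]:
    """Join retrieved contents with a '---' separator (as the step 2 aggregate does),
    wrap them in the prompt, and return the prompt plus the source doc ids."""
    context = "\n---\n".join(doc.content for doc in docs)
    prompt = PROMPT_PREFIX + "Context:\n" + context + "\n\nQuestion: " + question
    return prompt, [doc.doc_id for doc in docs]


docs = [
    RetrievedDoc("kb-1", "Open Settings > Security and choose Reset password."),
    RetrievedDoc("kb-7", "Reset links expire after 24 hours."),
]
prompt, source_doc_ids = build_rag_prompt("How do I reset my password?", docs)
print(prompt)
```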
Latency breakdown:
- Kafka -> Flink: ~10ms.
- ML_EMBED (OpenAI embedding): ~80-100ms.
- VECTOR_SEARCH (pgvector ANN): ~50-100ms.
- ML_PREDICT (Claude Haiku): ~500-800ms.
- Total: ~640-1010 ms (sum of the stages above).
Acceptable for chatbot UX. Throughput depends on concurrency and API rate limits.
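Throughput for a pipeline dominated by remote calls can be estimated with Little's law: throughput ≈ requests in flight / latency, capped by the provider's rate limit. A sketch with illustrative numbers; how in-flight calls are bounded in a given deployment is not modeled here.

```python
import math
from typing import Optional


def max_throughput_rps(in_flight: int, latency_ms: int,
                       rate_limit_rps: Optional[float] = None) -> float:
    """Little's law: throughput = requests in flight / latency, capped by an API rate limit."""
    rps = in_flight * 1000 / latency_ms
    return rps if rate_limit_rps is None else min(rps, rate_limit_rps)


def in_flight_needed(target_rps: int, latency_ms: int) -> int:
    """Concurrent requests required to sustain target_rps at the given per-request latency."""
    return math.ceil(target_rps * latency_ms / 1000)


# About 1 s per question, roughly the upper end of the latency breakdown above.
print(max_throughput_rps(in_flight=100, latency_ms=1000))                     # 100.0
print(max_throughput_rps(in_flight=100, latency_ms=1000, rate_limit_rps=50))  # 50
print(in_flight_needed(target_rps=200, latency_ms=1000))                      # 200
```

In other words, at ~1 s per question the job needs about as many concurrent LLM calls as the questions per second it must sustain, and the rate limit caps the result regardless of concurrency.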
Ingesting the knowledge base in streaming
The knowledge base needs continuous updates with new docs:
-- Document source (newly published articles)
CREATE TABLE new_documents (
doc_id STRING,
title STRING,
content STRING,
published_at TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'new-docs',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'json'
);
-- Embed + insert into knowledge base
INSERT INTO knowledge_base
SELECT
doc_id,
title,
content,
ML_EMBED(MODEL embed_model, content) AS embedding
FROM new_documents;
Streaming ingestion: a new doc arrives -> it is embedded -> it is stored. The doc becomes available to queries as soon as the sink writes it, so the knowledge base is updated continuously.
Cost consideration: an embedding cache helps when documents arrive with duplicate content (e.g., reposts).
Hybrid: vector + keyword search
Pure vector search misses cases where exact keywords matter (product names, codes, technical terms). Hybrid search combines both:
WITH semantic_results AS (
SELECT doc_id, score AS semantic_score
FROM VECTOR_SEARCH(
TABLE knowledge_base,
'embedding',
ML_EMBED(MODEL embed_model, 'IBM ThinkPad X1 keyboard repair'),
TOP_K => 20
)
),
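keyword_results AS (
-- Simple keyword score in [0, 1]: the fraction of the two keyword filters that match.
-- (ts_rank is a Postgres full-text function, not a Flink built-in.)
SELECT doc_id,
(CASE WHEN content LIKE '%X1%' THEN 1 ELSE 0 END
+ CASE WHEN title LIKE '%ThinkPad%' THEN 1 ELSE 0 END) / 2.0 AS keyword_score
FROM knowledge_base
WHERE content LIKE '%X1%'
OR title LIKE '%ThinkPad%'
ORDER BY keyword_score DESC
LIMIT 20
)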
SELECT
COALESCE(s.doc_id, k.doc_id) AS doc_id,
COALESCE(s.semantic_score, 0) * 0.7 + COALESCE(k.keyword_score, 0) * 0.3 AS final_score
FROM semantic_results s
FULL OUTER JOIN keyword_results k ON s.doc_id = k.doc_id
ORDER BY final_score DESC
LIMIT 10;
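A linear combination of semantic and keyword scores, with tunable weights (70/30 is typical). This assumes both scores are on comparable scales (e.g., both in [0, 1]). Reciprocal rank fusion (RRF), which combines ranks instead of raw scores, is a common alternative.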
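Both fusion strategies in Python, with made-up scores. Linear fusion mirrors the SQL above (missing scores count as 0); RRF uses k = 60, the constant from the original RRF paper and a common default.

```python
def linear_fusion(semantic: dict[str, float], keyword: dict[str, float],
                  w_semantic: float = 0.7, w_keyword: float = 0.3) -> list[tuple[str, float]]:
    """Weighted sum of scores; a doc missing from one list scores 0 there (like COALESCE(..., 0))."""
    doc_ids = semantic.keys() | keyword.keys()
    fused = {d: w_semantic * semantic.get(d, 0.0) + w_keyword * keyword.get(d, 0.0) for d in doc_ids}
    return sorted(fused.items(), key=lambda kv: (-kv[1], kv[0]))


def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """RRF: score(d) = sum over rankings of 1 / (k + rank(d)), ranks starting at 1.
    Uses only positions, so the input score scales do not need to match."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))


semantic = {"a": 0.9, "b": 0.8, "c": 0.5}  # e.g. cosine similarities
keyword = {"c": 1.0, "d": 0.5}             # e.g. keyword-match scores in [0, 1]
print([d for d, _ in linear_fusion(semantic, keyword)])
print([d for d, _ in reciprocal_rank_fusion([["a", "b", "c"], ["c", "d"]])])
```

Doc "c" wins in both because it appears in both result lists; RRF reaches that without any weight tuning.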
Streaming updates к vector DB
CDC pattern: source events change the underlying data, so the vector index must be updated as well.
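-- CDC source: product changes. The debezium-json format decodes the Debezium envelope
-- into a changelog (INSERT / UPDATE / DELETE rows); the op code is not exposed as a column.
CREATE TABLE product_changes (
product_id BIGINT,
description STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'product-cdc',
'properties.bootstrap.servers' = 'kafka:9092',
'format' = 'debezium-json'
);
-- Update vector DB. Inserts and updates are re-embedded; deletes flow through as
-- DELETE rows, so product_embeddings needs a PRIMARY KEY and a sink that supports deletes.
INSERT INTO product_embeddings
SELECT
product_id,
description,
ML_EMBED(MODEL embed_model, description) AS embedding,
ts
FROM product_changes;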
Delete handling is more complex and depends on the connector. pgvector: straightforward (DELETE WHERE id = ...). Pinecone: an explicit delete-by-id call.
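The upsert/delete logic a vector sink has to implement can be sketched against Debezium's change-event envelope (`op`, `before`, `after`; op codes c, u, d, r). Assumptions: a dict stands in for the vector DB, and `toy_embed` is a hypothetical stand-in for ML_EMBED.

```python
from typing import Callable

Vector = list[float]


def apply_change(index: dict[int, Vector], event: dict, embed: Callable[[str], Vector]) -> None:
    """Apply one Debezium-style change event to an in-memory vector index.

    Debezium op codes: 'c' create, 'u' update, 'd' delete, 'r' snapshot read.
    """
    op = event["op"]
    if op in ("c", "u", "r"):
        row = event["after"]
        index[row["product_id"]] = embed(row["description"])  # upsert: re-embed the new text
    elif op == "d":
        row = event["before"]
        index.pop(row["product_id"], None)  # delete by id; no embedding call needed
    else:
        raise ValueError(f"unknown op: {op!r}")


def toy_embed(text: str) -> Vector:
    # Hypothetical stand-in for ML_EMBED: a 1-dim "vector" holding the text length.
    return [float(len(text))]


events = [
    {"op": "c", "before": None, "after": {"product_id": 1, "description": "red mug"}},
    {"op": "c", "before": None, "after": {"product_id": 2, "description": "blue mug"}},
    {"op": "u", "before": {"product_id": 1, "description": "red mug"},
     "after": {"product_id": 1, "description": "large red mug"}},
    {"op": "d", "before": {"product_id": 2, "description": "blue mug"}, "after": None},
]
index: dict[int, Vector] = {}
for ev in events:
    apply_change(index, ev, toy_embed)
print(index)
```

Deletes read the key from `before`, because a Debezium delete event has no `after` row.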
Performance: ANN vs exact search
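Vector DBs use approximate nearest neighbor (ANN) search for performance, trading accuracy for speed.
Exact search: scan all vectors, compute each distance, and sort. That is O(N) distance computations per query, each costing O(d) for d dimensions. For 10M vectors with 1536 dims: seconds per query.
ANN: use an index (HNSW, IVF, LSH) so that each query examines only a fraction of the vectors, roughly O(log N) or O(sqrt N) per query depending on the index. For 10M vectors: milliseconds. The trade-off: typically ~95-99% recall (some true nearest neighbors are missed).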
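To see the recall/latency trade-off concretely, here is a toy IVF index (one of the ANN families listed above; HNSW is harder to fit in a short example). Assumptions: random data, centroids sampled from the data instead of trained with k-means, and arbitrary sizes. `nprobe` (how many buckets a query scans) plays a role similar to `ef_search` in HNSW: scanning more means higher recall and slower queries.

```python
import math
import random


def l2(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def exact_top_k(vectors, query, k):
    # Brute force: distance to every vector, O(N) per query.
    return sorted(range(len(vectors)), key=lambda i: l2(vectors[i], query))[:k]


class ToyIVF:
    """Inverted-file index: each vector lives in the bucket of its nearest centroid;
    a query scans only the `nprobe` buckets whose centroids are closest to it."""

    def __init__(self, vectors, n_lists, seed=0):
        self.vectors = vectors
        # Simplification: centroids are a random sample of the data (real IVF runs k-means).
        self.centroids = random.Random(seed).sample(vectors, n_lists)
        self.lists = [[] for _ in range(n_lists)]
        for i, v in enumerate(vectors):
            self.lists[self._closest_centroids(v, 1)[0]].append(i)

    def _closest_centroids(self, v, n):
        return sorted(range(len(self.centroids)), key=lambda c: l2(self.centroids[c], v))[:n]

    def search(self, query, k, nprobe):
        candidates = [i for c in self._closest_centroids(query, nprobe) for i in self.lists[c]]
        return sorted(candidates, key=lambda i: l2(self.vectors[i], query))[:k]


def recall(approx, exact):
    return len(set(approx) & set(exact)) / len(exact)


rng = random.Random(42)
data = [[rng.random() for _ in range(8)] for _ in range(2000)]
queries = [[rng.random() for _ in range(8)] for _ in range(20)]
index = ToyIVF(data, n_lists=32)
truth = [exact_top_k(data, q, 10) for q in queries]

recall_by_nprobe = {}
for nprobe in (1, 4, 32):
    scores = [recall(index.search(q, 10, nprobe), t) for q, t in zip(queries, truth)]
    recall_by_nprobe[nprobe] = sum(scores) / len(scores)
    print(f"nprobe={nprobe:2d}  mean recall@10={recall_by_nprobe[nprobe]:.2f}")
```

With nprobe equal to the number of lists the index degenerates to exact search (recall 1.0); smaller values scan fewer vectors and trade away some recall.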
Configure recall vs latency:
-- pgvector with an HNSW index (m = 16 and ef_construction = 64 are pgvector's defaults)
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Higher m / ef_construction = better recall, slower build
-- Higher ef_search at query time = better recall, slower query (default 40)
SET hnsw.ef_search = 100;
HNSW (Hierarchical Navigable Small World) is the most popular ANN algorithm; its hyperparameters balance recall against latency.
VECTOR_SEARCH defaults to ANN (if an index exists). Override it for exact search:
SELECT * FROM VECTOR_SEARCH(
TABLE documents,
'embedding',
query_vec,
TOP_K => 10,
EXACT => true -- force exact search (slow for large datasets)
);
Caching embeddings
Embedding API calls are expensive. Cache by input text:
CREATE MODEL embed_model WITH (
'provider' = 'openai',
'task' = 'text-embedding-3-small',
'cache.enabled' = 'true',
'cache.ttl' = '1 d',
'cache.max_entries' = '100000',
'cache.storage' = 'rocksdb'
);
The cache maps input text hash -> embedding vector. On a hit, the cached vector is returned and no API call is made. The TTL controls staleness (for evolving content).
Typical hit rate: 30-70%, depending on the data (do questions repeat? are documents reused?). A high hit rate means large cost savings.
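One way such a cache could behave, sketched in Python: SHA-256 of the input text as the key, a TTL check on read, and LRU eviction at `max_entries`. This is not Flink's implementation; `fake_embed` and the injectable clock are hypothetical, used so the example runs without an API.

```python
import hashlib
import time
from collections import OrderedDict
from typing import Callable


class EmbeddingCache:
    """sha256(text) -> vector, with a TTL and LRU eviction once max_entries is exceeded."""

    def __init__(self, embed_fn: Callable[[str], list[float]], ttl_s: float,
                 max_entries: int, clock: Callable[[], float] = time.monotonic):
        self.embed_fn = embed_fn
        self.ttl_s = ttl_s
        self.max_entries = max_entries
        self.clock = clock
        self.entries = OrderedDict()  # key -> (stored_at, vector), least recently used first
        self.hits = 0
        self.misses = 0

    def get(self, text: str) -> list[float]:
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        now = self.clock()
        cached = self.entries.get(key)
        if cached is not None and now - cached[0] < self.ttl_s:
            self.entries.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return cached[1]
        self.misses += 1  # absent or expired: pay for an API call
        vector = self.embed_fn(text)
        self.entries[key] = (now, vector)
        self.entries.move_to_end(key)
        if len(self.entries) > self.max_entries:
            self.entries.popitem(last=False)  # evict the least recently used entry
        return vector


api_calls = []


def fake_embed(text):  # hypothetical stand-in for the embedding API
    api_calls.append(text)
    return [float(len(text))]


fake_now = [0.0]
cache = EmbeddingCache(fake_embed, ttl_s=86_400, max_entries=2, clock=lambda: fake_now[0])
cache.get("reset password")
cache.get("reset password")  # hit: no API call
fake_now[0] = 90_000.0       # more than 1 day later
cache.get("reset password")  # expired: re-embedded
print(len(api_calls), cache.hits / (cache.hits + cache.misses))
```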
RAG quality engineering
Production RAG is not just SQL; it needs quality work:
- Chunk size: documents are split into chunks (sentences, paragraphs). Small chunks give precise retrieval but less context; large chunks give more context but less precision. Typical: 256-512 tokens.
- Reranking: the top-k from vector search may be ordered suboptimally. Apply a reranker model (cross-encoder) to the top-k for better ordering.
- Query rewriting: user queries may be vague. An LLM rewrites them: “tell me about climate change” -> “climate change causes, effects, and mitigation strategies”.
- Context optimization: feed only the most relevant snippets, not full docs. This reduces LLM cost and improves answer quality.
- Fallback: if no good context is retrieved (low max similarity), don’t hallucinate; say “I don’t know”.
- Evaluation: build a golden dataset, evaluate answers against it, and iterate.
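A minimal fixed-size chunker with overlap, as one baseline for the chunk-size trade-off above. Assumption: whitespace-split words approximate tokens; sentence- or paragraph-aware splitting is a common refinement.

```python
def chunk_tokens(tokens: list[str], chunk_size: int, overlap: int) -> list[list[str]]:
    """Sliding-window chunking: chunks of up to chunk_size tokens, each sharing
    `overlap` tokens with the previous chunk, so the tokens around every boundary
    appear in both neighboring chunks."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("need 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):  # this chunk already reaches the end
            break
    return chunks


# Whitespace words stand in for model tokens here; real pipelines count tokens
# with the embedding model's tokenizer (e.g. 256-512 tokens per chunk).
words = "Flink VECTOR_SEARCH retrieves the nearest chunks for each user question".split()
for chunk in chunk_tokens(words, chunk_size=4, overlap=1):
    print(" ".join(chunk))
```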
-- Reranking example
WITH retrieved AS (
SELECT * FROM VECTOR_SEARCH(TABLE kb, 'embedding', query_vec, TOP_K => 20)
),
reranked AS (
SELECT *, ML_PREDICT(MODEL reranker, query_text, content) AS rerank_score
FROM retrieved
)
SELECT * FROM reranked
ORDER BY rerank_score DESC
LIMIT 5;
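The reranking step, plus the fallback rule from the quality list, sketched in Python: rescore the candidates, keep the best few, and return nothing when even the best score is below a threshold. `overlap_score` is a hypothetical word-overlap stand-in for a real cross-encoder, and the threshold is arbitrary.

```python
def overlap_score(query: str, text: str) -> float:
    """Hypothetical stand-in for a cross-encoder: share of query words found in the text."""
    q = set(query.lower().split())
    return len(q & set(text.lower().split())) / len(q) if q else 0.0


def rerank(query: str, candidates: list[dict], score_fn, keep: int, min_score: float) -> list[dict]:
    """Rescore (query, content) pairs with a more expensive model and keep the best `keep`.

    Returns [] if even the best candidate scores below min_score, so the caller can
    answer "I don't know" instead of prompting the LLM with irrelevant context.
    """
    scored = sorted(((score_fn(query, c["content"]), c) for c in candidates),
                    key=lambda pair: pair[0], reverse=True)
    if not scored or scored[0][0] < min_score:
        return []
    return [{**c, "rerank_score": s} for s, c in scored[:keep]]


candidates = [  # e.g. the top-20 from VECTOR_SEARCH (only 3 shown)
    {"doc_id": "d1", "content": "How to replace a ThinkPad keyboard"},
    {"doc_id": "d2", "content": "ThinkPad battery warranty terms"},
    {"doc_id": "d3", "content": "Replace the keyboard on an X1 Carbon"},
]
top = rerank("replace thinkpad keyboard", candidates, overlap_score, keep=2, min_score=0.5)
print([d["doc_id"] for d in top])
```

The design point is the two-stage shape: a cheap, recall-oriented vector search over many candidates, then an expensive, precision-oriented scorer over only those.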
Real-time use case: customer service
Common production RAG: customer service automation.
-- User chat events
CREATE TABLE chat_messages WITH (
'connector' = 'kafka',
'topic' = 'chat',
...
);
-- Knowledge base of FAQ, product docs, past resolutions
CREATE TABLE kb WITH (
'connector' = 'pgvector',
...
);
-- RAG pipeline
INSERT INTO chat_responses
WITH context AS (
SELECT
m.user_id,
m.session_id,
m.message,
-- Find similar past resolutions
ARRAY_AGG(kb.resolution) AS past_resolutions
FROM chat_messages m
CROSS JOIN LATERAL (
SELECT * FROM VECTOR_SEARCH(
TABLE kb,
'embedding',
ML_EMBED(MODEL embed_model, m.message),
TOP_K => 5
)
) kb
GROUP BY m.user_id, m.session_id, m.message
)
SELECT
user_id,
session_id,
message,
ML_PREDICT(
MODEL llm,
'You are a customer service agent. Past resolutions for similar issues: ' ||
ARRAY_JOIN(past_resolutions, '\n') ||
'\nCustomer message: ' || message ||
'\nProvide a helpful response.'
) AS response
FROM context;
Auto-respond to chat messages based on the history of resolutions. Human agents see the suggested responses and can edit or approve them.
RAG quality is the product of retrieval quality and LLM quality. Bad retrieval -> the LLM hallucinates from irrelevant context. Always evaluate retrieval quality (precision@K, recall@K) separately from LLM quality. Vector search is not a silver bullet: chunking strategy, query rewriting, and reranking matter a lot. Plan for quality iteration, not just throughput.
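Precision@K and recall@K need only the retrieved doc ids and a golden set of relevant ids per question. A minimal sketch; the golden labels and retrieval results are placeholders.

```python
from statistics import mean


def precision_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the top-k retrieved docs that are relevant (denominator is k)."""
    return sum(doc in relevant for doc in retrieved[:k]) / k


def recall_at_k(retrieved: list[str], relevant: set[str], k: int) -> float:
    """Fraction of all relevant docs that appear in the top-k."""
    return len(set(retrieved[:k]) & relevant) / len(relevant) if relevant else 0.0


# Golden dataset: question id -> doc ids a human marked relevant (placeholder labels).
golden = {"q1": {"d1", "d4"}, "q2": {"d7"}}
# Doc ids the retrieval step returned for each question, best first.
retrieved = {"q1": ["d1", "d2", "d3"], "q2": ["d5", "d7", "d9"]}

k = 3
p = mean(precision_at_k(retrieved[q], golden[q], k) for q in golden)
r = mean(recall_at_k(retrieved[q], golden[q], k) for q in golden)
print(f"precision@{k}={p:.2f}  recall@{k}={r:.2f}")  # precision@3=0.33  recall@3=0.75
```

Tracking these per release of the chunking, embedding model, or index settings isolates retrieval regressions from LLM regressions.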