Lesson 17.03 · 25 min
Advanced
Tags: VECTOR_SEARCH, RAG, Embedding, Vector DB, pgvector, Pinecone, Milvus, Streaming retrieval

VECTOR_SEARCH and RAG pipelines in Flink

Vector search is the foundation of modern AI applications: semantic search, RAG (Retrieval-Augmented Generation), recommendation engines, content moderation. Flink 2.2 added a native VECTOR_SEARCH SQL function, giving first-class support for embedding-based retrieval in streaming.

This lesson covers what embeddings are, how VECTOR_SEARCH works, how to build RAG pipelines in SQL, and production-grade examples with different vector DB backends.

Embeddings: 30-second refresher

An embedding is a representation of text (or an image / audio) as a vector of floats. Trained models (OpenAI text-embedding-3, sentence-transformers, etc.) map semantic content into a high-dimensional space (typically 384, 768, or 1536 dimensions).

Properties:

  • Similar content -> similar vectors (close in vector space).
  • Distance metrics (cosine, dot product, Euclidean) approximate semantic similarity.
  • Storage: one vector per item in a DB or index.
  • Search: for a query, compute its embedding, then find the nearest vectors.

Use cases:

  • Semantic search: “find documents about climate change” -> embed query -> vector search -> return relevant docs. Better than keyword (handles synonyms, intent).
  • Recommendations: user view embedded -> find similar items in space -> recommend.
  • RAG: question embedded -> find relevant context docs -> LLM uses them for the answer.
  • Anomaly detection: events embedded -> outliers in vector space -> flagged.
  • Content moderation: content embedded -> similar to known violations -> flagged.

VECTOR_SEARCH syntax

SELECT *
FROM VECTOR_SEARCH(
  TABLE products,            -- table containing vectors
  'embedding_column',        -- column with vectors
  ARRAY[0.1, 0.2, 0.3, ...], -- query vector
  TOP_K => 10,
  DISTANCE => 'cosine'
);

Returns: top-K rows from source table, sorted by similarity to query vector. Output schema = source columns + similarity score.

Distance options:

  • cosine: 1 - (a · b) / (|a| * |b|). Most common; works on normalized or unnormalized vectors.
  • euclidean: L2 distance sqrt(sum((a_i - b_i)^2)).
  • dot_product: a · b. Faster, but requires normalized vectors for a meaningful comparison.
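
To make the three metrics concrete, here is a minimal Python sketch of the formulas listed above. The helper names and the toy vectors are illustrative assumptions, not part of Flink or any vector DB API. It also shows why an unnormalized dot product can rank a vector that points in a different direction ahead of one that points the same way as the query.

```python
import math

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def norm(a):
    return math.sqrt(dot(a, a))

def cosine_distance(a, b):
    # 1 - (a . b) / (|a| * |b|); 0.0 means same direction
    return 1.0 - dot(a, b) / (norm(a) * norm(b))

def euclidean_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def normalize(a):
    n = norm(a)
    return [x / n for x in a]

query = [1.0, 0.0]
short_same_direction = [2.0, 0.0]
long_other_direction = [3.0, 4.0]  # larger magnitude, different direction

# Raw dot product prefers the longer vector even though its direction differs.
raw_prefers_long = dot(query, long_other_direction) > dot(query, short_same_direction)

# After normalization, dot product agrees with cosine similarity.
norm_prefers_same = (
    dot(query, normalize(short_same_direction))
    > dot(query, normalize(long_other_direction))
)
```

With these inputs the cosine distance to the same-direction vector is 0 and to the other vector is 0.4, which is the ordering the normalized dot product reproduces.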

What happens under the hood depends on the source connector:

  • pgvector: SQL query SELECT * FROM products ORDER BY embedding <=> '[...]' LIMIT 10 (in pgvector, <=> is cosine distance and <-> is L2 distance).
  • Pinecone: HTTP API call POST /query.
  • Milvus: gRPC call.
  • Weaviate: GraphQL query.

Flink VECTOR_SEARCH abstracts this away: the same SQL regardless of backend.


Sourcing embeddings: ML_EMBED

The companion function for VECTOR_SEARCH is ML_EMBED:

-- Embed text using model
SELECT
  product_id,
  description,
  ML_EMBED(MODEL embedding_model, description) AS vec
FROM products;

ML_EMBED — scalar function, returns ARRAY<DOUBLE> (the vector).

Setup embedding model:

CREATE MODEL embedding_model
INPUT (text STRING)
OUTPUT (embedding ARRAY<DOUBLE>)
WITH (
  'provider' = 'openai',
  'task' = 'text-embedding-3-small',  -- OpenAI's 1536-dim embedding
  'api_key' = '{{ secret.openai_api_key }}'
);

Or local:

CREATE MODEL local_embed
INPUT (text STRING)
OUTPUT (embedding ARRAY<DOUBLE>)
WITH (
  'provider' = 'local',
  'task' = 'embedding',
  'model_path' = 's3://models/all-MiniLM-L6-v2.onnx',
  'runtime' = 'onnx',
  'dimensions' = '384'
);

Cost considerations: OpenAI embedding costs about $0.02 per million tokens, cheap relative to an LLM ($30 per million). A local embedding model (e.g., MiniLM) is free, has latency under 10ms, and gives decent quality for many tasks.
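
As a rough worked example of these prices, the sketch below computes the cost of embedding a corpus. The two prices come from the text; the workload (1M documents of 400 tokens each) is a hypothetical assumption chosen only to make the arithmetic visible.

```python
EMBED_PRICE_PER_M = 0.02  # USD per million tokens (figure from the text)
LLM_PRICE_PER_M = 30.0    # USD per million tokens (figure from the text)

def cost_usd(tokens, price_per_million):
    return tokens / 1_000_000 * price_per_million

# Hypothetical workload: 1M documents, 400 tokens each.
docs, tokens_per_doc = 1_000_000, 400
total_tokens = docs * tokens_per_doc

embed_cost = cost_usd(total_tokens, EMBED_PRICE_PER_M)
llm_cost = cost_usd(total_tokens, LLM_PRICE_PER_M)
print(f"embedding: ${embed_cost:.2f}, LLM on the same tokens: ${llm_cost:.2f}")
```

Under these assumptions embedding the whole corpus costs about $8, while pushing the same 400M tokens through the LLM would cost about $12,000.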


Vector DB connector setup

pgvector

CREATE TABLE knowledge_base (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  title STRING,
  content STRING,
  embedding ARRAY<DOUBLE>,
  metadata MAP<STRING, STRING>
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://postgres:5432/kb',
  'table-name' = 'documents',
  'username' = 'flink',
  'password' = '{{ secret.pgvector_password }}',
  'vector-column' = 'embedding'  -- marks the vector column for VECTOR_SEARCH
);

pgvector is a Postgres extension. Mature, well-known SQL semantics, supports both exact search and approximate indexes (IVFFlat, HNSW). Best for moderate scale (under 100M vectors).

Pinecone

CREATE TABLE pinecone_index WITH (
  'connector' = 'pinecone',
  'index' = 'products',
  'api_key' = '{{ secret.pinecone_api_key }}',
  'environment' = 'us-east1-aws',
  'namespace' = 'production'
);

Pinecone is a managed vector DB service. Scales to billions of vectors. API-first, no SQL semantics.

Milvus

CREATE TABLE milvus_collection WITH (
  'connector' = 'milvus',
  'uri' = 'milvus://milvus:19530',
  'collection' = 'products',
  'partition' = 'production'
);

Milvus is an open-source, self-hosted vector DB. Various index types (HNSW, IVF, Annoy). Scales horizontally.

Choosing

DB         Pros                            Cons
pgvector   Familiar SQL, low ops, joins    Practical under 100M vectors
Pinecone   Managed, scales to billions     $$$, vendor lock-in
Milvus     Self-hosted, OSS, fast          Ops overhead
Weaviate   Hybrid (vector + graph)         Less mature
Qdrant     Rust-based, fast                Smaller ecosystem

For most use cases: pgvector if the scale is modest (most apps), Pinecone if you want a managed service, Milvus if you want OSS at scale.

RAG pipeline flow in Flink SQL

  1. Kafka questions. Kafka source: stream of user questions.
  2. Embed. ML_EMBED: convert the question text to a vector (query_vec). Local model (sentence-transformers) or API (OpenAI). Cached embeddings: repeat questions hit the cache, no API call.
  3. Vector Search. VECTOR_SEARCH: top-K nearest documents in the knowledge base. ANN search (HNSW): milliseconds for millions of vectors. pgvector / Pinecone / Milvus, connector-specific.
  4. Knowledge base (input to Vector Search). Documents with pre-computed embeddings. CDC-updated continuously or batch-loaded. A chunk size of 256-512 tokens works best.
  5. LLM (RAG). ML_PREDICT with an LLM: input = system_prompt + retrieved_context (top-K docs) + question. The LLM (Claude Haiku, GPT-4o-mini) generates the answer. Latency 500ms-1.5s.
  6. Answer. Output: answer + source doc IDs. Sink to a Kafka topic or DB. End-to-end latency 700-1500ms typical.

RAG pipeline: complete example

Build a streaming chatbot. User questions arrive in Kafka, and RAG answers them using the knowledge base.

-- 1. Source: user questions
CREATE TABLE user_questions (
  user_id BIGINT,
  question STRING,
  session_id STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user-questions',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- 2. Knowledge base (pgvector)
CREATE TABLE knowledge_base (
  doc_id STRING,
  title STRING,
  content STRING,
  embedding ARRAY<DOUBLE>
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://pg:5432/kb',
  'table-name' = 'documents',
  'vector-column' = 'embedding'
);

-- 3. Models
CREATE MODEL embed_model WITH (
  'provider' = 'openai',
  'task' = 'text-embedding-3-small'
);

CREATE MODEL llm WITH (
  'provider' = 'anthropic',
  'task' = 'messages',
  'model_name' = 'claude-3-haiku-20240307',  -- fast, cheap
  'max_tokens' = '500',
  'temperature' = '0'
);

-- 4. Output: answers
CREATE TABLE answers (
  user_id BIGINT,
  session_id STRING,
  question STRING,
  answer STRING,
  source_doc_ids ARRAY<STRING>,
  generated_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'answers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- 5. RAG pipeline
INSERT INTO answers
WITH question_embeddings AS (
  -- Step 1: embed question
  SELECT
    user_id,
    session_id,
    question,
    event_time,
    ML_EMBED(MODEL embed_model, question) AS query_vec
  FROM user_questions
),
retrieved_context AS (
  -- Step 2: vector search top-3 documents for each question
  SELECT
    qe.*,
    ARRAY_AGG(doc.doc_id) AS source_doc_ids,
    -- LISTAGG is Flink's built-in string aggregate (STRING_AGG is not a Flink function)
    LISTAGG(doc.content, '\n---\n') AS context
  FROM question_embeddings qe
  CROSS JOIN LATERAL (
    SELECT doc_id, content
    FROM VECTOR_SEARCH(
      TABLE knowledge_base,
      'embedding',
      qe.query_vec,
      TOP_K => 3,
      DISTANCE => 'cosine'
    )
  ) doc
  GROUP BY qe.user_id, qe.session_id, qe.question, qe.event_time, qe.query_vec
)
SELECT
  user_id,
  session_id,
  question,
  ML_PREDICT(
    MODEL llm,
    'You are a helpful assistant. Use the following context to answer the user question. ' ||
    'If the context is not relevant, say "I do not know." ' ||
    'Do not make up information.\n\n' ||
    'Context:\n' || context || '\n\n' ||
    'Question: ' || question
  ) AS answer,
  source_doc_ids,
  CURRENT_TIMESTAMP AS generated_at
FROM retrieved_context;

End-to-end RAG in a single INSERT INTO. Because the job is streaming, each question is processed automatically as it arrives.
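
The prompt-building step of the pipeline above may be easier to see outside SQL. The following Python sketch mirrors how the final SELECT concatenates the system instructions, the retrieved context, and the question. The function names and sample documents are hypothetical, and no real LLM or Flink API is called.

```python
SYSTEM_PROMPT = (
    "You are a helpful assistant. Use the following context to answer the user question. "
    'If the context is not relevant, say "I do not know." '
    "Do not make up information."
)

def build_prompt(question, docs):
    """docs: list of (doc_id, content) pairs, already sorted by similarity."""
    # Same separator as the string aggregation in the SQL pipeline.
    context = "\n---\n".join(content for _, content in docs)
    return f"{SYSTEM_PROMPT}\n\nContext:\n{context}\n\nQuestion: {question}"

def source_ids(docs):
    # Kept alongside the answer so each response can cite its sources.
    return [doc_id for doc_id, _ in docs]

docs = [
    ("d1", "Flink checkpoints are stored in durable storage."),
    ("d2", "Savepoints are triggered manually."),
]
prompt = build_prompt("Where are checkpoints stored?", docs)
ids = source_ids(docs)
```

Keeping the document IDs next to the prompt is what makes the source_doc_ids column in the answers table possible.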

Latency breakdown:

  • Kafka -> Flink: ~10ms.
  • ML_EMBED (OpenAI embedding): ~80-100ms.
  • VECTOR_SEARCH (pgvector ANN): ~50-100ms.
  • ML_PREDICT (Claude Haiku): ~500-800ms.
  • Total: ~700-1100ms.

Acceptable for chatbot UX. Throughput depends on concurrency and rate limits.


Ingesting the knowledge base in streaming mode

The knowledge base needs to be updated continuously with new docs:

-- Document source (newly published articles)
CREATE TABLE new_documents (
  doc_id STRING,
  title STRING,
  content STRING,
  published_at TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'new-docs',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- Embed + insert into knowledge base
INSERT INTO knowledge_base
SELECT
  doc_id,
  title,
  content,
  ML_EMBED(MODEL embed_model, content) AS embedding
FROM new_documents;

Streaming ingestion: a new doc arrives -> embedded -> stored. Available immediately for queries. The knowledge base is updated continuously.

Cost consideration: embedding caches help if documents arrive with duplicate content (e.g., reposts).


Hybrid search

Pure vector search misses cases where exact keywords matter (product names, codes, technical terms). Hybrid search combines vector and keyword results:

WITH semantic_results AS (
  SELECT doc_id, score AS semantic_score
  FROM VECTOR_SEARCH(
    TABLE knowledge_base,
    'embedding',
    ML_EMBED(MODEL embed_model, 'IBM ThinkPad X1 keyboard repair'),
    TOP_K => 20
  )
),
keyword_results AS (
  SELECT doc_id, 1.0 / RANK() OVER (ORDER BY ts_rank(...)) AS keyword_score
  FROM knowledge_base
  WHERE content LIKE '%X1%'
    OR title LIKE '%ThinkPad%'
  LIMIT 20
)
SELECT
  COALESCE(s.doc_id, k.doc_id) AS doc_id,
  COALESCE(s.semantic_score, 0) * 0.7 + COALESCE(k.keyword_score, 0) * 0.3 AS final_score
FROM semantic_results s
FULL OUTER JOIN keyword_results k ON s.doc_id = k.doc_id
ORDER BY final_score DESC
LIMIT 10;

This is a linear combination of semantic and keyword scores with tunable weights (70/30 is typical). Reciprocal rank fusion is an alternative.
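
The two fusion strategies can be compared in a few lines of Python. This is a minimal sketch under assumed inputs: the scores, document IDs, and function names are made up, and k=60 in the reciprocal rank fusion is a commonly used constant rather than something taken from this lesson.

```python
def weighted_fusion(semantic, keyword, w_sem=0.7, w_kw=0.3):
    """semantic / keyword: dict doc_id -> score. A missing score counts as 0."""
    ids = set(semantic) | set(keyword)
    fused = {d: w_sem * semantic.get(d, 0.0) + w_kw * keyword.get(d, 0.0) for d in ids}
    return sorted(fused.items(), key=lambda kv: kv[1], reverse=True)

def reciprocal_rank_fusion(rankings, k=60):
    """rankings: list of ranked doc_id lists, best first. Uses ranks only, not scores."""
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

semantic = {"a": 0.9, "b": 0.8, "c": 0.3}
keyword = {"b": 1.0, "d": 0.5}

weighted_order = [d for d, _ in weighted_fusion(semantic, keyword)]
rrf_order = [d for d, _ in reciprocal_rank_fusion([["a", "b", "c"], ["b", "d"]])]
```

Both methods put "b" first because it appears in both result lists. They differ further down: the weighted sum depends on score scales, while rank fusion ignores them, which helps when semantic and keyword scores are not comparable.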


Streaming updates to the vector DB

Pattern for CDC: source events change the underlying data, so the vector index must be updated as well.

-- CDC source: product changes
CREATE TABLE product_changes (
  product_id BIGINT,
  description STRING,
  op STRING,  -- 'I', 'U', 'D'
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'product-cdc',
  'format' = 'debezium-json'
);

-- Update vector DB
INSERT INTO product_embeddings
SELECT
  product_id,
  description,
  ML_EMBED(MODEL embed_model, description) AS embedding,
  ts
FROM product_changes
WHERE op IN ('I', 'U');

Delete handling is more complex and depends on the connector. pgvector: straightforward (DELETE WHERE). Pinecone: an explicit delete-by-id call.
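
The insert/update/delete logic can be sketched independently of any connector. In the Python example below the vector index is just a dict and toy_embed is a stand-in for a real embedding model. Both are assumptions for illustration; real connectors expose their own upsert and delete calls.

```python
def apply_change(index, event, embed):
    """index: dict product_id -> vector. event: dict with 'op', 'product_id', 'description'."""
    op = event["op"]
    if op in ("I", "U"):
        # Insert and update are the same operation: re-embed and overwrite.
        index[event["product_id"]] = embed(event["description"])
    elif op == "D":
        # Idempotent delete: a replayed event must not fail.
        index.pop(event["product_id"], None)
    else:
        raise ValueError(f"unknown op: {op!r}")

def toy_embed(text):
    # Stand-in for a real model: [length, vowel count].
    return [float(len(text)), float(sum(c in "aeiou" for c in text.lower()))]

index = {}
events = [
    {"op": "I", "product_id": 1, "description": "red mug"},
    {"op": "U", "product_id": 1, "description": "blue mug"},
    {"op": "I", "product_id": 2, "description": "lamp"},
    {"op": "D", "product_id": 2, "description": None},
    {"op": "D", "product_id": 2, "description": None},  # replayed delete is harmless
]
for ev in events:
    apply_change(index, ev, toy_embed)
```

Treating inserts and updates as one upsert, and making deletes idempotent, keeps the index consistent if the CDC stream is replayed after a failure.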


ANN vs exact search

Vector DBs use approximate nearest neighbor (ANN) search for performance, trading accuracy for speed.

Exact search: scan all vectors, compute distance, sort. O(N) per query. For 10M vectors with 1536 dims: seconds per query.

ANN: use an index (HNSW, IVF, LSH). O(log N) or O(sqrt N) per query. For 10M vectors: milliseconds. The trade-off: ~95-99% recall (some near neighbors are missed).
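
For reference, the exact-search baseline described above (scan everything, compute distance, keep the best K) can be sketched in plain Python. The function names and toy vectors are illustrative assumptions; a real vector DB runs this scan in native code.

```python
import heapq
import math

def cosine_distance(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

def exact_top_k(query, vectors, k):
    """vectors: dict id -> vector. Returns [(id, distance)], nearest first."""
    # Every stored vector is scored, so cost grows linearly with the collection size.
    scored = ((cosine_distance(query, v), doc_id) for doc_id, v in vectors.items())
    return [(doc_id, dist) for dist, doc_id in heapq.nsmallest(k, scored)]

vectors = {"a": [1.0, 0.0], "b": [0.0, 1.0], "c": [1.0, 1.0], "d": [-1.0, 0.0]}
top2 = exact_top_k([1.0, 0.1], vectors, k=2)
top2_ids = [doc_id for doc_id, _ in top2]
```

An ANN index avoids scoring every vector, which is where both the speedup and the recall loss come from. The output of an exact scan like this one is the ground truth against which ANN recall is measured.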

Configure recall vs latency:

-- pgvector with an HNSW index
CREATE INDEX ON documents USING hnsw (embedding vector_cosine_ops)
  WITH (m = 16, ef_construction = 64);

-- Higher m / ef_construction = better recall, slower build
-- Higher ef_search at query time = better recall, slower query

HNSW (Hierarchical Navigable Small World) is the most popular ANN algorithm. Its hyperparameters balance recall against latency.

The VECTOR_SEARCH function defaults to ANN (if an index exists). Override it for exact search:

SELECT * FROM VECTOR_SEARCH(
  TABLE documents,
  'embedding',
  query_vec,
  TOP_K => 10,
  EXACT => true  -- force exact search (slow for large datasets)
);

Caching embeddings

Embedding API calls are expensive. Cache by input text:

CREATE MODEL embed_model WITH (
  'provider' = 'openai',
  'task' = 'text-embedding-3-small',
  'cache.enabled' = 'true',
  'cache.ttl' = '1 d',
  'cache.max_entries' = '100000',
  'cache.storage' = 'rocksdb'
);

The cache stores: input text hash -> embedding vector. Hit -> return the cached vector, no API call. The TTL controls staleness (for evolving content).

Typical hit rate: 30-70%, depending on the data (do questions repeat? are documents reused?). A high hit rate means big cost savings.
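
The caching behavior described above can be sketched as a small Python class. This illustrates the idea (hash of the text as the key, TTL, bounded size) and is not how Flink implements the 'cache.*' options. The class name, the injected clock, and the oldest-insertion eviction policy are all assumptions.

```python
import hashlib
import time

class EmbeddingCache:
    def __init__(self, embed_fn, ttl_seconds=86400, max_entries=100_000, clock=time.monotonic):
        self.embed_fn = embed_fn
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self.clock = clock
        self.store = {}  # text hash -> (expires_at, vector)
        self.hits = 0
        self.misses = 0

    def get(self, text):
        key = hashlib.sha256(text.encode("utf-8")).hexdigest()
        now = self.clock()
        entry = self.store.get(key)
        if entry is not None and entry[0] > now:
            self.hits += 1
            return entry[1]
        self.misses += 1
        vector = self.embed_fn(text)  # the expensive call
        if key not in self.store and len(self.store) >= self.max_entries:
            self.store.pop(next(iter(self.store)))  # evict the oldest insertion
        self.store[key] = (now + self.ttl, vector)
        return vector

calls = []
def fake_embed(text):
    calls.append(text)
    return [float(len(text))]

fake_now = [0.0]
cache = EmbeddingCache(fake_embed, ttl_seconds=10, clock=lambda: fake_now[0])
cache.get("hello")
cache.get("hello")   # served from the cache
fake_now[0] = 11.0
cache.get("hello")   # entry expired, embedded again
```

Three lookups of the same text result in two embedding calls here: one initial miss, one hit, and one miss after the TTL expires.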


RAG quality engineering

Production RAG is not just SQL; it needs quality work:

  1. Chunk size: documents are split into chunks (sentences, paragraphs). Small chunks give precise retrieval but less context. Large chunks give more context but less precision. Typical: 256-512 tokens.

  2. Reranking: the top-k from vector search may be suboptimal. Apply a reranker model (cross-encoder) to the top-k for better ordering.

  3. Query rewriting: the user query may be vague. An LLM rewrites it: “tell me about climate change” -> “climate change causes, effects, and mitigation strategies”.

  4. Context optimization: feed only the most relevant snippets, not full docs. This reduces LLM cost and improves answer quality.

  5. Fallback: if no good context is retrieved (low max similarity), don’t hallucinate; say “I don’t know”.

  6. Evaluation: build a golden dataset and evaluate answers against it. Iterate.
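
Item 1 above (chunking) is easy to get subtly wrong, so here is a minimal Python sketch of fixed-size chunking with overlap. It splits on whitespace as a stand-in for a real tokenizer, which is an assumption: production pipelines would count model tokens instead. The function name and the default sizes are illustrative.

```python
def chunk_tokens(tokens, chunk_size=256, overlap=50):
    """Split a token list into windows of chunk_size that share `overlap` tokens."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(tokens[start:start + chunk_size])
        if start + chunk_size >= len(tokens):
            break  # the last window already reaches the end of the document
    return chunks

tokens = "one two three four five six seven eight nine ten".split()
chunks = chunk_tokens(tokens, chunk_size=4, overlap=1)
```

Each chunk starts with the last token of the previous one, so a fact that straddles a boundary still appears intact in at least one chunk.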

-- Reranking example
WITH retrieved AS (
  SELECT * FROM VECTOR_SEARCH(TABLE kb, 'embedding', query_vec, TOP_K => 20)
),
reranked AS (
  SELECT *, ML_PREDICT(MODEL reranker, query_text, content) AS rerank_score
  FROM retrieved
)
SELECT * FROM reranked
ORDER BY rerank_score DESC
LIMIT 5;

Real-time use case: customer service

Common production RAG: customer service automation.

-- User chat events
CREATE TABLE chat_messages WITH (
  'connector' = 'kafka',
  'topic' = 'chat',
  ...
);

-- Knowledge base of FAQ, product docs, past resolutions
CREATE TABLE kb WITH (
  'connector' = 'pgvector',
  ...
);

-- RAG pipeline
INSERT INTO chat_responses
WITH context AS (
  SELECT
    m.user_id,
    m.session_id,
    m.message,
    -- Find similar past resolutions
    ARRAY_AGG(kb.resolution) AS past_resolutions
  FROM chat_messages m
  CROSS JOIN LATERAL (
    SELECT * FROM VECTOR_SEARCH(
      TABLE kb,
      'embedding',
      ML_EMBED(MODEL embed_model, m.message),
      TOP_K => 5
    )
  ) kb
  GROUP BY m.user_id, m.session_id, m.message
)
SELECT
  user_id,
  session_id,
  message,
  ML_PREDICT(
    MODEL llm,
    'You are a customer service agent. Past resolutions for similar issues: ' ||
    ARRAY_JOIN(past_resolutions, '\n') ||
    '\nCustomer message: ' || message ||
    '\nProvide a helpful response.'
  ) AS response
FROM context;

Auto-respond to chat messages based on the history of resolutions. Human agents see the suggested responses and can edit or approve them.

WARNING

RAG quality depends on both retrieval quality and LLM quality. Bad retrieval -> the LLM hallucinates from irrelevant context. Always evaluate retrieval quality (precision @ K, recall @ K) separately from LLM quality. Vector search is not a silver bullet: chunking strategy, query rewriting, and reranking matter a lot. Plan for quality iteration, not just throughput.
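
The retrieval metrics named above are simple to compute once a golden dataset exists. Here is a minimal Python sketch in which the dataset entries and document IDs are invented for illustration. Each entry pairs the IDs the retriever returned with the IDs a human marked as relevant.

```python
def precision_at_k(retrieved, relevant, k):
    """Share of the top-k retrieved docs that are relevant."""
    if k <= 0:
        return 0.0
    return sum(d in relevant for d in retrieved[:k]) / k

def recall_at_k(retrieved, relevant, k):
    """Share of all relevant docs that appear in the top-k."""
    if not relevant:
        return 0.0
    return sum(d in relevant for d in retrieved[:k]) / len(relevant)

# Hypothetical golden dataset: retriever output vs. human-labelled relevant docs.
golden = [
    {"retrieved": ["d1", "d7", "d3", "d9", "d2"], "relevant": {"d1", "d2", "d4"}},
    {"retrieved": ["d5", "d6", "d8", "d1", "d0"], "relevant": {"d5"}},
]

K = 5
mean_precision = sum(precision_at_k(g["retrieved"], g["relevant"], K) for g in golden) / len(golden)
mean_recall = sum(recall_at_k(g["retrieved"], g["relevant"], K) for g in golden) / len(golden)
```

Tracking these two numbers over time, separately from answer quality, shows whether a change to chunking, reranking, or the embedding model actually improved retrieval.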


Knowledge check
Production RAG chatbot: pgvector knowledge base with 1M docs, embed_model = OpenAI text-embedding-3-small, llm = Claude Haiku, pipeline in Flink SQL. Users complain that answers are irrelevant. Investigation shows that vector search returns the wrong docs for some queries. How do you diagnose and fix this?
Answer
Investigation approach:

  1. Sample failing queries and inspect them manually: vector search returns docs that are close embedding-wise but semantically irrelevant.

  2. Common causes of bad RAG retrieval:
     (a) Chunk size: if docs are split into 2000-token chunks, embeddings represent broad topics, not specific facts. A query about a specific fact gets near matches, but the fact is not actually in the returned chunk. Fix: smaller chunks (256-512 tokens) with overlap (50 tokens).
     (b) Single-vector representation is insufficient: a document about 'history of computing' embedded as one vector may not match the query 'who invented Linux' well. Fix: hierarchical chunking with chunk-level and doc-level embeddings, plus multi-stage retrieval.
     (c) Query-doc mismatch: queries are phrased differently (questions: 'How do I X?') than docs (statements: 'X is done by...'). Vector-space distance can be large despite semantic similarity. Fix: a query-rewriting LLM step that converts the query to declarative form before embedding.
     (d) Domain shift: the embedding model was trained on general internet text, but the knowledge base covers a technical domain (medical, legal). The embeddings are generic, not domain-aware. Fix: use a domain-specific embedding model (e.g., BioBERT for medical) or fine-tune.
     (e) No keyword fallback: pure vector search misses exact-match queries (product names, codes). Fix: hybrid search (vector + keyword) with a weighted combination.

  3. Implementation order: smaller chunks + overlap (easiest, biggest impact), then hybrid search, then reranking, then query rewriting, then domain-specific embeddings (most effort).

  4. Evaluation: build a golden dataset of (query, expected_doc_ids) and measure recall@5 and precision@5. Iterate using the metrics.

  5. Monitor production: log retrieved docs alongside answers, sample them for manual review, and build a feedback loop.

Without quality engineering, RAG in production stalls at 'demo quality'.

Check your understanding

Question 1 of 4 (conceptual). VECTOR_SEARCH SQL function: what does it do, and what happens under the hood?
