Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 18 мин
Средний
JDBCConnectorSinkSourceLookupBatchingUpsert

JDBC source и sink

Связка Flink + реляционная БД встречается в почти каждом production-pipeline. JDBC connector в Flink даёт три вещи: bounded source для batch-режима, lookup-source для enrichment-join’ов, и sink для записи результатов в БД (с upsert-семантикой по PK).

В этом уроке разберём всё: scan-режим, lookup-режим, sink append/upsert, и подводные камни, на которых ломаются jobs в production — connection pool, batch size, deadlock’и при upsert.


JDBC как scan source

В режиме scan source Flink один раз выполняет SELECT * FROM table (или указанный запрос), возвращает все строки и завершает source. Это bounded source — для batch-режима или для one-shot ingestion в стриминговый job.

CREATE TABLE customers (
  customer_id BIGINT,
  customer_name STRING,
  segment STRING,
  created_at TIMESTAMP(3)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db.internal:5432/crm',
  'table-name' = 'customers',
  'username' = 'flink_user',
  'password' = '...',
  'scan.partition.column' = 'customer_id',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '10000000',
  'scan.partition.num' = '8',
  'scan.fetch-size' = '1000'
);

Параметры partitioning:

  • scan.partition.column — колонка для разбиения (должна быть числовая, обычно ID).
  • scan.partition.lower-bound / upper-bound — диапазон значений.
  • scan.partition.num — на сколько partition’ов разбить. Каждый Flink subtask получит подмножество диапазона и сделает свой SELECT ... WHERE id BETWEEN ? AND ?.

Без partitioning Flink использует один subtask — на большой таблице это узкое место.

CDC как альтернатива full-scan для PostgreSQL/MySQL
WARNING

Scan partitioning делает несколько параллельных запросов к БД. На production-БД это создаёт нагрузку — 8 параллельных SELECT на 100M строк могут забить connection pool и I/O БД. Координируйте с DBA, делайте такие full-scan’ы в maintenance window, или экспортируйте через специализированный CDC-pipeline (Flink CDC, см. модуль 14).


JDBC как lookup source

В режиме lookup Flink не сканирует таблицу заранее. Вместо этого, при join из стрима, для каждого ключа делает point-query в БД (SELECT * WHERE pk = ?). Результаты кешируются.

CREATE TABLE customers (
  customer_id BIGINT,
  customer_name STRING,
  segment STRING,
  PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://db.internal:5432/crm',
  'table-name' = 'customers',
  'lookup.cache.max-rows' = '50000',
  'lookup.cache.ttl' = '10 min',
  'lookup.max-retries' = '3'
);

-- Использование: lookup-join из orders
SELECT o.order_id, o.amount, c.customer_name, c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
  ON o.customer_id = c.customer_id;

Параметры lookup:

  • lookup.cache.max-rows — размер LRU-кеша на subtask.
  • lookup.cache.ttl — TTL записей в кеше. После TTL — повторный query в БД.
  • lookup.max-retries — сколько раз повторять при ошибке БД.
  • lookup.asynctrue для async-lookup (через Async I/O), повышает throughput на slow БД.
TIP

Если customer обновился в БД (например, изменил segment), Flink увидит обновление только после истечения TTL и повторного query. На lookup.cache.ttl=10min — устаревшие данные до 10 минут. Если нужна актуальность секунд — настройте маленький TTL или используйте Flink CDC + temporal join (см. модуль 14).


JDBC sink: append-only

В append-only режиме каждая входящая строка делает INSERT INTO table VALUES (...). Без PK в DDL Flink не знает, как делать update — только insert.

CREATE TABLE event_log (
  event_id STRING,
  event_type STRING,
  payload STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://warehouse:5432/events',
  'table-name' = 'events',
  'username' = 'flink_writer',
  'password' = '...',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '5 s',
  'sink.max-retries' = '3'
);

INSERT INTO event_log
SELECT event_id, event_type, payload, event_time
FROM kafka_events
WHERE event_type IN ('click', 'view');

Параметры:

  • sink.buffer-flush.max-rows — сколько строк буферизовать перед batch insert.
  • sink.buffer-flush.interval — максимальная задержка флаша (latency budget).
  • sink.max-retries — повторы при ошибке.

JDBC sink не гарантирует exactly-once. По умолчанию это at-least-once: при failure после batch insert но до checkpoint — записи будут вставлены повторно. Для exactly-once в JDBC нужно XA-транзакции (двухфазный commit на уровне БД) — поддержка ограниченная (jdbc-xa connector).


JDBC sink: upsert

Если в DDL указан PRIMARY KEY, Flink делает upsert: INSERT ... ON CONFLICT (pk) DO UPDATE SET ... (Postgres) или INSERT ... ON DUPLICATE KEY UPDATE ... (MySQL).

CREATE TABLE user_metrics (
  user_id STRING,
  click_count BIGINT,
  last_click_time TIMESTAMP(3),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://warehouse:5432/metrics',
  'table-name' = 'user_metrics',
  'sink.buffer-flush.max-rows' = '500',
  'sink.buffer-flush.interval' = '2 s'
);

INSERT INTO user_metrics
SELECT user_id, COUNT(*) AS clicks, MAX(event_time)
FROM clicks
GROUP BY user_id;

В upsert-режиме sink принимает retract/upsert changelog: +I и +U становятся INSERT ... ON CONFLICT DO UPDATE, -D становится DELETE FROM table WHERE pk = ?.

WARNING

Upsert sink в Postgres использует INSERT ... ON CONFLICT. Это требует существующего unique constraint или PRIMARY KEY на колонке указанной в ON CONFLICT (...). Если в БД нет constraint — Postgres вернёт ошибку “no unique or exclusion constraint matching the ON CONFLICT specification”. Перед запуском job убедитесь, что схема в БД соответствует PRIMARY KEY в DDL Flink.


Архитектура JDBC sink

JDBC sink: батчинг и flush

Stream events

Stream входящих событий. Flink subtask получает события один за другим.

In-memory buffer

JDBC sink буферизует записи в memory. Не отправляет в БД сразу. Буфер ограничен sink.buffer-flush.max-rows и sink.buffer-flush.interval.
flush trigger

Batch INSERT

При достижении max-rows или interval — sink делает batch insert (или upsert) одним SQL-statement. Connection берётся из pool.

Pending events in state

JDBC sink хранит pending events в state. При checkpoint происходит forced flush — буфер опустошается, чтобы все события до checkpoint попали в БД.
on checkpoint

Forced flush

Forced flush гарантирует at-least-once: после успешного checkpoint все события до checkpoint barrier в БД.

Connection pool

Flink JDBC connector использует один connection на subtask. Если parallelism=8, у вас будет 8 подключений к БД от одного Flink job. Несколько Flink jobs суммируются.

# Расчёт connections на БД
total_connections = sum(jobs) × parallelism

# Пример: 3 job по parallelism=8
total = 3 × 8 = 24 connections от Flink

# На Postgres c max_connections=100 — это 24% capacity, нормально.
# На Postgres с max_connections=20 — 24 connections не влезут, errors at startup.

В production обязательно мониторьте pg_stat_activity (или эквивалент в MySQL/MariaDB). Заранее обсуждайте с DBA: сколько Flink job будет коннектиться, какой parallelism, что произойдёт при scaling.

WARNING

Многие team’ы делают эту ошибку: запускают Flink job с parallelism=64, JDBC sink -> Postgres с max_connections=100. Job стартует, занимает 64 connection, остальные сервисы (REST API, аналитики, мониторинг) не могут подключиться. Связь между Flink parallelism и БД capacity не очевидна, и DBA не всегда заранее знают про новый Flink job. Координируйтесь.


Deadlocks при upsert

Upsert sink выполняет INSERT ... ON CONFLICT параллельно из нескольких subtasks. Если две subtasks обновляют один и тот же row одновременно — Postgres использует row-level lock. При неправильном порядке locks возможны deadlock’и.

Симптомы: периодические “deadlock detected” в логах sink, retries срабатывают, job не падает, но throughput плавает.

Решения:

  1. Партиционирование по PK: убедитесь, что Flink использует keyBy(pk) перед sink — тогда одну и ту же PK будет писать одна и та же subtask.
  2. Уменьшить batch size: меньше rows на batch = меньше шанс конфликта lock’ов.
  3. Уменьшить parallelism sink: если sink overshadows reality — 4 параллельных upsert’а часто достаточно для большинства задач.

Поддерживаемые БД

  • Postgres — first-class, нативный INSERT ... ON CONFLICT.
  • MySQL / MariaDB — first-class, INSERT ... ON DUPLICATE KEY UPDATE.
  • OracleMERGE INTO для upsert.
  • SQL ServerMERGE INTO.
  • DB2MERGE INTO.
  • SQLite — для тестов.

CrateDB, ClickHouse, Snowflake — не входят в стандартный JDBC connector; для них есть отдельные dedicated connectors. Использовать generic JDBC к ним можно, но без upsert и с худшей производительностью.


Попробуй сам

  1. Настрой scan-source на таблицу с 50M строк, parallelism=8. Какой будет SQL для каждой subtask?
  2. Сделай lookup-join из стрима orders в JDBC-таблицу products. Что произойдёт при ошибке connection к БД во время lookup?
  3. Реализуй upsert sink в Postgres-таблицу metrics. Что произойдёт, если в DDL указан PRIMARY KEY (id), но в БД таблица создана без PRIMARY KEY?
Проверка знанийKnowledge check
Команда добавила в Flink job новый JDBC sink в Postgres. После деплоя БД начала возвращать "FATAL: too many clients already". В кластере 8 Flink jobs, у каждого parallelism=16. max_connections в Postgres=128. Что случилось и как исправить?
ОтветAnswer
Подсчёт: 8 jobs × 16 parallelism = 128 connections от Flink. Это ровно max_connections — но Postgres резервирует часть connections для superuser (по умолчанию superuser_reserved_connections=3) и сервисных подключений (мониторинг, репликация). Эффективно доступно ~120 connections, и Flink-нагрузка их полностью забирает, обычные сервисы валятся. Решения: (1) увеличить max_connections в Postgres (если ресурсы позволяют — каждое connection требует ~10MB shared memory); (2) уменьшить parallelism sink — sink не обязательно должен быть с тем же parallelism что pipeline; добавить keyBy + setParallelism(4) перед sink, БД увидит 8×4=32 connection вместо 128; (3) использовать connection pooler типа PgBouncer перед Postgres — pooler принимает много client-connections и переиспользует меньшее число pgsql-сессий. PgBouncer — стандартное решение для high-parallel Flink+Postgres.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда настраивает JDBC sink с upsert семантикой (PRIMARY KEY в DDL). При попытке запустить job Postgres возвращает: 'no unique or exclusion constraint matching the ON CONFLICT specification'. В чём причина?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4