JDBC source и sink
Связка Flink + реляционная БД встречается в почти каждом production-pipeline. JDBC connector в Flink даёт три вещи: bounded source для batch-режима, lookup-source для enrichment-join’ов, и sink для записи результатов в БД (с upsert-семантикой по PK).
В этом уроке разберём всё: scan-режим, lookup-режим, sink append/upsert, и подводные камни, на которых ломаются jobs в production — connection pool, batch size, deadlock’и при upsert.
JDBC как scan source
В режиме scan source Flink один раз выполняет SELECT * FROM table (или указанный запрос), возвращает все строки и завершает source. Это bounded source — для batch-режима или для one-shot ingestion в стриминговый job.
CREATE TABLE customers (
customer_id BIGINT,
customer_name STRING,
segment STRING,
created_at TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db.internal:5432/crm',
'table-name' = 'customers',
'username' = 'flink_user',
'password' = '...',
'scan.partition.column' = 'customer_id',
'scan.partition.lower-bound' = '1',
'scan.partition.upper-bound' = '10000000',
'scan.partition.num' = '8',
'scan.fetch-size' = '1000'
);
Параметры partitioning:
- scan.partition.column — колонка для разбиения (должна быть числовая, обычно ID).
- scan.partition.lower-bound / upper-bound — диапазон значений.
- scan.partition.num — на сколько partition’ов разбить. Каждый Flink subtask получит подмножество диапазона и сделает свой
SELECT ... WHERE id BETWEEN ? AND ?.
Без partitioning Flink использует один subtask — на большой таблице это узкое место.
CDC как альтернатива full-scan для PostgreSQL/MySQLScan partitioning делает несколько параллельных запросов к БД. На production-БД это создаёт нагрузку — 8 параллельных SELECT на 100M строк могут забить connection pool и I/O БД. Координируйте с DBA, делайте такие full-scan’ы в maintenance window, или экспортируйте через специализированный CDC-pipeline (Flink CDC, см. модуль 14).
JDBC как lookup source
В режиме lookup Flink не сканирует таблицу заранее. Вместо этого, при join из стрима, для каждого ключа делает point-query в БД (SELECT * WHERE pk = ?). Результаты кешируются.
CREATE TABLE customers (
customer_id BIGINT,
customer_name STRING,
segment STRING,
PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://db.internal:5432/crm',
'table-name' = 'customers',
'lookup.cache.max-rows' = '50000',
'lookup.cache.ttl' = '10 min',
'lookup.max-retries' = '3'
);
-- Использование: lookup-join из orders
SELECT o.order_id, o.amount, c.customer_name, c.segment
FROM orders AS o
JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.customer_id;
Параметры lookup:
- lookup.cache.max-rows — размер LRU-кеша на subtask.
- lookup.cache.ttl — TTL записей в кеше. После TTL — повторный query в БД.
- lookup.max-retries — сколько раз повторять при ошибке БД.
- lookup.async —
trueдля async-lookup (через Async I/O), повышает throughput на slow БД.
Если customer обновился в БД (например, изменил segment), Flink увидит обновление только после истечения TTL и повторного query. На lookup.cache.ttl=10min — устаревшие данные до 10 минут. Если нужна актуальность секунд — настройте маленький TTL или используйте Flink CDC + temporal join (см. модуль 14).
JDBC sink: append-only
В append-only режиме каждая входящая строка делает INSERT INTO table VALUES (...). Без PK в DDL Flink не знает, как делать update — только insert.
CREATE TABLE event_log (
event_id STRING,
event_type STRING,
payload STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://warehouse:5432/events',
'table-name' = 'events',
'username' = 'flink_writer',
'password' = '...',
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '5 s',
'sink.max-retries' = '3'
);
INSERT INTO event_log
SELECT event_id, event_type, payload, event_time
FROM kafka_events
WHERE event_type IN ('click', 'view');
Параметры:
- sink.buffer-flush.max-rows — сколько строк буферизовать перед batch insert.
- sink.buffer-flush.interval — максимальная задержка флаша (latency budget).
- sink.max-retries — повторы при ошибке.
JDBC sink не гарантирует exactly-once. По умолчанию это at-least-once: при failure после batch insert но до checkpoint — записи будут вставлены повторно. Для exactly-once в JDBC нужно XA-транзакции (двухфазный commit на уровне БД) — поддержка ограниченная (jdbc-xa connector).
JDBC sink: upsert
Если в DDL указан PRIMARY KEY, Flink делает upsert: INSERT ... ON CONFLICT (pk) DO UPDATE SET ... (Postgres) или INSERT ... ON DUPLICATE KEY UPDATE ... (MySQL).
CREATE TABLE user_metrics (
user_id STRING,
click_count BIGINT,
last_click_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://warehouse:5432/metrics',
'table-name' = 'user_metrics',
'sink.buffer-flush.max-rows' = '500',
'sink.buffer-flush.interval' = '2 s'
);
INSERT INTO user_metrics
SELECT user_id, COUNT(*) AS clicks, MAX(event_time)
FROM clicks
GROUP BY user_id;
В upsert-режиме sink принимает retract/upsert changelog: +I и +U становятся INSERT ... ON CONFLICT DO UPDATE, -D становится DELETE FROM table WHERE pk = ?.
Upsert sink в Postgres использует INSERT ... ON CONFLICT. Это требует существующего unique constraint или PRIMARY KEY на колонке указанной в ON CONFLICT (...). Если в БД нет constraint — Postgres вернёт ошибку “no unique or exclusion constraint matching the ON CONFLICT specification”. Перед запуском job убедитесь, что схема в БД соответствует PRIMARY KEY в DDL Flink.
Архитектура JDBC sink
Stream events
Stream входящих событий. Flink subtask получает события один за другим.In-memory buffer
JDBC sink буферизует записи в memory. Не отправляет в БД сразу. Буфер ограничен sink.buffer-flush.max-rows и sink.buffer-flush.interval.Batch INSERT
При достижении max-rows или interval — sink делает batch insert (или upsert) одним SQL-statement. Connection берётся из pool.Pending events in state
JDBC sink хранит pending events в state. При checkpoint происходит forced flush — буфер опустошается, чтобы все события до checkpoint попали в БД.Forced flush
Forced flush гарантирует at-least-once: после успешного checkpoint все события до checkpoint barrier в БД.Connection pool
Flink JDBC connector использует один connection на subtask. Если parallelism=8, у вас будет 8 подключений к БД от одного Flink job. Несколько Flink jobs суммируются.
# Расчёт connections на БД
total_connections = sum(jobs) × parallelism
# Пример: 3 job по parallelism=8
total = 3 × 8 = 24 connections от Flink
# На Postgres c max_connections=100 — это 24% capacity, нормально.
# На Postgres с max_connections=20 — 24 connections не влезут, errors at startup.
В production обязательно мониторьте pg_stat_activity (или эквивалент в MySQL/MariaDB). Заранее обсуждайте с DBA: сколько Flink job будет коннектиться, какой parallelism, что произойдёт при scaling.
Многие team’ы делают эту ошибку: запускают Flink job с parallelism=64, JDBC sink -> Postgres с max_connections=100. Job стартует, занимает 64 connection, остальные сервисы (REST API, аналитики, мониторинг) не могут подключиться. Связь между Flink parallelism и БД capacity не очевидна, и DBA не всегда заранее знают про новый Flink job. Координируйтесь.
Deadlocks при upsert
Upsert sink выполняет INSERT ... ON CONFLICT параллельно из нескольких subtasks. Если две subtasks обновляют один и тот же row одновременно — Postgres использует row-level lock. При неправильном порядке locks возможны deadlock’и.
Симптомы: периодические “deadlock detected” в логах sink, retries срабатывают, job не падает, но throughput плавает.
Решения:
- Партиционирование по PK: убедитесь, что Flink использует
keyBy(pk)перед sink — тогда одну и ту же PK будет писать одна и та же subtask. - Уменьшить batch size: меньше rows на batch = меньше шанс конфликта lock’ов.
- Уменьшить parallelism sink: если sink overshadows reality — 4 параллельных upsert’а часто достаточно для большинства задач.
Поддерживаемые БД
- Postgres — first-class, нативный
INSERT ... ON CONFLICT. - MySQL / MariaDB — first-class,
INSERT ... ON DUPLICATE KEY UPDATE. - Oracle —
MERGE INTOдля upsert. - SQL Server —
MERGE INTO. - DB2 —
MERGE INTO. - SQLite — для тестов.
CrateDB, ClickHouse, Snowflake — не входят в стандартный JDBC connector; для них есть отдельные dedicated connectors. Использовать generic JDBC к ним можно, но без upsert и с худшей производительностью.
Попробуй сам
- Настрой scan-source на таблицу с 50M строк, parallelism=8. Какой будет SQL для каждой subtask?
- Сделай lookup-join из стрима orders в JDBC-таблицу products. Что произойдёт при ошибке connection к БД во время lookup?
- Реализуй upsert sink в Postgres-таблицу metrics. Что произойдёт, если в DDL указан PRIMARY KEY (id), но в БД таблица создана без
PRIMARY KEY?