TableEnvironment и Catalog
Перед тем как написать первый SELECT, Flink надо сообщить две вещи: где живут данные (источники, sinks) и как они описываются (схемы, формат). Эту задачу решает связка TableEnvironment + Catalog. TableEnvironment — это рантайм Table API/SQL внутри Flink-приложения. Catalog — это место, где хранятся метаданные таблиц.
В этом уроке разберём StreamTableEnvironment, как создавать таблицы через DDL, и какие виды каталогов бывают — от эфемерного in-memory до полноценного Hive Metastore с многопроцессовой шарингом метаданных.
StreamTableEnvironment
Каждое Flink-приложение, использующее Table API/SQL, начинается с создания TableEnvironment. Для streaming-задач (что в этом курсе и есть наш дефолт) используется StreamTableEnvironment:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// DDL — создаём source table
tEnv.executeSql("""
CREATE TABLE clicks (
user_id STRING,
url STRING,
ts TIMESTAMP_LTZ(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-clicks-consumer',
'scan.startup.mode' = 'group-offsets',
'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601'
)
""");
StreamTableEnvironment шарит underlying StreamExecutionEnvironment — все настройки parallelism, checkpointing, state backend применяются к обоим API. Это важно: вы можете в одном приложении свободно миксовать DataStream и Table API, и checkpoints, savepoints, restart strategy будут общими.
Для pure-Table приложений без DataStream-кода также существует TableEnvironment (без префикса Stream). Он создаётся без StreamExecutionEnvironment и не позволяет конвертацию в/из DataStream. Используйте TableEnvironment для чистых SQL-only job; StreamTableEnvironment — когда нужен mix с DataStream API.
Catalog API
Catalog — это namespace для метаданных Flink-таблиц. Иерархия: catalog.database.table.
Hive Metastore Catalog в Spark: тот же metastore что и в Flink По умолчанию Flink создаёт один каталог default_catalog с одной базой default_database. Все DDL CREATE TABLE без явного USE CATALOG ложатся туда.
Зачем нужны каталоги:
- Изоляция окружений — отдельный каталог под dev, staging, prod.
- Шаринг метаданных между job’ами — если несколько Flink-приложений или ad-hoc SQL CLI должны видеть одну и ту же таблицу, эта таблица хранится в shared-каталоге (Hive Metastore, Iceberg REST).
- Интеграция с lakehouse — Iceberg Catalog, Paimon Catalog позволяют автоматически открывать все таблицы стандартного DWH-озера.
TableEnvironment
TableEnvironment может одновременно работать с несколькими каталогами. Один из них активный — операции без префикса идут туда.Переключение между каталогами:
SHOW CATALOGS;
USE CATALOG hive_catalog;
SHOW DATABASES;
USE prod;
SHOW TABLES;
SELECT * FROM clickstream_events LIMIT 10;
Каталог можно задать в DDL и через программный API:
import org.apache.flink.table.catalog.hive.HiveCatalog;
HiveCatalog hive = new HiveCatalog(
"hive_catalog",
"default",
"/opt/hive-conf"
);
tEnv.registerCatalog("hive_catalog", hive);
tEnv.useCatalog("hive_catalog");
DDL для Kafka source
DDL создаёт catalog-entry, описывающий таблицу. Запрос данных при SELECT инстанциирует connector с параметрами из WITH-блока.
CREATE TABLE orders (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10, 2),
ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-orders',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro-confluent',
'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
);
Что здесь происходит:
- Схема колонок — типы данных + computed columns.
METADATA FROM 'timestamp'извлекает Kafka-метаданные (timestamp записи). - WATERMARK — декларация event-time с допуском (5 секунд out-of-orderness здесь). Подробнее watermarks в модуле 06.
- WITH-параметры — connector-specific.
kafkaконнектор знает свои опции:topic,properties.*,scan.startup.mode,format. - Format — как декодировать payload. JSON, Avro, Protobuf, CSV, raw.
avro-confluentиспользует Confluent Schema Registry.
DDL — это declaration, не execution. CREATE TABLE не подключается к Kafka и не валидирует, что топик существует. Connector создаётся при первом SELECT или INSERT. Ошибка типа “topic does not exist” появится при запуске job, а не при DDL.
In-Memory Catalog
Дефолтный каталог Flink — GenericInMemoryCatalog. Метаданные хранятся в JVM heap. Преимущества: ноль зависимостей, мгновенный старт. Недостатки: метаданные живут только в одном job, после рестарта job DDL придётся выполнить заново.
Когда использовать:
- Локальная разработка и unit-тесты.
- Embedded-job с фиксированным набором таблиц.
- Простые batch-задачи где DDL — часть скрипта.
Когда не использовать:
- SQL CLI для ad-hoc-аналитики (теряете state после reconnect).
- Production-job, переживающий рестарт со сложной схемой (DDL надо где-то хранить версионно).
Hive Catalog
HiveCatalog — это адаптер к Hive Metastore. Hive Metastore — это сервис, который хранит метаданные таблиц в внешней БД (обычно MySQL или Postgres). HiveCatalog даёт persistent-хранилище метаданных, plus совместимость с Spark, Trino, Presto, Hive — все они могут читать таблицы из того же metastore.
HiveCatalog hive = new HiveCatalog(
"hive_catalog", // имя в Flink
"default", // database по умолчанию
"/opt/hive-conf", // путь к hive-site.xml
"3.1.3" // версия Hive
);
tEnv.registerCatalog("hive_catalog", hive);
tEnv.useCatalog("hive_catalog");
// Теперь DDL ложится в Hive Metastore
tEnv.executeSql("""
CREATE TABLE pageviews (
url STRING,
view_count BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
...
)
""");
После создания таблица сохранена в metastore и доступна:
- Другому Flink-job в том же кластере.
- Spark-job через
spark.sql("SELECT * FROM pageviews")(Spark должен знать формат — Hive-managed таблицы Spark читает напрямую, Kafka-таблицы — нет, тут уже разные коннекторы). - Trino через тот же metastore.
Не все Flink-таблицы переносимы через Hive Metastore в другие движки. Hive-managed таблицы (Parquet/ORC файлы в HDFS/S3) — да, читаются всеми. Kafka, JDBC, Datagen и прочие Flink-only connector-таблицы хранятся в metastore как метаданные, но другой движок (Spark/Trino) их не знает. Для shared lakehouse-таблиц используйте Iceberg или Paimon-каталоги — там формат универсальный.
Iceberg и Paimon Catalogs
Современный выбор для lakehouse — IcebergCatalog (через REST или Hive backend) и PaimonCatalog. Они хранят метаданные Iceberg/Paimon-таблиц и интегрируются с любым движком, поддерживающим эти форматы (Spark, Trino, Flink, Snowflake).
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 's3://my-bucket/paimon-warehouse'
);
USE CATALOG paimon;
CREATE TABLE orders_paimon (
order_id BIGINT,
amount DECIMAL(10, 2),
ts TIMESTAMP_LTZ(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'bucket' = '4',
'changelog-producer' = 'lookup'
);
Iceberg и Paimon подробно — в модуле 13, урок 3.
CatalogTable vs ConnectorTable
Внутри Flink есть две градации:
- CatalogTable — метаданные в каталоге (схема + WITH-параметры). Создаётся DDL
CREATE TABLE. Persistent (если catalog persistent) или ephemeral (in-memory). - TemporaryTable —
CREATE TEMPORARY TABLE. Живёт только в текущей session, не сохраняется в catalog. Полезно для тестов и для тех таблиц, которые не должны попасть в shared metastore.
CREATE TEMPORARY TABLE temp_clicks (
user_id STRING,
click_ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
);
Попробуй сам
- Создай локальный HiveCatalog с in-memory Derby БД (для тестов):
new HiveCatalog("hive", "default", "src/test/resources/hive-conf")— настройте hive-site.xml с Derby backend. - Объяви Kafka source через DDL с парсингом nested JSON: например, поле
payload ROW<id BIGINT, name STRING>. - Попробуй переключиться между каталогами в Flink SQL CLI:
USE CATALOG default_catalog;потомUSE CATALOG hive_catalog;. Что произойдёт с таблицами созданными вdefault_catalogпосле рестарта SQL CLI?