Learning Platform
Глоссарий Troubleshooting
Урок 13.02 · 18 мин
Средний
TableEnvironmentCatalogHive CatalogDDLKafkaMetadata

TableEnvironment и Catalog

Перед тем как написать первый SELECT, Flink надо сообщить две вещи: где живут данные (источники, sinks) и как они описываются (схемы, формат). Эту задачу решает связка TableEnvironment + Catalog. TableEnvironment — это рантайм Table API/SQL внутри Flink-приложения. Catalog — это место, где хранятся метаданные таблиц.

В этом уроке разберём StreamTableEnvironment, как создавать таблицы через DDL, и какие виды каталогов бывают — от эфемерного in-memory до полноценного Hive Metastore с многопроцессовой шарингом метаданных.


StreamTableEnvironment

Каждое Flink-приложение, использующее Table API/SQL, начинается с создания TableEnvironment. Для streaming-задач (что в этом курсе и есть наш дефолт) используется StreamTableEnvironment:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// DDL — создаём source table
tEnv.executeSql("""
    CREATE TABLE clicks (
      user_id STRING,
      url STRING,
      ts TIMESTAMP_LTZ(3),
      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'clicks',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'flink-clicks-consumer',
      'scan.startup.mode' = 'group-offsets',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601'
    )
    """);

StreamTableEnvironment шарит underlying StreamExecutionEnvironment — все настройки parallelism, checkpointing, state backend применяются к обоим API. Это важно: вы можете в одном приложении свободно миксовать DataStream и Table API, и checkpoints, savepoints, restart strategy будут общими.

NOTE

Для pure-Table приложений без DataStream-кода также существует TableEnvironment (без префикса Stream). Он создаётся без StreamExecutionEnvironment и не позволяет конвертацию в/из DataStream. Используйте TableEnvironment для чистых SQL-only job; StreamTableEnvironment — когда нужен mix с DataStream API.


Catalog API

Catalog — это namespace для метаданных Flink-таблиц. Иерархия: catalog.database.table.

Hive Metastore Catalog в Spark: тот же metastore что и в Flink По умолчанию Flink создаёт один каталог default_catalog с одной базой default_database. Все DDL CREATE TABLE без явного USE CATALOG ложатся туда.

Зачем нужны каталоги:

  • Изоляция окружений — отдельный каталог под dev, staging, prod.
  • Шаринг метаданных между job’ами — если несколько Flink-приложений или ad-hoc SQL CLI должны видеть одну и ту же таблицу, эта таблица хранится в shared-каталоге (Hive Metastore, Iceberg REST).
  • Интеграция с lakehouse — Iceberg Catalog, Paimon Catalog позволяют автоматически открывать все таблицы стандартного DWH-озера.
Catalog hierarchy в Flink

TableEnvironment

TableEnvironment может одновременно работать с несколькими каталогами. Один из них активный — операции без префикса идут туда.
default_catalog (in-memory)Дефолтный каталог. In-memory: метаданные живут только во время работы job. После рестарта DDL надо выполнить заново. Удобен для разработки и SQL-only ad-hoc.
hive_catalogHiveCatalog: метаданные хранятся в Hive Metastore (MySQL/Postgres под капотом). Persistent — переживают рестарт job. Шарятся между job'ами и движками (Flink, Spark, Trino).
iceberg_catalogIceberg REST Catalog или Hive-backed Iceberg Catalog. Хранит схемы Iceberg-таблиц. Современный выбор для lakehouse.

Переключение между каталогами:

SHOW CATALOGS;
USE CATALOG hive_catalog;
SHOW DATABASES;
USE prod;
SHOW TABLES;
SELECT * FROM clickstream_events LIMIT 10;

Каталог можно задать в DDL и через программный API:

import org.apache.flink.table.catalog.hive.HiveCatalog;

HiveCatalog hive = new HiveCatalog(
    "hive_catalog",
    "default",
    "/opt/hive-conf"
);
tEnv.registerCatalog("hive_catalog", hive);
tEnv.useCatalog("hive_catalog");

DDL для Kafka source

DDL создаёт catalog-entry, описывающий таблицу. Запрос данных при SELECT инстанциирует connector с параметрами из WITH-блока.

CREATE TABLE orders (
  order_id BIGINT,
  customer_id BIGINT,
  amount DECIMAL(10, 2),
  ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-orders',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent',
  'avro-confluent.schema-registry.url' = 'http://schema-registry:8081'
);

Что здесь происходит:

  1. Схема колонок — типы данных + computed columns. METADATA FROM 'timestamp' извлекает Kafka-метаданные (timestamp записи).
  2. WATERMARK — декларация event-time с допуском (5 секунд out-of-orderness здесь). Подробнее watermarks в модуле 06.
  3. WITH-параметры — connector-specific. kafka коннектор знает свои опции: topic, properties.*, scan.startup.mode, format.
  4. Format — как декодировать payload. JSON, Avro, Protobuf, CSV, raw. avro-confluent использует Confluent Schema Registry.
WARNING

DDL — это declaration, не execution. CREATE TABLE не подключается к Kafka и не валидирует, что топик существует. Connector создаётся при первом SELECT или INSERT. Ошибка типа “topic does not exist” появится при запуске job, а не при DDL.


In-Memory Catalog

Дефолтный каталог Flink — GenericInMemoryCatalog. Метаданные хранятся в JVM heap. Преимущества: ноль зависимостей, мгновенный старт. Недостатки: метаданные живут только в одном job, после рестарта job DDL придётся выполнить заново.

Когда использовать:

  • Локальная разработка и unit-тесты.
  • Embedded-job с фиксированным набором таблиц.
  • Простые batch-задачи где DDL — часть скрипта.

Когда не использовать:

  • SQL CLI для ad-hoc-аналитики (теряете state после reconnect).
  • Production-job, переживающий рестарт со сложной схемой (DDL надо где-то хранить версионно).

Hive Catalog

HiveCatalog — это адаптер к Hive Metastore. Hive Metastore — это сервис, который хранит метаданные таблиц в внешней БД (обычно MySQL или Postgres). HiveCatalog даёт persistent-хранилище метаданных, plus совместимость с Spark, Trino, Presto, Hive — все они могут читать таблицы из того же metastore.

HiveCatalog hive = new HiveCatalog(
    "hive_catalog",              // имя в Flink
    "default",                    // database по умолчанию
    "/opt/hive-conf",             // путь к hive-site.xml
    "3.1.3"                       // версия Hive
);
tEnv.registerCatalog("hive_catalog", hive);
tEnv.useCatalog("hive_catalog");

// Теперь DDL ложится в Hive Metastore
tEnv.executeSql("""
    CREATE TABLE pageviews (
      url STRING,
      view_count BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'pageviews',
      ...
    )
    """);

После создания таблица сохранена в metastore и доступна:

  • Другому Flink-job в том же кластере.
  • Spark-job через spark.sql("SELECT * FROM pageviews") (Spark должен знать формат — Hive-managed таблицы Spark читает напрямую, Kafka-таблицы — нет, тут уже разные коннекторы).
  • Trino через тот же metastore.
TIP

Не все Flink-таблицы переносимы через Hive Metastore в другие движки. Hive-managed таблицы (Parquet/ORC файлы в HDFS/S3) — да, читаются всеми. Kafka, JDBC, Datagen и прочие Flink-only connector-таблицы хранятся в metastore как метаданные, но другой движок (Spark/Trino) их не знает. Для shared lakehouse-таблиц используйте Iceberg или Paimon-каталоги — там формат универсальный.


Iceberg и Paimon Catalogs

Современный выбор для lakehouse — IcebergCatalog (через REST или Hive backend) и PaimonCatalog. Они хранят метаданные Iceberg/Paimon-таблиц и интегрируются с любым движком, поддерживающим эти форматы (Spark, Trino, Flink, Snowflake).

CREATE CATALOG paimon WITH (
  'type' = 'paimon',
  'warehouse' = 's3://my-bucket/paimon-warehouse'
);

USE CATALOG paimon;

CREATE TABLE orders_paimon (
  order_id BIGINT,
  amount DECIMAL(10, 2),
  ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'changelog-producer' = 'lookup'
);

Iceberg и Paimon подробно — в модуле 13, урок 3.


CatalogTable vs ConnectorTable

Внутри Flink есть две градации:

  • CatalogTable — метаданные в каталоге (схема + WITH-параметры). Создаётся DDL CREATE TABLE. Persistent (если catalog persistent) или ephemeral (in-memory).
  • TemporaryTableCREATE TEMPORARY TABLE. Живёт только в текущей session, не сохраняется в catalog. Полезно для тестов и для тех таблиц, которые не должны попасть в shared metastore.
CREATE TEMPORARY TABLE temp_clicks (
  user_id STRING,
  click_ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '100'
);

Попробуй сам

  1. Создай локальный HiveCatalog с in-memory Derby БД (для тестов): new HiveCatalog("hive", "default", "src/test/resources/hive-conf") — настройте hive-site.xml с Derby backend.
  2. Объяви Kafka source через DDL с парсингом nested JSON: например, поле payload ROW<id BIGINT, name STRING>.
  3. Попробуй переключиться между каталогами в Flink SQL CLI: USE CATALOG default_catalog; потом USE CATALOG hive_catalog;. Что произойдёт с таблицами созданными в default_catalog после рестарта SQL CLI?
Проверка знанийKnowledge check
Команда хочет, чтобы ad-hoc SQL-аналитики через Flink SQL CLI видели те же таблицы, что и production Flink job. Какой каталог нужно использовать и почему default_catalog не подойдёт?
ОтветAnswer
Нужен persistent shared catalog — HiveCatalog или IcebergCatalog/PaimonCatalog с external metastore (MySQL/Postgres под Hive, REST-сервер для Iceberg). default_catalog — это GenericInMemoryCatalog, метаданные которого живут только в JVM текущего процесса. Каждый запуск Flink SQL CLI создаёт свой собственный default_catalog. Если аналитик создаёт таблицу в SQL CLI и она нужна production job — её метаданные пропадут при выходе из CLI, а production job их вообще не увидит. HiveCatalog решает проблему: оба процесса (CLI и job) регистрируют один и тот же HiveCatalog с одним hive-site.xml, и видят общий namespace таблиц.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Команда настраивает Flink SQL CLI для ad-hoc-аналитики и хочет, чтобы созданные таблицы переживали рестарт CLI и были видны production Flink job. Какой каталог использовать?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5