Коннекторы Kafka и Elasticsearch/OpenSearch: семантика чтения
Реляционные базы — не единственные источники федерации. В современном ландшафте данных есть две принципиально иные категории, которые Trino тоже умеет включать в федеративный запрос: системы потоковой передачи сообщений (Kafka) и поисковые движки (Elasticsearch и OpenSearch). Они устроены не как таблицы, и подключать их «как ещё одну базу» — ошибка. Этот урок разбирает семантику чтения этих источников: как Trino представляет поток Kafka в виде таблицы, как читает индексы поисковых систем и какие у этого ограничения.
Kafka: поток сообщений, представленный как таблица
Apache Kafka — это не база данных, а распределённый лог сообщений (брокер сообщений). Данные в Kafka организованы в топики; топик — это упорядоченная, дописываемая в конец последовательность сообщений. Каждое сообщение — это в общем случае пара «ключ, значение» в виде байтов плюс метаданные (offset — позиция в логе, partition, timestamp). Kafka не знает, что внутри байтов значения; для него это непрозрачный payload.
Задача Kafka-коннектора Trino — представить топик как таблицу, чтобы по нему можно было выполнять SQL. Концептуально маппинг такой: один топик Kafka становится одной таблицей Trino, а одно сообщение топика — одной строкой этой таблицы.
Но возникает проблема. Сообщение Kafka — это байты. Чтобы превратить байты в строку с типизированными колонками, коннектору нужно знать формат и структуру этих байтов: значение — это JSON? Avro? И какие в нём поля, каких типов? Сам Kafka этого не сообщает. Поэтому Kafka-коннектор Trino требует описания топика — конфигурации, которая задаёт, как разобрать сообщение: какой у него формат (JSON, Avro и другие) и как поля сообщения отображаются в колонки таблицы.
Kafka: брокеры, топики и партицииAvro: Kafka-интеграция и wire formats Без такого описания коннектор не сможет дать осмысленные колонки — топик не превратится в полезную таблицу.
# etc/catalog/kafka.properties — подключение кластера Kafka
connector.name=kafka
kafka.nodes=kafka-broker:9092
kafka.table-names=events,orders
kafka.table-description-supplier=file
-- Топик events представлен как таблица; SQL обычный.
-- Колонки получены из описания топика; доступны и метаданные сообщения.
SELECT event_type, count(*) AS cnt
FROM kafka.default.events
GROUP BY event_type
ORDER BY cnt DESC;
-- event_type | cnt
-- ------------+-------
-- click | 88210
-- view | 41005
-- purchase | 9120
Семантика чтения Kafka: ключевые ограничения
Вот где критично не путать Kafka с базой данных. Семантика чтения потока сообщений принципиально отличается от семантики таблицы, и непонимание этого ведёт к неверным ожиданиям.
Таблица Kafka непрерывно меняется. В реляционную таблицу строки добавляют дискретными транзакциями. В топик Kafka сообщения льются непрерывно. Когда Trino выполняет SELECT по Kafka-таблице, он читает сообщения, имеющиеся в топике на момент запроса. Через секунду их уже больше — продьюсеры дописали новые. Два одинаковых SELECT count(*) подряд законно вернут разные числа. Это не баг, это природа потока.
Это чтение, не подписка. Trino-запрос — конечная операция: прочитал срез, вернул результат, завершился. Это не стриминговая подписка, которая работала бы бесконечно и эмитила новые сообщения по мере поступления. Trino-коннектор Kafka даёт снимок состояния топика в момент запроса, а не живой поток. Для непрерывной обработки потока существуют другие инструменты (Kafka Streams, Flink); Trino-коннектор Kafka — для аналитических запросов и федерации поверх содержимого топика на момент чтения.
Kafka обычно хранит данные ограниченное время. У топиков, как правило, настроен retention — старые сообщения удаляются по истечении срока или по достижении размера. Поэтому SELECT по Kafka-таблице видит не «всю историю», а только то, что укладывается в окно retention топика.
И всё же pushdown здесь работает в важном частном случае. Главная «координата» сообщения в Kafka — его offset, позиция в логе. Фильтр по offset (и по partition) коннектор может протолкнуть так, что Kafka отдаст только сообщения нужного диапазона, а не весь топик. А вот фильтр по полю внутри значения протолкнуть в Kafka нельзя — Kafka не индексирует содержимое сообщений и не понимает их структуру; такой фильтр Trino применит у себя после чтения.
Trino-коннектор Kafka не делает Trino стриминговым движком. SELECT по Kafka-таблице — это разовый снимок топика на момент запроса, а не подписка на поток. Два одинаковых запроса законно вернут разные результаты, потому что топик непрерывно растёт, а старое вытесняется по retention. Для непрерывной потоковой обработки нужны Kafka Streams или Flink, а не Trino.
Elasticsearch и OpenSearch: поисковые индексы как таблицы
Вторая нереляционная категория — поисковые движки. Elasticsearch и OpenSearch — близкие системы (OpenSearch — форк Elasticsearch), и коннекторы Trino к ним устроены схоже. Это движки полнотекстового поиска и аналитики над документами.
Данные в них организованы в индексы; индекс хранит документы — JSON-объекты. У документов есть mapping — описание полей и их типов, которое ведёт сам поисковый движок. В отличие от Kafka, где структуру сообщения приходится описывать вручную, здесь движок уже знает структуру документов.
Коннектор представляет это как таблицы: один индекс становится таблицей, один документ — строкой. И, в отличие от Kafka, коннектор может сам получить схему — он читает mapping индекса и выводит из него колонки таблицы и их типы. Поэтому подключение поискового источника обычно проще, чем Kafka: ручное описание не требуется.
# etc/catalog/opensearch.properties — подключение OpenSearch
connector.name=opensearch
opensearch.host=opensearch-node
opensearch.port=9200
opensearch.default-schema-name=default
-- Индекс logs представлен как таблица; схема получена из mapping индекса
SELECT level, count(*) AS cnt
FROM opensearch.default.logs
WHERE level IN ('ERROR', 'WARN')
GROUP BY level;
-- level | cnt
-- -------+------
-- ERROR | 412
-- WARN | 3890
Семантика чтения поисковых источников
У поисковых движков своя специфика, которую нужно учитывать.
Pushdown через поисковый запрос. Сила Elasticsearch/OpenSearch — быстрый поиск по индексам. Коннектор может проталкивать фильтрацию в поисковый движок: предикат превращается в нативный поисковый запрос, и движок возвращает только подходящие документы, используя свои индексы. Это тот же принцип predicate pushdown, что у реляционных коннекторов, и он критичен — без него Trino тянул бы весь индекс.
Полнотекстовый поиск. У поисковых движков есть то, чего нет у обычных баз, — полнотекстовый поиск с релевантностью. Коннектор даёт способ задействовать нативные поисковые запросы движка из Trino, чтобы не терять эту возможность при работе через SQL.
Особенности системы типов. Документы — это JSON, и модель данных поисковых движков гибче и свободнее реляционной: поле может быть массивом, вложенным объектом, в разных документах присутствовать или нет. Это не всегда ложится в строгие колонки чисто — маппинг типов поискового коннектора имеет свои тонкости, как и у JDBC-коннекторов. При неожиданном поведении поля смотрите раздел type mapping коннектора.
Сравнение трёх категорий источников
Соберём картину федеративных источников, разобранных в модуле.
| Аспект | Реляционная база (JDBC) | Kafka | Elasticsearch / OpenSearch |
|---|---|---|---|
| Природа источника | Таблицы со строками | Поток сообщений в топиках | Индексы JSON-документов |
| Единица -> строка Trino | Строка таблицы | Сообщение топика | Документ индекса |
| Откуда схема | Метаданные базы через JDBC | Ручное описание топика | Mapping индекса (автоматически) |
| Стабильность результата | Стабилен между записями | Меняется непрерывно | Зависит от обновлений индекса |
| Pushdown | Predicate, projection, aggregation, limit | По offset / partition | Через нативный поисковый запрос |
| Главное предостережение | Не OLTP-движок | Снимок, а не стрим | Гибкая модель типов JSON |
Общий вывод модуля: федерация Trino охватывает не только реляционные базы. Но каждый класс источников несёт свою семантику чтения, и подключать его, перенося ожидания от реляционной таблицы, — ошибка. Kafka — это снимок растущего потока, а не таблица; поисковый индекс — это документы с гибкой схемой и полнотекстовым поиском. Понимать природу источника так же важно, как уметь прописать каталог.
Попробуй сам
В песочнице курса подключите два нереляционных источника. Упражнение первое, Kafka: поднимите брокер Kafka, создайте топик и отправьте в него несколько сообщений в JSON. Настройте Kafka-каталог с описанием топика (формат и маппинг полей в колонки) и выполните SELECT по получившейся таблице. Затем отправьте в топик ещё сообщений и выполните тот же SELECT count(*) снова — зафиксируйте, что число изменилось, и объясните почему. Упражнение второе, OpenSearch: поднимите узел OpenSearch, создайте индекс, добавьте несколько JSON-документов. Настройте каталог и выполните DESCRIBE таблицы — обратите внимание, что схему коннектор получил сам, без ручного описания. Выполните SELECT с фильтром. Упражнение третье, письменно: сравните три категории источников из модуля (JDBC, Kafka, поисковый движок) по трём осям — откуда берётся схема таблицы, стабилен ли результат повторного запроса, и какой pushdown возможен. Объясните, почему называть Kafka-коннектор «стриминговым» — неточно.