Learning Platform
Глоссарий Troubleshooting
Урок 12.04 · 23 мин
Средний
federationkafkaelasticsearchopensearch

Коннекторы Kafka и Elasticsearch/OpenSearch: семантика чтения

Реляционные базы — не единственные источники федерации. В современном ландшафте данных есть две принципиально иные категории, которые Trino тоже умеет включать в федеративный запрос: системы потоковой передачи сообщений (Kafka) и поисковые движки (Elasticsearch и OpenSearch). Они устроены не как таблицы, и подключать их «как ещё одну базу» — ошибка. Этот урок разбирает семантику чтения этих источников: как Trino представляет поток Kafka в виде таблицы, как читает индексы поисковых систем и какие у этого ограничения.

Kafka: поток сообщений, представленный как таблица

Apache Kafka — это не база данных, а распределённый лог сообщений (брокер сообщений). Данные в Kafka организованы в топики; топик — это упорядоченная, дописываемая в конец последовательность сообщений. Каждое сообщение — это в общем случае пара «ключ, значение» в виде байтов плюс метаданные (offset — позиция в логе, partition, timestamp). Kafka не знает, что внутри байтов значения; для него это непрозрачный payload.

Задача Kafka-коннектора Trino — представить топик как таблицу, чтобы по нему можно было выполнять SQL. Концептуально маппинг такой: один топик Kafka становится одной таблицей Trino, а одно сообщение топика — одной строкой этой таблицы.

Но возникает проблема. Сообщение Kafka — это байты. Чтобы превратить байты в строку с типизированными колонками, коннектору нужно знать формат и структуру этих байтов: значение — это JSON? Avro? И какие в нём поля, каких типов? Сам Kafka этого не сообщает. Поэтому Kafka-коннектор Trino требует описания топика — конфигурации, которая задаёт, как разобрать сообщение: какой у него формат (JSON, Avro и другие) и как поля сообщения отображаются в колонки таблицы.

Kafka: брокеры, топики и партиции

Avro: Kafka-интеграция и wire formats Без такого описания коннектор не сможет дать осмысленные колонки — топик не превратится в полезную таблицу.

Kafka-коннектор: топик как таблица, сообщение как строка
Топик KafkaУпорядоченная дописываемая последовательность сообщений; каждое сообщение — байты ключа и значения плюс метаданные offset, partition, timestamp
описание топика задаёт разбор
Описание топикаКонфигурация коннектора: формат сообщения JSON или Avro и маппинг полей в колонки — Kafka сам структуру не сообщает
коннектор превращает в
Таблица TrinoТопик становится таблицей, сообщение — строкой с типизированными колонками; доступны и метаданные сообщения как колонки
# etc/catalog/kafka.properties — подключение кластера Kafka
connector.name=kafka
kafka.nodes=kafka-broker:9092
kafka.table-names=events,orders
kafka.table-description-supplier=file
-- Топик events представлен как таблица; SQL обычный.
-- Колонки получены из описания топика; доступны и метаданные сообщения.
SELECT event_type, count(*) AS cnt
FROM kafka.default.events
GROUP BY event_type
ORDER BY cnt DESC;

--  event_type | cnt
-- ------------+-------
--  click      | 88210
--  view       | 41005
--  purchase   |  9120

Семантика чтения Kafka: ключевые ограничения

Вот где критично не путать Kafka с базой данных. Семантика чтения потока сообщений принципиально отличается от семантики таблицы, и непонимание этого ведёт к неверным ожиданиям.

Таблица Kafka непрерывно меняется. В реляционную таблицу строки добавляют дискретными транзакциями. В топик Kafka сообщения льются непрерывно. Когда Trino выполняет SELECT по Kafka-таблице, он читает сообщения, имеющиеся в топике на момент запроса. Через секунду их уже больше — продьюсеры дописали новые. Два одинаковых SELECT count(*) подряд законно вернут разные числа. Это не баг, это природа потока.

Это чтение, не подписка. Trino-запрос — конечная операция: прочитал срез, вернул результат, завершился. Это не стриминговая подписка, которая работала бы бесконечно и эмитила новые сообщения по мере поступления. Trino-коннектор Kafka даёт снимок состояния топика в момент запроса, а не живой поток. Для непрерывной обработки потока существуют другие инструменты (Kafka Streams, Flink); Trino-коннектор Kafka — для аналитических запросов и федерации поверх содержимого топика на момент чтения.

Kafka обычно хранит данные ограниченное время. У топиков, как правило, настроен retention — старые сообщения удаляются по истечении срока или по достижении размера. Поэтому SELECT по Kafka-таблице видит не «всю историю», а только то, что укладывается в окно retention топика.

И всё же pushdown здесь работает в важном частном случае. Главная «координата» сообщения в Kafka — его offset, позиция в логе. Фильтр по offset (и по partition) коннектор может протолкнуть так, что Kafka отдаст только сообщения нужного диапазона, а не весь топик. А вот фильтр по полю внутри значения протолкнуть в Kafka нельзя — Kafka не индексирует содержимое сообщений и не понимает их структуру; такой фильтр Trino применит у себя после чтения.

WARNING

Trino-коннектор Kafka не делает Trino стриминговым движком. SELECT по Kafka-таблице — это разовый снимок топика на момент запроса, а не подписка на поток. Два одинаковых запроса законно вернут разные результаты, потому что топик непрерывно растёт, а старое вытесняется по retention. Для непрерывной потоковой обработки нужны Kafka Streams или Flink, а не Trino.

Elasticsearch и OpenSearch: поисковые индексы как таблицы

Вторая нереляционная категория — поисковые движки. Elasticsearch и OpenSearch — близкие системы (OpenSearch — форк Elasticsearch), и коннекторы Trino к ним устроены схоже. Это движки полнотекстового поиска и аналитики над документами.

Данные в них организованы в индексы; индекс хранит документы — JSON-объекты. У документов есть mapping — описание полей и их типов, которое ведёт сам поисковый движок. В отличие от Kafka, где структуру сообщения приходится описывать вручную, здесь движок уже знает структуру документов.

Коннектор представляет это как таблицы: один индекс становится таблицей, один документ — строкой. И, в отличие от Kafka, коннектор может сам получить схему — он читает mapping индекса и выводит из него колонки таблицы и их типы. Поэтому подключение поискового источника обычно проще, чем Kafka: ручное описание не требуется.

# etc/catalog/opensearch.properties — подключение OpenSearch
connector.name=opensearch
opensearch.host=opensearch-node
opensearch.port=9200
opensearch.default-schema-name=default
-- Индекс logs представлен как таблица; схема получена из mapping индекса
SELECT level, count(*) AS cnt
FROM opensearch.default.logs
WHERE level IN ('ERROR', 'WARN')
GROUP BY level;

--  level | cnt
-- -------+------
--  ERROR |  412
--  WARN  | 3890

Семантика чтения поисковых источников

У поисковых движков своя специфика, которую нужно учитывать.

Pushdown через поисковый запрос. Сила Elasticsearch/OpenSearch — быстрый поиск по индексам. Коннектор может проталкивать фильтрацию в поисковый движок: предикат превращается в нативный поисковый запрос, и движок возвращает только подходящие документы, используя свои индексы. Это тот же принцип predicate pushdown, что у реляционных коннекторов, и он критичен — без него Trino тянул бы весь индекс.

Полнотекстовый поиск. У поисковых движков есть то, чего нет у обычных баз, — полнотекстовый поиск с релевантностью. Коннектор даёт способ задействовать нативные поисковые запросы движка из Trino, чтобы не терять эту возможность при работе через SQL.

Особенности системы типов. Документы — это JSON, и модель данных поисковых движков гибче и свободнее реляционной: поле может быть массивом, вложенным объектом, в разных документах присутствовать или нет. Это не всегда ложится в строгие колонки чисто — маппинг типов поискового коннектора имеет свои тонкости, как и у JDBC-коннекторов. При неожиданном поведении поля смотрите раздел type mapping коннектора.

Kafka и поисковые движки: разные нереляционные источники
KafkaПоток сообщений; структуру задаёт ручное описание топика; SELECT — снимок на момент запроса; pushdown по offset и partition
и принципиально иной
Elasticsearch / OpenSearchИндексы документов; схему коннектор берёт из mapping сам; pushdown через нативный поисковый запрос; есть полнотекстовый поиск

Сравнение трёх категорий источников

Соберём картину федеративных источников, разобранных в модуле.

АспектРеляционная база (JDBC)KafkaElasticsearch / OpenSearch
Природа источникаТаблицы со строкамиПоток сообщений в топикахИндексы JSON-документов
Единица -> строка TrinoСтрока таблицыСообщение топикаДокумент индекса
Откуда схемаМетаданные базы через JDBCРучное описание топикаMapping индекса (автоматически)
Стабильность результатаСтабилен между записямиМеняется непрерывноЗависит от обновлений индекса
PushdownPredicate, projection, aggregation, limitПо offset / partitionЧерез нативный поисковый запрос
Главное предостережениеНе OLTP-движокСнимок, а не стримГибкая модель типов JSON

Общий вывод модуля: федерация Trino охватывает не только реляционные базы. Но каждый класс источников несёт свою семантику чтения, и подключать его, перенося ожидания от реляционной таблицы, — ошибка. Kafka — это снимок растущего потока, а не таблица; поисковый индекс — это документы с гибкой схемой и полнотекстовым поиском. Понимать природу источника так же важно, как уметь прописать каталог.

Попробуй сам

В песочнице курса подключите два нереляционных источника. Упражнение первое, Kafka: поднимите брокер Kafka, создайте топик и отправьте в него несколько сообщений в JSON. Настройте Kafka-каталог с описанием топика (формат и маппинг полей в колонки) и выполните SELECT по получившейся таблице. Затем отправьте в топик ещё сообщений и выполните тот же SELECT count(*) снова — зафиксируйте, что число изменилось, и объясните почему. Упражнение второе, OpenSearch: поднимите узел OpenSearch, создайте индекс, добавьте несколько JSON-документов. Настройте каталог и выполните DESCRIBE таблицы — обратите внимание, что схему коннектор получил сам, без ручного описания. Выполните SELECT с фильтром. Упражнение третье, письменно: сравните три категории источников из модуля (JDBC, Kafka, поисковый движок) по трём осям — откуда берётся схема таблицы, стабилен ли результат повторного запроса, и какой pushdown возможен. Объясните, почему называть Kafka-коннектор «стриминговым» — неточно.


Проверка знанийKnowledge check
Чем семантика чтения Kafka и поисковых движков (Elasticsearch/OpenSearch) отличается от реляционных источников, и почему Kafka-коннектор Trino нельзя считать стриминговым?
ОтветAnswer
Kafka и поисковые движки — нереляционные источники с принципиально иной семантикой чтения. Kafka — это не база, а распределённый лог сообщений: данные организованы в топики, топик это упорядоченная дописываемая последовательность сообщений, каждое сообщение — байты ключа и значения плюс метаданные offset, partition, timestamp. Kafka-коннектор представляет топик как таблицу, а сообщение как строку, но сам Kafka не знает структуру байтов значения, поэтому коннектор требует ручного описания топика — формата сообщения JSON или Avro и маппинга полей в колонки. У реляционного источника схему даёт сама база через метаданные JDBC, у поискового движка коннектор получает схему сам из mapping индекса автоматически — а у Kafka описание задаётся вручную. Главное отличие семантики чтения Kafka: таблица непрерывно меняется. В топик сообщения льются непрерывно, и SELECT читает сообщения, имеющиеся на момент запроса; два одинаковых SELECT count подряд законно вернут разные числа, а старое вытесняется по retention топика. Kafka-коннектор нельзя считать стриминговым именно поэтому: Trino-запрос — конечная операция, он читает срез топика на момент запроса, возвращает результат и завершается. Это снимок состояния, а не подписка, которая работала бы бесконечно и эмитила новые сообщения по мере поступления. Для непрерывной потоковой обработки нужны Kafka Streams или Flink, а не Trino. Pushdown в Kafka работает по offset и partition — координатам сообщения в логе, но фильтр по полю внутри значения протолкнуть нельзя, Kafka не индексирует содержимое сообщений. Elasticsearch и OpenSearch хранят индексы JSON-документов; коннектор представляет индекс как таблицу, документ как строку, проталкивает фильтрацию через нативный поисковый запрос с использованием индексов, даёт доступ к полнотекстовому поиску, а гибкая модель JSON-документов делает маппинг типов нетривиальным. Каждый класс источников несёт свою семантику, и переносить на него ожидания от реляционной таблицы — ошибка.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Почему Kafka-коннектор Trino требует ручного описания топика, тогда как коннектор OpenSearch получает схему сам?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5