Event listeners, OpenLineage и filesystem caching
В этом уроке две темы, на первый взгляд разные, но связанные одной идеей: оба механизма работают на уровне всего кластера, а не отдельного запроса. Event listeners превращают каждый запрос в событие для внешней системы — это фундамент аудита, истории и data lineage. Filesystem caching кеширует данные object storage на локальных дисках воркеров — это один из самых сильных рычагов производительности для lakehouse-нагрузок.
Web UI забывает запросы через короткое время. Event listeners решают проблему истории. А filesystem caching решает проблему, которую не лечит никакой EXPLAIN: повторное чтение одних и тех же данных по сети из удалённого хранилища.
Event listeners: запрос как событие
Event listener — это плагин, который Trino вызывает на ключевых точках жизненного цикла запроса. Координатор генерирует три типа событий: при создании запроса (queryCreated), при завершении (queryCompleted) и при разбиении на splits. Плагин получает событие с полным контекстом и делает с ним что нужно: пишет в БД, шлёт в Kafka, в систему аудита, в lineage-сервис.
Событие queryCompleted особенно богатое. В нём — текст SQL, пользователь и принципал аутентификации, resource group, точные тайминги фаз, потреблённые CPU и память, число обработанных строк и байт, список прочитанных и записанных таблиц и колонок, код ошибки при падении, статистика операторов. Фактически это полный паспорт запроса.
Подключается event listener файлом конфигурации, который указывает на реализацию плагина:
# etc/event-listener.properties
event-listener.name=openlineage
openlineage-event-listener.transport.type=HTTP
openlineage-event-listener.transport.url=http://marquez:5000
openlineage-event-listener.trino.uri=http://coordinator:8080
Файл event-listener.properties должен лежать на координаторе. Можно подключить и несколько listener’ов, перечислив несколько файлов в config.properties через свойство event-listener.config-files. Типовое применение event listener’ов: долговременный аудит (кто, что и когда запускал), накопление истории для capacity planning, отправка метрик качества и, главное, data lineage.
OpenLineage: data lineage из коробки
Lineage (происхождение данных) — это ответ на вопросы «откуда взялись данные в этой таблице» и «что сломается, если я изменю вот эту». Для аналитической платформы lineage критичен: без него невозможны impact-анализ, отладка расхождений в отчётах и аудит для регуляторов.
Trino поставляется со встроенным OpenLineage event listener. OpenLineage — открытый стандарт описания происхождения данных; Trino-listener генерирует ровно два события на запрос: START при старте и COMPLETE либо FAIL при завершении.
Airflow: OpenLineage как кросс-системный стандарт lineage События содержат входные датасеты (что прочитано), выходные (что записано) и, что особенно ценно, column-level lineage — связь конкретной выходной колонки с конкретными входными.
Column-level lineage отвечает на вопрос точнее, чем table-level: не просто «mart.revenue зависит от raw.orders», а «колонка mart.revenue.total вычислена из raw.orders.amount и raw.orders.discount». Это позволяет точно оценить, какие именно отчёты затронет изменение конкретного поля источника.
События отправляются в OpenLineage-совместимый бэкенд — например, Marquez (референсная реализация) или Microsoft Purview. Бэкенд собирает события всех запросов в единый граф lineage, который можно просматривать и обходить.
Lineage из event listener’ов работает только для того, что прошло через Trino. Если часть данных грузится в lakehouse мимо Trino — другим движком или загрузчиком — эти рёбра в графе не появятся. OpenLineage задуман как кросс-системный стандарт: тот же формат событий эмитят Airflow, Spark, dbt. Полная картина происхождения собирается, когда событиями инструментированы все звенья пайплайна, а не один Trino.
Filesystem caching: рычаг производительности lakehouse
Теперь — про производительность. Для lakehouse-нагрузок Trino главное узкое место — не CPU и часто даже не память, а сеть до object storage. Когда fact-таблица лежит Parquet-файлами в S3, MinIO или другом удалённом хранилище, каждый скан тянет данные по сети. А аналитические нагрузки по своей природе читают одни и те же популярные таблицы и партиции снова и снова: дашборды обновляются по расписанию, аналитики бьют по тем же витринам. Каждый раз — та же сеть, тот же байт по проводу.
Filesystem caching в Trino решает это: воркеры кешируют прочитанные из object storage данные на свои локальные диски (NVMe/SSD). Повторное чтение того же файла идёт не по сети до S3, а с локального диска воркера — на порядки быстрее по латентности и без расхода полосы до хранилища. Это и есть рычаг: правильно настроенный кеш превращает медленный сетевой скан в быстрое локальное чтение.
Кеш — это подсистема нативной файловой системы Trino, общая для lakehouse-коннекторов: его поддерживают Iceberg, Delta Lake, Hive, Hudi и единый Lakehouse-коннектор. Включается в свойствах коннектора каталога:
# etc/catalog/iceberg.properties
connector.name=iceberg
iceberg.catalog.type=rest
iceberg.rest-catalog.uri=http://rest-catalog:8181
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
# filesystem caching
fs.cache.enabled=true
fs.cache.directories=/mnt/cache
fs.cache.max-sizes=200GB
Свойство fs.cache.directories — это локальные пути на воркерах под кеш; их можно перечислить несколько (несколько дисков). fs.cache.max-sizes ограничивает размер кеша; при переполнении старые записи вытесняются. Под кеш отводят быстрые локальные диски — именно их латентность определяет выигрыш.
Cache-aware scheduling: почему кеш бесполезен без планировщика
Сам по себе локальный кеш дал бы выигрыш только случайно. Кеш на воркере полезен, лишь если splits именно тех файлов, что в нём лежат, попадут на тот же самый воркер. Если планировщик каждый раз отправляет splits на случайные ноды, кеш-попаданий почти не будет: данные есть на воркере А, а split ушёл на воркер Б.
Поэтому filesystem caching работает в паре с cache-aware scheduling. Планировщик использует консистентное хеширование: split конкретного файла стабильно назначается на один и тот же воркер. В результате повторный скан файла направляется туда, где этот файл уже закеширован, — и чтение идёт с локального диска. Кеш и cache-aware scheduling — две половины одного механизма: кеш хранит данные, планировщик гарантирует, что запрос за ними придёт в правильное место.
Что именно кешируется
Стоит уточнить, что filesystem caching кеширует не «таблицу» и не «результат запроса», а данные файлов object storage на уровне файловой системы. Это важное отличие. Кеш результатов запроса хранил бы готовые ответы и был бы бесполезен, чуть изменись запрос. Кеш файловой системы работает уровнем ниже: он хранит куски прочитанных из S3 файлов — Parquet-данные, метаданные, — и помогает любому запросу, который читает те же файлы, независимо от того, какой это запрос.
Это даёт правильную интуицию о том, когда кеш сработает. Два совершенно разных по тексту запроса — один считает выручку, другой средний чек — если оба сканируют одни и те же партиции raw_sales, оба попадут в один и тот же закешированный набор файлов. Кеш не знает и не должен знать про SQL; он знает только про файлы и их куски. Поэтому горячие партиции, по которым бьёт много разных запросов, — идеальный кандидат на кеширование: один прогрев приносит пользу всем запросам к этим данным.
Из этого же следует, почему кеш надо инвалидировать при изменении данных. Если файл в object storage перезаписан (например, после OPTIMIZE, который компактует таблицу), старый закешированный кусок устарел. Подсистема кеширования отслеживает это и не отдаёт устаревшие данные — корректность важнее, чем cache hit. Но практический вывод такой: после массовой перезаписи таблицы первые запросы снова окажутся cold, пока кеш не прогреется свежими файлами.
Где кеш помогает, а где нет
Filesystem caching — мощный, но не универсальный рычаг.
| Сценарий | Эффект кеша |
|---|---|
| Дашборды и витрины бьют по одним таблицам | Большой выигрыш: горячие данные оседают в кеше |
| Удалённое хранилище с заметной сетевой латентностью | Большой выигрыш: локальный диск кратно быстрее |
| Повторные запросы по одним партициям | Большой выигрыш: высокий cache hit rate |
| Разовые ad-hoc сканы каждый раз новых данных | Эффекта почти нет: каждое чтение — cold |
| Рабочий датасет много больше суммарного кеша | Эффект слабый: постоянное вытеснение, низкий hit rate |
| Очень быстрое локальное хранилище | Эффект мал: сеть и так не узкое место |
Filesystem caching не отменяет грамотного партиционирования, компактных файлов и pushdown — кеш ускоряет чтение тех данных, которые запрос всё равно прочтёт. Если запрос из-за плохого партиционирования сканирует лишнее, кеш просто быстрее отдаст это лишнее. Кеш — рычаг поверх корректной схемы хранения, а не замена ей. Сначала добиваются, чтобы запрос читал только нужное (об этом следующий урок), и лишь потом кешем ускоряют повторные чтения.
Метрики кеша (cache hit rate, объём кеша, вытеснения) доступны через JMX и попадают в Prometheus — мониторить hit rate стоит так же, как heap и очереди. Низкий hit rate означает, что кеш не работает: рабочий датасет не влезает либо нагрузка слишком разнородна.
Попробуй сам
Часть про event listeners — на любом Trino; часть про кеш — концептуальная, на lakehouse-каталоге.
- Включите OpenLineage event listener: создайте
etc/event-listener.propertiesсevent-listener.name=openlineageи (для пробы без бэкенда)openlineage-event-listener.transport.type=CONSOLE. Перезапустите Trino. - Выполните запрос с записью, например
CREATE TABLE memory.default.t AS SELECT * FROM tpch.tiny.nation. Найдите в логах координатора два события OpenLineage —STARTиCOMPLETE— и рассмотрите входные и выходные датасеты. - На lakehouse-каталоге (Iceberg или Hive поверх MinIO) добавьте в properties каталога
fs.cache.enabled=true,fs.cache.directoriesиfs.cache.max-sizes. - Выполните один и тот же тяжёлый скан партиции дважды подряд. Сравните Scheduled-время в Web UI или CPU/wall в
EXPLAIN ANALYZE: второй прогон должен быть заметно быстрее за счёт чтения из кеша. - Через jmx-коннектор найдите MBean’ы кеша и посмотрите cache hit rate после нескольких запросов.
Цель — увидеть, как запрос превращается в событие lineage, и измерить разницу между cold- и warm-чтением lakehouse-таблицы.