Exchange manager: спулинг промежуточных данных
В прошлом уроке мы выяснили: retry-policy=TASK требует, чтобы промежуточные данные обмена были материализованы в надёжном хранилище — иначе упавшую задачу негде переисполнить. Компонент, который этим занимается, называется exchange manager. Это сердце FTE: без него отказоустойчивость на уровне задач невозможна.
Разберём, что такое exchange manager, чем спулящийся обмен отличается от обычного потокового обмена, какие хранилища он поддерживает и как его настроить.
Что такое exchange и почему его спулят
Вернёмся к понятию exchange. Exchange — передача данных между стадиями запроса: вывод одной стадии становится входом следующей. В классическом Trino (retry-policy=NONE) exchange потоковый: данные стримятся по сети напрямую от воркеров-продьюсеров к воркерам-консьюмерам через output buffers и exchange clients, нигде не задерживаясь.
Потоковый обмен быстр, но эфемерен: данные существуют только в момент передачи. Сбой воркера — и переданное им потеряно безвозвратно. Для retry-policy=TASK этого мало: чтобы переисполнить задачу, её вход должен быть доступен и после сбоя любого воркера.
Решение — spooling exchange. Вместо прямого стриминга от продьюсера к консьюмеру промежуточные данные записываются в надёжное внешнее хранилище. Консьюмер читает их уже оттуда. Хранилище переживает падение любого отдельного воркера, поэтому спулённый промежуток всегда доступен для переисполнения задачи.
Exchange manager — это и есть компонент, реализующий spooling exchange: он отвечает за запись промежутка в хранилище и чтение его обратно.
Где живёт exchange manager
Exchange manager настраивается на всех нодах кластера — и на координаторе, и на каждом воркере — отдельным файлом etc/exchange-manager.properties. Файл должен быть идентичным на всех нодах: они все участвуют в обмене и должны одинаково понимать, куда писать и откуда читать промежуток.
# etc/exchange-manager.properties — одинаковый на ВСЕХ нодах кластера
exchange-manager.name=filesystem
Свойство exchange-manager.name задаёт реализацию exchange manager. Значение filesystem покрывает все файлоподобные хранилища — локальную файловую систему и облачные объектные хранилища (S3, GCS, Azure); конкретное хранилище определяется уже параметром базовых директорий. Существует и реализация для HDFS.
etc/exchange-manager.properties обязателен на КАЖДОЙ ноде, не только на координаторе. Если файл забыли положить хотя бы на один воркер или он там отличается, кластер с retry-policy=TASK будет вести себя некорректно: ноды не сойдутся в том, куда спулить обмен. Координатор и воркеры в FTE — равноправные участники спулинга.
Хранилища для спулинга
Куда exchange manager кладёт промежуток, задаёт свойство exchange.base-directories (одна или несколько директорий через запятую). Поддерживаемые варианты:
Объектное хранилище S3 или S3-совместимое (включая MinIO):
# etc/exchange-manager.properties
exchange-manager.name=filesystem
exchange.base-directories=s3://trino-fte-exchange/spool
exchange.s3.region=eu-central-1
exchange.s3.aws-access-key=...
exchange.s3.aws-secret-key=...
Google Cloud Storage:
exchange-manager.name=filesystem
exchange.base-directories=gs://trino-fte-exchange/spool
Azure Blob Storage — аналогично, с директорией abfs://... и параметрами доступа Azure.
HDFS — через реализацию exchange manager для Hadoop-кластеров.
Локальная файловая система:
exchange-manager.name=filesystem
exchange.base-directories=/mnt/trino-exchange
| Хранилище | exchange.base-directories | Продакшен-пригодность |
|---|---|---|
| S3 / S3-совместимое (MinIO) | s3://bucket/path | Да — типичный продакшен-выбор |
| Google Cloud Storage | gs://bucket/path | Да |
| Azure Blob Storage | abfs://... | Да |
| HDFS | путь HDFS | Да, для Hadoop-окружений |
| Локальная файловая система | /mnt/... | Только тесты, НЕ продакшен |
Локальная файловая система для exchange.base-directories годится только для разработки и экспериментов. В продакшене она ломает саму идею FTE: локальный диск умирает вместе со своим воркером, а смысл спулинга — пережить смерть воркера. Спулённый на локальный диск промежуток исчезнет ровно тогда, когда понадобится для переисполнения задачи. Продакшен-FTE требует хранилища, не зависящего от отдельных нод: S3, GCS, Azure Blob или HDFS.
Несколько директорий и распределение нагрузки
exchange.base-directories принимает список директорий через запятую. Зачем несколько? Чтобы распределить I/O-нагрузку спулинга.
Спулинг промежутка — это интенсивная нагрузка на хранилище: весь промежуточный обмен запроса проходит через exchange manager. Указав несколько директорий — на разных бакетах, разных дисках, — нагрузку раскладывают, и хранилище перестаёт быть единственным узким горлышком.
Жизненный цикл спулённых данных
Важно понимать: спулённый промежуток — временный. Он нужен только на время жизни запроса.
- Запрос с
retry-policy=TASKстартует. По ходу исполнения стадии-продьюсеры спулят свой вывод через exchange manager вexchange.base-directories. - Стадии-консьюмеры читают спулённый промежуток из хранилища как свой вход.
- Если воркер падает — затронутые задачи переисполняются на живых воркерах; их вход берётся из того же спулённого промежутка, он по-прежнему в надёжном хранилище.
- Запрос завершается (успехом или окончательной ошибкой) — спулённые данные этого запроса больше не нужны и удаляются.
Спул FTE — не архив и не долговременное хранилище. Это рабочая область на время жизни запроса. Поэтому при оценке нужного объёма исходи из пикового объёма промежутка одновременно идущих FTE-запросов, а не из накопления за дни.
Поскольку спулённый промежуток временный, выделенный под него бакет можно настроить с автоматическим удалением старых объектов (lifecycle policy на стороне S3/GCS) как страховку от мусора, оставшегося после аномально завершённых запросов. Trino и сам убирает за собой, но lifecycle policy — дешёвая дополнительная гарантия, что спул-бакет не разрастётся.
Цена spooling exchange и где это окупается
Spooling exchange надёжен, но не бесплатен. Понимать его цену важно, чтобы не включать FTE там, где он только вредит.
Откуда берётся цена. В потоковом обмене консьюмер начинает работать на первых же строках продьюсера — стадии частично перекрываются во времени. В spooling exchange продьюсер сначала полностью материализует вывод в хранилище, и только потом консьюмер начинает читать. Это добавляет три статьи расходов:
- Латентность барьера. Консьюмер ждёт полного завершения продьюсера — перекрытие стадий теряется.
- I/O записи и чтения. Промежуток пишется в хранилище и читается обратно — это сетевой и дисковый I/O, которого в потоковом обмене нет.
- CPU на сериализацию. Страницы сериализуются перед записью, шифруются, десериализуются при чтении.
Где это окупается. Для долгого batch-запроса добавка барьера и I/O теряется в часах работы, а взамен запрос получает отказоустойчивость — четырёхчасовой ETL не гибнет от сбоя одного воркера. Для короткого интерактивного запроса всё наоборот: запрос на полсекунды от спулинга промежутка может замедлиться в разы — здесь spooling exchange и FTE не нужны, хватает retry-policy=NONE. Это та же логика размена скорости на надёжность, что проходит через весь модуль: spooling exchange — инструмент batch-кластера, не интерактивного.
Как exchange manager связан с остальным FTE
Соберём картину. Exchange manager — это инфраструктурная опора, на которой стоит retry-policy=TASK:
retry-policy=NONE— exchange manager не нужен: обмен потоковый, отказоустойчивости нет.retry-policy=QUERY— exchange manager не нужен: при сбое весь запрос повторяется с нуля, сохранённый промежуток ни к чему.retry-policy=TASK— exchange manager обязателен: переисполнение отдельных задач возможно только потому, что их входной промежуток спулен и доступен после любого сбоя воркера.
В следующем уроке — task sizing и adaptive planning: как FTE, опираясь на спулённый и потому измеримый промежуток, подбирает размер задач и переоптимизирует план в рантайме. Сама возможность это делать вырастает именно из того, что exchange manager материализует промежуток: то, что записано в хранилище, можно измерить и переразбить.
Попробуй сам
Настрой exchange manager и запусти FTE на уровне задач.
- Подними
docker-composeс координатором, 2-3 воркерами Trino и MinIO (S3-совместимое хранилище). В MinIO создай бакет, напримерtrino-fte. - Положи
etc/exchange-manager.propertiesна все ноды кластера (координатор и воркеры) сexchange-manager.name=filesystem,exchange.base-directories=s3://trino-fte/spoolи параметрами доступа к MinIO. Вetc/config.propertiesзадайretry-policy=TASK. Перезапусти кластер. - Запусти тяжёлый batch-запрос — крупный join или агрегацию по
tpch.sf100. Пока он идёт, загляни в бакет MinIO: там появятся объекты спулённого промежутка. После завершения запроса они исчезнут. - Повтори запрос и во время исполнения убей один воркер (
docker kill). Запрос должен пережить сбой за счёт переисполнения задач — сравни с поведениемretry-policy=NONEиз прошлых уроков.
Цель — увидеть спулённый промежуток в хранилище вживую и убедиться, что именно он позволяет retry-policy=TASK пережить сбой воркера.