Worker: исполнение задач, обмен промежуточными данными
В прошлом уроке мы разобрали координатор — мозг кластера, который думает и дирижирует, но не двигает данные. Этот урок про вторую роль: worker — воркер. Если координатор думает, то воркеры работают. Они — те самые «много автономных узлов» из урока про MPP, которые превращают идею массового параллелизма в реальную обработку данных.
Понять воркеры важно потому, что именно здесь происходит вся тяжёлая работа запроса: чтение данных из источников, фильтрация, джойны, агрегации. Когда запрос быстрый — это воркеры быстро отработали. Когда запрос упёрся в память — это память воркера закончилась. Воркеры — место, где «до железа» становится буквальным.
Воркер — это рабочая лошадка кластера
Worker — это сервер Trino, который исполняет задачи и обрабатывает данные. Воркер забирает данные из коннекторов (то есть из подключённых источников), прогоняет их через операции запроса и обменивается промежуточными результатами с другими воркерами.
В кластере воркеров — ноль или более. На практике их обычно несколько, десятки, иногда сотни: чем больше воркеров, тем больше параллелизма и тем быстрее тяжёлые запросы (вспомните почти линейное масштабирование shared-nothing из первого урока модуля). Все воркеры равноправны и взаимозаменяемы — нет «главного» воркера, любой может взять любой кусок работы.
Узел становится воркером через тот же файл etc/config.properties, что и координатор, только с другим значением:
coordinator=false
http-server.http.port=8080
discovery.uri=http://trino-coordinator:8080
coordinator=false объявляет узел воркером. discovery.uri указывает на координатор — по этому адресу воркер при старте регистрируется в кластере (механизм discovery — следующий урок). После регистрации координатор знает про воркер и начинает раздавать ему работу.
Что делает воркер: три обязанности
Работу воркера удобно свести к трём обязанностям.
Обязанность 1. Чтение данных из источников. Когда запросу нужны собственно данные таблицы, их читают воркеры. Воркер обращается к коннектору — реализации интерфейса, через которую Trino работает с конкретным типом источника, — и забирает у него данные: строки из PostgreSQL, файлы Parquet с S3, сообщения из Kafka. Координатор сюда не вовлечён: данные текут из источника прямо в воркеры. Это и есть та параллельная часть работы, которую невозможно делать на одном узле.
Обязанность 2. Исполнение задач. Координатор раздаёт воркерам задачи (tasks) — конкретные единицы работы из распределённого плана. Воркер исполняет свои задачи: применяет к данным операции — фильтры, проекции, джойны, агрегации, сортировки. Внутри одна задача разбивается ещё мельче и исполняется параллельно несколькими потоками, задействуя все ядра воркера. Точную иерархию исполнения — task, split, driver, operator — мы детально разберём в модуле про распределённое исполнение. Здесь главное: воркер — это узел, на котором операции запроса фактически выполняются над строками.
Обязанность 3. Обмен промежуточными данными. Воркер редко может довести запрос до конца в одиночку. Возьмём джойн двух таблиц: чтобы соединить строки по ключу, строки с одинаковым ключом должны оказаться на одном воркере. А прочитаны они могли быть на разных. Значит, воркеры должны перераспределить данные между собой — переслать строки туда, где они нужны. Этот обмен промежуточными результатами по сети называется exchange, и он — неотъемлемая часть работы воркера. Exchange — отдельная большая тема (модули 2 и 5); пока зафиксируем: воркеры не только считают, но и активно гоняют промежуточные данные друг другу.
Как воркеры взаимодействуют с координатором
Воркеры не висят в вакууме — они работают под управлением координатора, и важно понимать характер этого взаимодействия.
Координатор создаёт на воркерах задачи и отслеживает их состояние. Воркеры, в свою очередь, регулярно сообщают координатору о своём прогрессе: сколько данных обработано, в каком состоянии задачи, не возникло ли ошибки. Координатор на основе этого дирижирует исполнением: видит общую картину запроса, реагирует на завершение и сбои, в конце собирает у воркеров финальный результат, чтобы отдать клиенту.
При этом обмен собственно данными между воркерами идёт напрямую, минуя координатор. Когда воркеру нужно переслать промежуточные строки другому воркеру (exchange), они летят между воркерами по сети, координатор в этот поток данных не вовлечён. Это принципиально для производительности: если бы все данные проходили через координатор, он мгновенно стал бы узким местом. Координатор — это канал управления (control plane), а воркеры между собой — канал данных (data plane). Управляющие сигналы идут через координатор, тяжёлые данные — напрямую между воркерами.
Стоит оценить разницу в объёмах двух этих потоков. По каналу управления идут компактные сообщения: «создай задачу», «вот твой кусок плана», «как прогресс», «задача завершена». Это килобайты, и их немного. По каналу данных при тяжёлом запросе могут летать гигабайты и десятки гигабайт промежуточных строк, перераспределяемых между воркерами. Разница на порядки. Именно поэтому архитектурно важно, чтобы тяжёлый поток шёл по схеме «многие ко многим» — каждый воркер с каждым напрямую, — а не сходился в единственную точку. Один координатор физически не пропустил бы через себя такой объём, и масштабируемость кластера упёрлась бы в его сетевой канал. Разведение control plane и data plane — это то, что позволяет добавлять воркеры и наращивать совокупную пропускную способность обмена линейно.
Сам процесс сервера Trino на координаторе и на воркере — это одна и та же программа. Узел не «другой по сборке»; роль определяется только строкой coordinator=true или coordinator=false в конфигурации. Один и тот же дистрибутив Trino, одна и та же команда запуска — разное лишь значение одной настройки.
Масштабирование через воркеров
Воркеры — это рычаг масштабирования Trino. Поскольку они равноправны, взаимозаменяемы и shared-nothing (у каждого свой CPU, память, диск), управлять мощностью кластера просто: добавить воркеров — больше параллелизма и быстрее тяжёлые запросы; убрать воркеров — меньше потребляемых ресурсов. Координатор при этом не трогают, он один и остаётся.
Это даёт кластеру эластичность. В облаке или в Kubernetes число воркеров можно менять под нагрузку автоматически: днём, когда аналитики активны, воркеров больше; ночью — меньше. Поскольку воркер не хранит ничего постоянного (его состояние — только промежуточные данные текущих запросов в оперативной памяти), добавить или вывести воркер — рутинная и быстрая операция.
Стоит понимать, что добавление воркеров ускоряет именно ту часть работы, которой воркеры заняты, — обработку данных. Тяжёлый запрос, который сканирует и агрегирует терабайты, на удвоенном числе воркеров пойдёт примерно вдвое быстрее, потому что данные раскладываются по большему числу узлов и ядер. А вот часть, за которую отвечает координатор — парсинг и планирование, — добавление воркеров не ускоряет: это работа одного узла. Поэтому воркеры — рычаг для пропускной способности по данным и для конкурентности, но не лекарство от медленного планирования. Это разделение мы ещё раз проговорим в уроке про координатор и в уроке про stateless-дизайн.
Важно и то, что воркеры в кластере обычно делают одинаковыми по ресурсам — равный объём памяти, равное число ядер. Причина в модели shared-nothing и в том, что работа раскладывается по воркерам равномерно: если один воркер слабее остальных, он становится отстающим — кластер ждёт его, и общий запрос идёт со скоростью самого медленного участника. Однородный пул воркеров — стандартная практика эксплуатации Trino.
| Свойство | Координатор | Воркер |
|---|---|---|
| Количество в кластере | Ровно один | Ноль или более (обычно много) |
| Читает данные из источников | Нет | Да |
| Исполняет операции над строками | Нет (по умолчанию) | Да |
| Обменивается данными с другими узлами | Нет (канал управления) | Да (exchange, канал данных) |
| Рычаг масштабирования | Не масштабируется числом | Добавляют и убирают под нагрузку |
Воркер хранит промежуточные данные текущих запросов только в оперативной памяти. Если воркер падает посреди запроса, эти промежуточные данные теряются безвозвратно. По умолчанию это означает падение всего запроса — прямое следствие stateless-дизайна, которому посвящён последний урок модуля. Отказоустойчивость в Trino — отдельный опциональный режим.
Попробуй сам
В одиночном Docker-контейнере воркер совмещён с координатором, и увидеть «настоящего» воркера сложно. Попробуйте поднять кластер из нескольких узлов: один контейнер-координатор (coordinator=true) и два контейнера-воркера (coordinator=false), указав всем трём один и тот же discovery.uri на координатор. Это можно сделать через docker-compose.yml.
Когда кластер поднимется, выполните через CLI запрос SELECT node_id, coordinator FROM system.runtime.nodes; — вы должны увидеть три строки: одну с coordinator = true и две с false. Затем запустите тяжёлый запрос к tpch.sf1 и в Web UI координатора посмотрите на вкладку со стадиями: обратите внимание, что работа разложена по нескольким узлам. Запишите своими словами, какие из трёх обязанностей воркера (чтение, исполнение, обмен) вы можете опознать в том, что показывает Web UI.