Concurrency: task.concurrency и настройка параллелизма
Мы прошли весь путь сплита: SplitSource его породил, node scheduler выбрал воркер, очередь pending splits подержала его до момента, когда driver взял его в работу. Осталось понять последнее звено — а сколько вообще driver’ов работает на одном task’е? От этого числа зависит, насколько эффективно воркер использует свои процессорные ядра, и оно настраивается.
Это завершающий урок модуля. Он про параллелизм внутри воркера: про разницу между параллелизмом на уровне сплитов и параллелизмом внутри стадии, про свойство task.concurrency, про то, как Trino превращает один task в набор driver’ов, и про то, когда этот параметр стоит трогать, а когда — нет.
Два уровня параллелизма
Параллелизм в Trino живёт сразу на нескольких уровнях, и их легко перепутать. Разведём два, которые касаются исполнения внутри воркера.
Параллелизм по сплитам. Source-стадия читает данные. Сплитов много, и несколько driver’ов одного task’а одновременно читают разные сплиты. Тут потолок параллелизма очевиден — это число сплитов: нельзя занять чтением больше driver’ов, чем есть кусков данных. Если у таблицы один большой нечленимый файл и, значит, один сплит, чтение пойдёт в один поток, сколько ядер ни дай.
Параллелизм внутри стадии. А вот стадии, которые не читают данные из коннектора, а обрабатывают промежуточные результаты — агрегация, hash join, оконные функции, — параллелятся иначе. Здесь нет сплитов от коннектора. Здесь данные приходят по exchange от предыдущей стадии, и стадия делит эту работу на фиксированное число параллельных driver’ов на каждом воркере. Вот это число и задаёт task.concurrency.
Почему так. Source-стадия привязана к внешним данным, и их «зернистость» (сколько сплитов) задаёт коннектор и раскладка файлов — Trino её не контролирует. Стадия обработки оперирует уже своими, внутренними данными, и тут Trino волен сам решить, на сколько параллельных потоков их раздробить. task.concurrency — это и есть его решение.
Что такое task.concurrency
task.concurrency — число параллельных driver’ов, на которое разбивается работа task’а внутри одного воркера для стадий, не привязанных к сплитам. Иначе это называют local concurrency — локальный (внутриворкерный) параллелизм.
Значение по умолчанию выводится из числа аппаратных ядер машины: Trino при старте определяет количество ядер и выставляет task.concurrency в значение того же порядка, обычно ближайшую степень двойки. На 16-ядерной машине это типично 16. Логика прямая: чтобы загрузить 16 ядер работой, нужно примерно 16 параллельных потоков обработки.
Задаётся свойство в config.properties воркера и переопределяется на уровне сессии:
# etc/config.properties на воркере
task.concurrency=16
-- переопределение для конкретного запроса
SET SESSION task_concurrency = 8;
Важная деталь: task.concurrency должен быть степенью двойки. Это не каприз. Параллельные driver’ы локальной стадии разбирают строки по хэшу — каждый driver обрабатывает свою долю по хэшу join- или group-ключа. Деление по модулю степени двойки сводится к битовой маске вместо честного mod, что и быстрее, и даёт ровное распределение. Поставив значение не степень двойки, вы получите либо ошибку, либо округление.
Как task раскрывается в driver’ы
Соберём механику в одной картине. Стадия превращается в task на каждом задействованном воркере. Внутри воркера task для не-source стадии раскрывается в task.concurrency driver’ов. Каждый driver — это физическая цепочка операторов в памяти (например: локальный exchange -> частичная агрегация -> финальная агрегация), наименьшая единица параллелизма. Driver’ы одного task’а исполняются параллельно, каждый на своей доле строк.
Суммарный параллелизм запроса — это произведение: число воркеров, умноженное на число driver’ов на task. Кластер из 4 воркеров с task.concurrency=16 обрабатывает hash-стадию в 64 параллельных потока. Поэтому task.concurrency — это не «настройка одной машины», а множитель, влияющий на загрузку всего кластера.
Cooperative scheduling: почему driver’ов может быть больше, чем ядер
Возникает вопрос: если ядер 16, почему task.concurrency иногда ставят выше, и почему driver’ов в принципе может одновременно существовать больше, чем ядер? Ведь физически исполняться разом может ровно столько потоков, сколько ядер.
Ответ — в модели исполнения воркера. Воркер не отдаёт driver’у ядро «насовсем». Driver’ы кооперативно делят пул процессорных потоков. Driver работает квантами: получает поток, обрабатывает порцию (несколько Page), затем добровольно уступает (yield) — освобождает поток, возвращается в очередь готовых, и поток подхватывает следующий driver. Это cooperative multitasking: квант ограничен по времени и по объёму, ни один driver не монополизирует ядро.
Зачем уступать. Driver может на время заблокироваться не на CPU: ждёт данные из exchange, ждёт страницу из коннектора, ждёт память. Пока он ждёт, его ядро не должно простаивать — на нём работает другой driver, которому есть что считать. Поэтому полезно иметь driver’ов несколько больше, чем ядер: пока часть заблокирована на ожидании, остальные держат ядра занятыми. Воркер с числом driver’ов ровно по числу ядер при любой блокировке недозагружает процессор.
Cooperative scheduling объясняет, почему Trino — pipeline-движок без простоев. Driver, уткнувшийся в ожидание ввода-вывода, не держит ядро: он уступает, ядро берёт готовый к работе driver. CPU занят полезной работой, пока в принципе есть что считать. Это та же идея, что у корутин: много логических задач поверх ограниченного числа реальных потоков, переключение в точках уступки.
Когда трогать task.concurrency, а когда нет
Значение по умолчанию выбрано по числу ядер и для большинства нагрузок оптимально. Менять его — точечная мера, и вот ориентиры.
Повышать имеет смысл, когда машины большие (десятки ядер), а нагрузка — тяжёлые агрегации и join’ы, которые периодически блокируются на exchange. Чуть больше driver’ов лучше скрывают эти блокировки и плотнее загружают ядра. Сначала меняйте на уровне сессии для конкретного класса запросов, измеряйте EXPLAIN ANALYZE, и только потом, если эффект устойчив, фиксируйте в config.properties.
Понижать имеет смысл при высокой конкурентности — когда кластер одновременно крутит много запросов. Каждый driver — это и память (буферы операторов, фрагменты хэш-таблиц), и переключения контекста. Высокий task.concurrency, помноженный на десятки одновременных запросов, даёт лавину driver’ов: память под буферы растёт, накладные расходы на переключения съедают выигрыш. Понижение task.concurrency снижает память на запрос и сглаживает кластер под многопользовательской нагрузкой.
Не трогать в большинстве случаев. Распространённая ошибка — крутить task.concurrency как «ускоритель», не измеряя. Если запрос упирается не в CPU, а в чтение из S3 (source-стадия, ограниченная числом сплитов) или в сетевой exchange, рост task.concurrency не даст ничего: проблема не в нехватке потоков обработки. Сначала найдите по EXPLAIN ANALYZE реальный bottleneck-фрагмент, потом решайте.
| Ситуация | Действие | Почему |
|---|---|---|
| Большие машины, тяжёлые join/агрегации | Аккуратно повысить | Больше driver’ов скрывают блокировки на exchange |
| Много одновременных запросов | Понизить | Меньше памяти на запрос, меньше переключений контекста |
| Запрос упирается в I/O из S3 | Не трогать | Bottleneck не в потоках обработки; смотри число сплитов |
| Сомневаешься | Не трогать | Дефолт по числу ядер обычно оптимален |
Как увидеть параллелизм в плане
EXPLAIN ANALYZE показывает у каждого фрагмента, в сколько потоков он исполнялся. У фрагмента видно число task’ов (по числу воркеров) и driver’ов:
EXPLAIN ANALYZE
SELECT custkey, count(*)
FROM tpch.sf10.orders
GROUP BY custkey;
Fragment 1 [HASH]
CPU: 19.7s, Scheduled: 21.3s, Input: 15000000 rows
Amount of input data per task is on average 5000000 rows
Aggregate[keys = [custkey]]
Фрагмент с типом HASH — это и есть стадия с локальным параллелизмом по task.concurrency. Если CPU time у такого фрагмента заметно ниже Scheduled time, фрагмент много простаивал — возможно, driver’ов мало и они часто блокируются, или, наоборот, проблема не здесь. Решение всегда одно: смотреть на конкретные числа фрагмента, а не подкручивать параметр вслепую.
Попробуй сам
На песочнице курса (Trino 481):
-
Выполните
EXPLAIN ANALYZEдля агрегации:SELECT custkey, count(*) FROM tpch.sf10.orders GROUP BY custkey;. ЗапишитеCPUиScheduledtime уHASH-фрагмента. -
Теперь
SET SESSION task_concurrency = 2;и повторите тот жеEXPLAIN ANALYZE. СравнитеCPUиScheduledtime с предыдущим запуском. Объясните направление изменения. Затем вернитеSET SESSION task_concurrency = 16;(или дефолт песочницы). -
Рассуждение в двух абзацах. Первый: почему source-стадия чтения одного большого нечленимого Parquet-файла не ускорится от роста
task.concurrency— какой потолок параллелизма тут главный. Второй: почему на кластере, который крутит 50 запросов одновременно, разумно скорее понизитьtask.concurrency, чем повысить.