Learning Platform
Глоссарий Troubleshooting
Урок 07.05 · 22 мин
Средний
concurrencytask-concurrencydriversparallelism

Concurrency: task.concurrency и настройка параллелизма

Мы прошли весь путь сплита: SplitSource его породил, node scheduler выбрал воркер, очередь pending splits подержала его до момента, когда driver взял его в работу. Осталось понять последнее звено — а сколько вообще driver’ов работает на одном task’е? От этого числа зависит, насколько эффективно воркер использует свои процессорные ядра, и оно настраивается.

Это завершающий урок модуля. Он про параллелизм внутри воркера: про разницу между параллелизмом на уровне сплитов и параллелизмом внутри стадии, про свойство task.concurrency, про то, как Trino превращает один task в набор driver’ов, и про то, когда этот параметр стоит трогать, а когда — нет.


Два уровня параллелизма

Параллелизм в Trino живёт сразу на нескольких уровнях, и их легко перепутать. Разведём два, которые касаются исполнения внутри воркера.

Параллелизм по сплитам. Source-стадия читает данные. Сплитов много, и несколько driver’ов одного task’а одновременно читают разные сплиты. Тут потолок параллелизма очевиден — это число сплитов: нельзя занять чтением больше driver’ов, чем есть кусков данных. Если у таблицы один большой нечленимый файл и, значит, один сплит, чтение пойдёт в один поток, сколько ядер ни дай.

Параллелизм внутри стадии. А вот стадии, которые не читают данные из коннектора, а обрабатывают промежуточные результаты — агрегация, hash join, оконные функции, — параллелятся иначе. Здесь нет сплитов от коннектора. Здесь данные приходят по exchange от предыдущей стадии, и стадия делит эту работу на фиксированное число параллельных driver’ов на каждом воркере. Вот это число и задаёт task.concurrency.

Два уровня параллелизма внутри воркера
Source-стадия: чтениеDriver'ы читают сплиты от коннектора. Потолок параллелизма — число сплитов, которые сгенерировал SplitSource
данные идут по exchange
Стадия обработки: агрегация, joinНет сплитов от коннектора. Работа делится на фиксированное число driver'ов на воркер — это и есть task.concurrency

Почему так. Source-стадия привязана к внешним данным, и их «зернистость» (сколько сплитов) задаёт коннектор и раскладка файлов — Trino её не контролирует. Стадия обработки оперирует уже своими, внутренними данными, и тут Trino волен сам решить, на сколько параллельных потоков их раздробить. task.concurrency — это и есть его решение.


Что такое task.concurrency

task.concurrency — число параллельных driver’ов, на которое разбивается работа task’а внутри одного воркера для стадий, не привязанных к сплитам. Иначе это называют local concurrency — локальный (внутриворкерный) параллелизм.

Значение по умолчанию выводится из числа аппаратных ядер машины: Trino при старте определяет количество ядер и выставляет task.concurrency в значение того же порядка, обычно ближайшую степень двойки. На 16-ядерной машине это типично 16. Логика прямая: чтобы загрузить 16 ядер работой, нужно примерно 16 параллельных потоков обработки.

Задаётся свойство в config.properties воркера и переопределяется на уровне сессии:

# etc/config.properties на воркере
task.concurrency=16
-- переопределение для конкретного запроса
SET SESSION task_concurrency = 8;

Важная деталь: task.concurrency должен быть степенью двойки. Это не каприз. Параллельные driver’ы локальной стадии разбирают строки по хэшу — каждый driver обрабатывает свою долю по хэшу join- или group-ключа. Деление по модулю степени двойки сводится к битовой маске вместо честного mod, что и быстрее, и даёт ровное распределение. Поставив значение не степень двойки, вы получите либо ошибку, либо округление.


Как task раскрывается в driver’ы

Соберём механику в одной картине. Стадия превращается в task на каждом задействованном воркере. Внутри воркера task для не-source стадии раскрывается в task.concurrency driver’ов. Каждый driver — это физическая цепочка операторов в памяти (например: локальный exchange -> частичная агрегация -> финальная агрегация), наименьшая единица параллелизма. Driver’ы одного task’а исполняются параллельно, каждый на своей доле строк.

Стадия -> task -> driver'ы
Стадия (HASH)Концептуальная фаза распределённого плана, например финальная агрегация. Сама на воркерах не исполняется
по task на воркер
Task на воркереИсполняемая реализация стадии на конкретном воркере. Раскрывается в task.concurrency параллельных driver'ов
task.concurrency штук
DriverФизическая цепочка операторов в памяти, наименьшая единица параллелизма. Обрабатывает свою долю строк по хэшу

Суммарный параллелизм запроса — это произведение: число воркеров, умноженное на число driver’ов на task. Кластер из 4 воркеров с task.concurrency=16 обрабатывает hash-стадию в 64 параллельных потока. Поэтому task.concurrency — это не «настройка одной машины», а множитель, влияющий на загрузку всего кластера.


Cooperative scheduling: почему driver’ов может быть больше, чем ядер

Возникает вопрос: если ядер 16, почему task.concurrency иногда ставят выше, и почему driver’ов в принципе может одновременно существовать больше, чем ядер? Ведь физически исполняться разом может ровно столько потоков, сколько ядер.

DuckDB: pipeline и morsel-based parallelism — аналог cooperative scheduling

Ответ — в модели исполнения воркера. Воркер не отдаёт driver’у ядро «насовсем». Driver’ы кооперативно делят пул процессорных потоков. Driver работает квантами: получает поток, обрабатывает порцию (несколько Page), затем добровольно уступает (yield) — освобождает поток, возвращается в очередь готовых, и поток подхватывает следующий driver. Это cooperative multitasking: квант ограничен по времени и по объёму, ни один driver не монополизирует ядро.

Зачем уступать. Driver может на время заблокироваться не на CPU: ждёт данные из exchange, ждёт страницу из коннектора, ждёт память. Пока он ждёт, его ядро не должно простаивать — на нём работает другой driver, которому есть что считать. Поэтому полезно иметь driver’ов несколько больше, чем ядер: пока часть заблокирована на ожидании, остальные держат ядра занятыми. Воркер с числом driver’ов ровно по числу ядер при любой блокировке недозагружает процессор.

NOTE

Cooperative scheduling объясняет, почему Trino — pipeline-движок без простоев. Driver, уткнувшийся в ожидание ввода-вывода, не держит ядро: он уступает, ядро берёт готовый к работе driver. CPU занят полезной работой, пока в принципе есть что считать. Это та же идея, что у корутин: много логических задач поверх ограниченного числа реальных потоков, переключение в точках уступки.


Когда трогать task.concurrency, а когда нет

Значение по умолчанию выбрано по числу ядер и для большинства нагрузок оптимально. Менять его — точечная мера, и вот ориентиры.

Повышать имеет смысл, когда машины большие (десятки ядер), а нагрузка — тяжёлые агрегации и join’ы, которые периодически блокируются на exchange. Чуть больше driver’ов лучше скрывают эти блокировки и плотнее загружают ядра. Сначала меняйте на уровне сессии для конкретного класса запросов, измеряйте EXPLAIN ANALYZE, и только потом, если эффект устойчив, фиксируйте в config.properties.

Понижать имеет смысл при высокой конкурентности — когда кластер одновременно крутит много запросов. Каждый driver — это и память (буферы операторов, фрагменты хэш-таблиц), и переключения контекста. Высокий task.concurrency, помноженный на десятки одновременных запросов, даёт лавину driver’ов: память под буферы растёт, накладные расходы на переключения съедают выигрыш. Понижение task.concurrency снижает память на запрос и сглаживает кластер под многопользовательской нагрузкой.

Не трогать в большинстве случаев. Распространённая ошибка — крутить task.concurrency как «ускоритель», не измеряя. Если запрос упирается не в CPU, а в чтение из S3 (source-стадия, ограниченная числом сплитов) или в сетевой exchange, рост task.concurrency не даст ничего: проблема не в нехватке потоков обработки. Сначала найдите по EXPLAIN ANALYZE реальный bottleneck-фрагмент, потом решайте.

СитуацияДействиеПочему
Большие машины, тяжёлые join/агрегацииАккуратно повыситьБольше driver’ов скрывают блокировки на exchange
Много одновременных запросовПонизитьМеньше памяти на запрос, меньше переключений контекста
Запрос упирается в I/O из S3Не трогатьBottleneck не в потоках обработки; смотри число сплитов
СомневаешьсяНе трогатьДефолт по числу ядер обычно оптимален

Как увидеть параллелизм в плане

EXPLAIN ANALYZE показывает у каждого фрагмента, в сколько потоков он исполнялся. У фрагмента видно число task’ов (по числу воркеров) и driver’ов:

EXPLAIN ANALYZE
SELECT custkey, count(*)
FROM tpch.sf10.orders
GROUP BY custkey;
Fragment 1 [HASH]
    CPU: 19.7s, Scheduled: 21.3s, Input: 15000000 rows
    Amount of input data per task is on average 5000000 rows
    Aggregate[keys = [custkey]]

Фрагмент с типом HASH — это и есть стадия с локальным параллелизмом по task.concurrency. Если CPU time у такого фрагмента заметно ниже Scheduled time, фрагмент много простаивал — возможно, driver’ов мало и они часто блокируются, или, наоборот, проблема не здесь. Решение всегда одно: смотреть на конкретные числа фрагмента, а не подкручивать параметр вслепую.


Попробуй сам

На песочнице курса (Trino 481):

  1. Выполните EXPLAIN ANALYZE для агрегации: SELECT custkey, count(*) FROM tpch.sf10.orders GROUP BY custkey;. Запишите CPU и Scheduled time у HASH-фрагмента.

  2. Теперь SET SESSION task_concurrency = 2; и повторите тот же EXPLAIN ANALYZE. Сравните CPU и Scheduled time с предыдущим запуском. Объясните направление изменения. Затем верните SET SESSION task_concurrency = 16; (или дефолт песочницы).

  3. Рассуждение в двух абзацах. Первый: почему source-стадия чтения одного большого нечленимого Parquet-файла не ускорится от роста task.concurrency — какой потолок параллелизма тут главный. Второй: почему на кластере, который крутит 50 запросов одновременно, разумно скорее понизить task.concurrency, чем повысить.


Проверка знанийKnowledge check
Что задаёт свойство task.concurrency, почему его значение должно быть степенью двойки, и почему driver'ов на воркере полезно иметь несколько больше, чем процессорных ядер?
ОтветAnswer
task.concurrency задаёт число параллельных driver'ов, на которое разбивается работа task'а внутри одного воркера для стадий, не привязанных к сплитам коннектора — то есть для стадий обработки промежуточных данных: агрегаций, hash join'ов, оконных функций. Это локальный, внутриворкерный параллелизм. Source-стадии чтения параллелятся иначе — по числу сплитов, и task.concurrency на них не влияет. Значение по умолчанию выводится из числа аппаратных ядер машины. Оно должно быть степенью двойки, потому что параллельные driver'ы локальной стадии разбирают строки по хэшу join- или group-ключа: при делении по модулю степени двойки модуль сводится к быстрой битовой маске и даёт ровное распределение строк по driver'ам. Driver'ов полезно иметь несколько больше, чем ядер, из-за модели cooperative scheduling: driver работает квантами и добровольно уступает поток, в том числе когда блокируется не на CPU — ждёт данные из exchange, страницу из коннектора или память. Пока часть driver'ов заблокирована на ожидании, остальные должны держать ядра занятыми полезной работой. Если driver'ов ровно по числу ядер, то при любой блокировке процессор недозагружается; небольшой избыток driver'ов скрывает блокировки и держит ядра занятыми, пока в принципе есть что считать.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Что задаёт свойство task.concurrency?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5