Learning Platform
Глоссарий Troubleshooting
Урок 06.04 · 22 мин
Средний
distributed-executiontasksdriverspipeline

Tasks и drivers: pipeline-исполнение

Из урока про таксономию: task — runtime-исполнение стадии на воркере, driver — наименьшая единица параллелизма, физическая цепочка операторов. Теперь спустимся внутрь воркера и разберём, как именно task работает: как она распадается на драйверы, что значит «pipeline-исполнение» и почему параллелизм меряется драйверами, а не задачами.

Это самый «железный» уровень после Page и Block — то, что реально крутится на ядрах процессора воркера.


Что происходит внутри задачи

Task пришла на воркер. Coordinator прислал ей описание: какой фрагмент плана исполнять и какие splits обрабатывать. Внутри воркера task должна превратить это в реальную работу на нескольких ядрах.

Task не исполняет фрагмент в одиночку. Она разворачивает его в pipeline — конвейер операторов — и запускает несколько драйверов, каждый из которых прогоняет данные через копию этого конвейера. Структура такая:

Внутри задачи: pipeline и драйверы
Task на воркереИсполнение стадии на воркере. Получила фрагмент плана и splits.
разворачивает фрагмент в
PipelineКонвейер операторов — шаблон цепочки: scan, filter, project и так далее.
запускает по шаблону
Driver 1Один экземпляр конвейера. Прогоняет свою порцию данных.
Driver 2Другой экземпляр того же конвейера. Параллельно, своя порция.
Driver 3Ещё один экземпляр. Драйверов столько, сколько нужно для параллелизма.

Pipeline — это шаблон: описание, из каких операторов и в каком порядке состоит цепочка. Driver — экземпляр этого шаблона: реальная цепочка операторов в памяти, которая прогоняет конкретные данные. Один pipeline — много драйверов. Драйверы одной задачи устроены одинаково (один шаблон), но работают независимо, каждый над своим куском данных.


Driver — наименьшая единица параллелизма

Повторим и углубим определение из урока про таксономию, потому что оно центральное.

Driver — это последовательность экземпляров операторов в памяти, с одним входом и одним выходом. И driver — наименьшая единица параллелизма в Trino.

Почему именно driver, а не task. Task — это «вся работа стадии на этом воркере». Но «вся работа на воркере» — не атомарна: воркер многоядерный, и одна task должна загрузить несколько ядер. Делает она это, запуская несколько драйверов. Каждый driver — отдельная независимая единица работы, которую планировщик воркера может поставить на отдельное ядро. Атомом параллелизма является driver: ниже него уже неделимо (операторы внутри driver работают строго последовательно — данные идут по цепочке).

Параллелизм, таким образом, двухуровневый. Уровень stage -> task — это параллелизм по машинам: одна стадия даёт много задач на разных воркерах. Уровень task -> driver — параллелизм по ядрам: одна задача даёт много драйверов на ядрах воркера. А driver — атом: ниже него уже неделимо.

Отсюда формула реального параллелизма запроса: сумма по всем стадиям от (число task в стадии, умноженное на число drivers в каждой task). Если у стадии 10 task и в каждой по 8 drivers — стадия исполняется в 80 параллельных потоков. Считать только задачи (10) — занизить параллелизм в 8 раз.

Число драйверов на задачу регулируется. За него отвечает настройка task.concurrency — она задаёт степень параллелизма внутри задачи (типично — близко к числу ядер воркера). Тонкая настройка параллелизма — тема отдельного модуля про планировщик; здесь важно понять принцип: драйверов на задачу несколько, и это и есть параллелизм по ядрам.


Pipeline-исполнение: данные текут, не материализуются

Теперь — как driver обрабатывает данные. Ключевое слово — pipeline (конвейер, потоковая обработка).

Driver — цепочка операторов. Данные проходят её насквозь: первый оператор берёт порцию данных, обрабатывает, отдаёт второму; второй обрабатывает, отдаёт третьему; и так до конца цепочки. Порция данных — это Page (колоночный батч, ему посвящён отдельный урок). Driver работает Page за Page: взял Page, прогнал через всю цепочку операторов, взял следующий.

Принципиально важно, чего driver не делает. Он не материализует промежуточные результаты целиком. Оператор filter не ждёт, пока scan прочитает всю таблицу, чтобы потом начать фильтровать. Как только scan выдал первый Page, filter уже его обрабатывает, а scan параллельно готовит следующий. Данные текут по конвейеру, а не накапливаются между операторами.

Pipeline: Page течёт через цепочку операторов
ScanЧитает данные, выдаёт Page за Page. Не ждёт конца таблицы.
Page
FilterПолучает Page от scan сразу, фильтрует, передаёт дальше. Не ждёт всех данных.
Page
ProjectПолучает Page от filter, вычисляет столбцы, отдаёт на выход driver.

Преимущество потоковой обработки — в эффективности по памяти и по латентности. По памяти: в каждый момент в работе лишь несколько Page-ов, а не вся таблица целиком — driver не держит в памяти весь промежуточный результат. По латентности: первые строки результата могут пойти к клиенту, как только первые Page прошли конвейер, — не нужно ждать обработки всех данных.

NOTE

Pipeline-исполнение объясняет распространённое заблуждение о скорости Trino. Trino быстр НЕ потому, что грузит данные в память и кэширует. Он быстр потому, что данные стримятся через конвейеры операторов: между операторами почти ничего не материализуется, работа идёт батчами, несколько драйверов жмут параллельно на ядрах. Скорость — от MPP-распределённости и pipelined-исполнения, а не от хранения данных в памяти.

Spark: модель executor и pipeline-исполнение DuckDB: pipeline-исполнение и морзелизация

Не все операторы одинаково «текучи». Есть блокирующие операторы — которым по природе нужны все входные данные, прежде чем выдать первую строку результата. Например, сортировка (ORDER BY) не может выдать первую строку, не увидев всех строк, — иначе непонятно, какая первая. Финальная агрегация без GROUP BY тоже: count(*) готов, только когда посчитаны все строки. Такие операторы — точки, где конвейер вынужденно «копит» данные. Но даже вокруг них остальная часть плана работает потоково.


Кооперативная многозадачность воркера

Последняя деталь «до железа». Драйверов на воркере может быть больше, чем ядер: десятки задач, у каждой несколько драйверов. Как воркер делит ядра между ними честно.

Trino использует кооперативную многозадачность. Driver работает не до полного завершения, а квантами: он обрабатывает данные ограниченное время (квант порядка долей секунды), после чего сам уступает ядро — возвращает управление планировщику воркера. Планировщик ставит на ядро следующий готовый driver. Отработавший driver встанет в очередь и продолжит, когда снова получит квант.

Это и есть «cooperative»: driver добровольно уступает ядро, не дожидаясь, пока его вытеснят. Зачем так. Если бы длинный driver занимал ядро, пока не доделает весь свой split, короткие быстрые запросы застревали бы за ним в очереди. Квантование выравнивает: ни один driver не монополизирует ядро, и сотни конкурентных запросов делят воркер плавно, без того чтобы тяжёлый запрос заморозил лёгкие.

Кооперативное квантование драйверов на ядре
Driver работает квантDriver обрабатывает данные ограниченное время — порядка долей секунды.
сам уступает ядро
Планировщик берёт следующийУправление вернулось планировщику воркера — он ставит на ядро другой готовый driver.
прежний — в очередь
Driver ждёт следующего квантаОтработавший driver встаёт в очередь и продолжит, когда снова получит ядро.

Картина уровня task-driver целиком

Соберём. Task приходит на воркер с фрагментом плана и splits. Task разворачивает фрагмент в pipeline — шаблон цепочки операторов — и запускает несколько drivers, экземпляров этого конвейера; число драйверов задаёт параллелизм внутри воркера (task.concurrency). Каждый driver потоково прогоняет Page за Page через свою цепочку операторов, не материализуя промежуточные результаты. Драйверы делят ядра воркера через кооперативное квантование — каждый работает квант и уступает ядро.

Это и есть «рабочая лошадка» Trino. Stage — концепция, task — её воплощение на воркере, а driver — то, что в реальности молотит данные на ядре процессора.


Попробуй сам

Уровень task-driver виден в Web UI и EXPLAIN ANALYZE:

  1. Выполните EXPLAIN ANALYZE SELECT orderstatus, count(*) FROM tpch.sf10.orders GROUP BY orderstatus. Откройте этот запрос в Web UI, страница деталей. Для одной стадии найдите число task и счётчики драйверов (часто в pipeline-статистике).
  2. Прикиньте реальный параллелизм стадии: число task умножить на число drivers в задаче. Сравните с числом ядер на воркерах кластера.
  3. Найдите в плане оператор сортировки или финальной агрегации. Подумайте, почему он блокирующий — почему он не может работать чисто потоково.
  4. Сформулируйте письменно: что такое pipeline-исполнение, почему driver — наименьшая единица параллелизма, и зачем драйверы уступают ядро квантами вместо работы до конца.

Проверка знанийKnowledge check
Как задача распадается на драйверы, почему driver — наименьшая единица параллелизма, что такое pipeline-исполнение и зачем драйверы делят ядра кооперативно?
ОтветAnswer
Task приходит на воркер с фрагментом плана и splits. Она не исполняет фрагмент в одиночку: она разворачивает его в pipeline — шаблон цепочки операторов — и запускает несколько драйверов, каждый из которых является экземпляром этого конвейера. Один pipeline — много драйверов; они устроены одинаково, но работают независимо над своими порциями данных, число драйверов задаёт настройка task.concurrency. Driver — наименьшая единица параллелизма, потому что task это вся работа стадии на воркере, но воркер многоядерный, и загрузить несколько ядер task может только запустив несколько драйверов; каждый driver планировщик воркера ставит на отдельное ядро, а внутри driver операторы строго последовательны — ниже driver уже неделимо. Реальный параллелизм запроса — это число task, умноженное на число drivers в каждой. Pipeline-исполнение означает, что данные текут через цепочку операторов Page за Page без материализации промежуточных результатов: оператор filter не ждёт, пока scan прочитает всю таблицу, — он обрабатывает первый же выданный Page, пока scan готовит следующий. Это экономит память (в работе лишь несколько Page, а не вся таблица) и снижает латентность (первые строки идут к клиенту рано). Драйверы делят ядра кооперативно: driver работает квант времени порядка долей секунды и сам уступает ядро планировщику воркера, после чего встаёт в очередь. Так ни один driver не монополизирует ядро, и сотни конкурентных запросов делят воркер плавно — тяжёлый запрос не замораживает лёгкие.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём разница между pipeline и driver внутри задачи?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7