Exchange model: буферы и обмен данными
Стадии связаны в дерево, и между ними данные идут по сети. Мы называли это словом «exchange», но не разбирали механику. А механика — это «до железа»: не «данные летают между нодами», а конкретные буферы на стороне отправителя, конкретные клиенты на стороне получателя, и механизм, который не даёт быстрому отправителю завалить медленного получателя.
Этот урок разбирает exchange-модель: output buffers, exchange clients, backpressure, а также различие локального и удалённого обмена.
Зачем нужен exchange
Распределённый план — дерево стадий. Данные текут снизу вверх: дочерняя стадия произвела результат, родительская должна его получить. Но стадии — это задачи на разных воркерах, разных физических машинах. Просто так данные между машинами не переходят — нужен механизм передачи.
Exchange — это и есть механизм передачи данных между стадиями запроса. Каждая граница между фрагментами в распределённом плане (помните RemoteSource в EXPLAIN?) — это точка exchange. На уровне операторов exchange представлен оператором exchange, который мы упоминали в прошлом уроке.
Наивная картина — «задача стадии-производителя шлёт данные задаче стадии-потребителя напрямую». Реальность сложнее, потому что отправитель и получатель работают с разной скоростью, асинхронно, и их нужно развязать. Развязывает их буфер — и именно поэтому простой схемы «producer шлёт consumer» недостаточно: между ними стоит буфер с двух сторон, и эту полную схему мы разберём в следующих разделах.
Output buffers: сторона производителя
Когда задача-производитель готова отдать данные, она не «толкает» их получателю. Она складывает готовые Page в output buffer — выходной буфер на своей стороне.
Output buffer — это область памяти на стороне задачи-производителя, где накапливаются произведённые Page в ожидании, пока их заберёт потребитель. Производитель работает в своём темпе: его драйверы прогоняют данные через операторы, готовые Page кладутся в output buffer. Производитель не ждёт получателя — он просто наполняет буфер.
Структура output buffer связана с типом распределения следующей стадии (SOURCE, HASH, BROADCAST из урока про фрагменты). Если следующая стадия — HASH-распределённая, output buffer логически разделён на части по числу получателей: Page для получателя 1 кладётся в одну часть, для получателя 2 — в другую, и хэш-функция от ключа определяет, в какую. Если распределение BROADCAST — один и тот же Page становится доступен всем получателям.
Exchange clients: сторона потребителя
На другом конце — задача-потребитель. Она тоже не ждёт, пока ей что-то «толкнут». Она сама забирает данные через exchange client.
Exchange client — это компонент на стороне задачи-потребителя, который активно вычитывает Page из output buffers задач-производителей. У задачи-потребителя обычно несколько exchange client-ов — по одному (или сгруппированных) на источники данных: ведь данные ей идут от многих задач дочерней стадии. Exchange client запрашивает Page из удалённого output buffer, получает их по сети и подаёт в операторы своей задачи.
Модель, таким образом, pull-ориентированная: получатель тянет данные, а не отправитель их толкает. Производитель наполняет свой output buffer; потребитель через exchange client опустошает его, забирая Page. Это и есть полная схема одного exchange.
Spark: Shuffle — самая дорогая операция Kafka Producer: батчинг и буферизацияBackpressure: как буфер регулирует скорость
Теперь главное «до железа». Производитель и потребитель работают с разной скоростью. Что если производитель быстрый, а потребитель медленный? Производитель будет наполнять output buffer быстрее, чем потребитель его опустошает. Без регулирования буфер рос бы бесконечно — и память кончилась бы.
Регулятор — backpressure (обратное давление), и он встроен в саму конструкцию буфера. Output buffer имеет ограниченный размер. Логика такая:
- Производитель кладёт Page в output buffer.
- Потребитель через exchange client забирает Page, освобождая место.
- Если потребитель медленный — буфер заполняется до предела.
- Когда output buffer полон, производитель не может положить новый Page. Его драйверы, упёршиеся в полный буфер, приостанавливаются — встают в ожидание.
- Потребитель забирает Page, в буфере освобождается место — приостановленные драйверы производителя продолжают работу.
Backpressure — это саморегуляция через ограниченный буфер. Никакого отдельного «менеджера скоростей» нет: сам факт, что буфер конечен, заставляет быстрого производителя притормозить до темпа медленного потребителя. Скорость всего конвейера выравнивается по самому медленному звену, и память под буферы остаётся ограниченной. Это распространяется по дереву: если медленна верхняя стадия, backpressure через её буферы тормозит стадию под ней, та — стадию под собой, и так до источников. Кластер не захлёбывается данными.
Backpressure объясняет, почему Trino не нужно материализовать промежуточные результаты целиком и при этом память не взрывается. Между стадиями стоят ограниченные буферы; быстрый производитель автоматически тормозится медленным потребителем; данные стримятся ровно с той скоростью, с какой их успевают потреблять. Это часть того, почему Trino работает потоково, а не материализует всё.
Локальный и удалённый обмен
Слово «exchange» относится к двум разным по масштабу вещам. Различать их важно.
Удалённый exchange (remote exchange) — это передача данных между стадиями, между задачами на разных воркерах. Идёт по сети. Это то, о чём шла речь выше: output buffer на одном воркере, exchange client на другом, данные пересекают сеть. Каждая граница фрагментов в EXPLAIN — remote exchange.
Локальный exchange (local exchange) — это передача данных внутри одной задачи, между её драйверами. По сети не идёт — всё в памяти одного воркера. Зачем он нужен: внутри задачи бывает нужно перераспределить данные между драйверами. Например, source-драйверы прочитали данные, а дальше внутри той же задачи их надо перегруппировать по ключу между драйверами следующего pipeline-а — это делает локальный exchange, в пределах памяти воркера, без сети.
Разница принципиальна для производительности. Удалённый exchange дорог: данные сериализуются, идут по сети, десериализуются. Локальный exchange дёшев: это перемещение в памяти. Поэтому движок старается делать как можно больше работы локально и минимизировать удалённый обмен — и типы фрагментов (broadcast vs hash) во многом про то, сколько данных уйдёт именно в дорогой удалённый exchange.
Exchange в общей картине
Сведём. Распределённый план — дерево стадий. Между стадиями данные передаёт удалённый exchange: задача-производитель копит Page в ограниченном output buffer, задача-потребитель через exchange client тянет их по сети. Ограниченность буфера даёт backpressure — быстрый производитель тормозится медленным потребителем, память не взрывается, скорости выравниваются по всему дереву. Внутри задачи данные между драйверами перемещает дешёвый локальный exchange без сети.
Exchange — это «кровеносная система» распределённого исполнения. Stage, task, driver, operator — это органы; exchange — то, что переносит между ними данные и регулирует поток. Понимание буферов и backpressure объясняет, почему Trino держит память под контролем, исполняя запрос потоково через весь кластер.
Попробуй сам
Эффекты exchange видны в EXPLAIN ANALYZE и Web UI:
- Выполните
EXPLAIN (TYPE DISTRIBUTED)для запроса сGROUP BY. КаждыйRemoteSource— точка удалённого exchange. Сколько их? - Снимите
EXPLAIN ANALYZEтого же запроса. Найдите объёмы данных, переданных между фрагментами по сети, — это нагрузка на удалённый exchange. - Сравните два join: маленькой и большой таблицы (broadcast) и двух больших таблиц (hash). Посмотрите, в каком случае по сети через exchange ушло больше данных.
- В Web UI на странице запроса поищите статистику буферов / output buffer стадий — признаки backpressure (заполненность буферов).
- Сформулируйте письменно, как ограниченный размер output buffer создаёт backpressure, и чем удалённый exchange принципиально дороже локального.