Learning Platform
Глоссарий Troubleshooting
Урок 06.06 · 23 мин
Средний
distributed-executionexchangebuffersbackpressure

Exchange model: буферы и обмен данными

Стадии связаны в дерево, и между ними данные идут по сети. Мы называли это словом «exchange», но не разбирали механику. А механика — это «до железа»: не «данные летают между нодами», а конкретные буферы на стороне отправителя, конкретные клиенты на стороне получателя, и механизм, который не даёт быстрому отправителю завалить медленного получателя.

Этот урок разбирает exchange-модель: output buffers, exchange clients, backpressure, а также различие локального и удалённого обмена.


Зачем нужен exchange

Распределённый план — дерево стадий. Данные текут снизу вверх: дочерняя стадия произвела результат, родительская должна его получить. Но стадии — это задачи на разных воркерах, разных физических машинах. Просто так данные между машинами не переходят — нужен механизм передачи.

Exchange — это и есть механизм передачи данных между стадиями запроса. Каждая граница между фрагментами в распределённом плане (помните RemoteSource в EXPLAIN?) — это точка exchange. На уровне операторов exchange представлен оператором exchange, который мы упоминали в прошлом уроке.

Наивная картина — «задача стадии-производителя шлёт данные задаче стадии-потребителя напрямую». Реальность сложнее, потому что отправитель и получатель работают с разной скоростью, асинхронно, и их нужно развязать. Развязывает их буфер — и именно поэтому простой схемы «producer шлёт consumer» недостаточно: между ними стоит буфер с двух сторон, и эту полную схему мы разберём в следующих разделах.


Output buffers: сторона производителя

Когда задача-производитель готова отдать данные, она не «толкает» их получателю. Она складывает готовые Page в output buffer — выходной буфер на своей стороне.

Output buffer — это область памяти на стороне задачи-производителя, где накапливаются произведённые Page в ожидании, пока их заберёт потребитель. Производитель работает в своём темпе: его драйверы прогоняют данные через операторы, готовые Page кладутся в output buffer. Производитель не ждёт получателя — он просто наполняет буфер.

Структура output buffer связана с типом распределения следующей стадии (SOURCE, HASH, BROADCAST из урока про фрагменты). Если следующая стадия — HASH-распределённая, output buffer логически разделён на части по числу получателей: Page для получателя 1 кладётся в одну часть, для получателя 2 — в другую, и хэш-функция от ключа определяет, в какую. Если распределение BROADCAST — один и тот же Page становится доступен всем получателям.


Exchange clients: сторона потребителя

На другом конце — задача-потребитель. Она тоже не ждёт, пока ей что-то «толкнут». Она сама забирает данные через exchange client.

Exchange client — это компонент на стороне задачи-потребителя, который активно вычитывает Page из output buffers задач-производителей. У задачи-потребителя обычно несколько exchange client-ов — по одному (или сгруппированных) на источники данных: ведь данные ей идут от многих задач дочерней стадии. Exchange client запрашивает Page из удалённого output buffer, получает их по сети и подаёт в операторы своей задачи.

Модель, таким образом, pull-ориентированная: получатель тянет данные, а не отправитель их толкает. Производитель наполняет свой output buffer; потребитель через exchange client опустошает его, забирая Page. Это и есть полная схема одного exchange.

Spark: Shuffle — самая дорогая операция Kafka Producer: батчинг и буферизация
Полная схема exchange: buffer и client
Драйверы producerПроизводят Page, кладут в output buffer.
кладут
Output bufferБуфер на стороне producer. Накапливает Page.
exchange client тянет по сети
Exchange clientКомпонент на стороне consumer. Активно забирает Page из удалённого буфера.
подаёт в операторы
Операторы consumerПолучают данные от exchange client и обрабатывают.

Backpressure: как буфер регулирует скорость

Теперь главное «до железа». Производитель и потребитель работают с разной скоростью. Что если производитель быстрый, а потребитель медленный? Производитель будет наполнять output buffer быстрее, чем потребитель его опустошает. Без регулирования буфер рос бы бесконечно — и память кончилась бы.

Регулятор — backpressure (обратное давление), и он встроен в саму конструкцию буфера. Output buffer имеет ограниченный размер. Логика такая:

  1. Производитель кладёт Page в output buffer.
  2. Потребитель через exchange client забирает Page, освобождая место.
  3. Если потребитель медленный — буфер заполняется до предела.
  4. Когда output buffer полон, производитель не может положить новый Page. Его драйверы, упёршиеся в полный буфер, приостанавливаются — встают в ожидание.
  5. Потребитель забирает Page, в буфере освобождается место — приостановленные драйверы производителя продолжают работу.
Backpressure: полный буфер тормозит производителя
Производитель быстрее потребителяДрайверы producer выдают Page быстрее, чем consumer их забирает.
буфер наполняется
Output buffer полонБуфер ограничен по размеру. Места для нового Page нет.
производитель не может положить Page
Драйверы producer приостановленыДрайверы producer ждут, пока в буфере освободится место. Это и есть backpressure.

Backpressure — это саморегуляция через ограниченный буфер. Никакого отдельного «менеджера скоростей» нет: сам факт, что буфер конечен, заставляет быстрого производителя притормозить до темпа медленного потребителя. Скорость всего конвейера выравнивается по самому медленному звену, и память под буферы остаётся ограниченной. Это распространяется по дереву: если медленна верхняя стадия, backpressure через её буферы тормозит стадию под ней, та — стадию под собой, и так до источников. Кластер не захлёбывается данными.

NOTE

Backpressure объясняет, почему Trino не нужно материализовать промежуточные результаты целиком и при этом память не взрывается. Между стадиями стоят ограниченные буферы; быстрый производитель автоматически тормозится медленным потребителем; данные стримятся ровно с той скоростью, с какой их успевают потреблять. Это часть того, почему Trino работает потоково, а не материализует всё.


Локальный и удалённый обмен

Слово «exchange» относится к двум разным по масштабу вещам. Различать их важно.

Удалённый exchange (remote exchange) — это передача данных между стадиями, между задачами на разных воркерах. Идёт по сети. Это то, о чём шла речь выше: output buffer на одном воркере, exchange client на другом, данные пересекают сеть. Каждая граница фрагментов в EXPLAIN — remote exchange.

Локальный exchange (local exchange) — это передача данных внутри одной задачи, между её драйверами. По сети не идёт — всё в памяти одного воркера. Зачем он нужен: внутри задачи бывает нужно перераспределить данные между драйверами. Например, source-драйверы прочитали данные, а дальше внутри той же задачи их надо перегруппировать по ключу между драйверами следующего pipeline-а — это делает локальный exchange, в пределах памяти воркера, без сети.

Локальный и удалённый exchange
Удалённый exchangeМежду стадиями, между задачами на разных воркерах. Идёт по сети. Output buffer плюс exchange client.
другой масштаб
Локальный exchangeВнутри одной задачи, между её драйверами. В памяти одного воркера, без сети.

Разница принципиальна для производительности. Удалённый exchange дорог: данные сериализуются, идут по сети, десериализуются. Локальный exchange дёшев: это перемещение в памяти. Поэтому движок старается делать как можно больше работы локально и минимизировать удалённый обмен — и типы фрагментов (broadcast vs hash) во многом про то, сколько данных уйдёт именно в дорогой удалённый exchange.


Exchange в общей картине

Сведём. Распределённый план — дерево стадий. Между стадиями данные передаёт удалённый exchange: задача-производитель копит Page в ограниченном output buffer, задача-потребитель через exchange client тянет их по сети. Ограниченность буфера даёт backpressure — быстрый производитель тормозится медленным потребителем, память не взрывается, скорости выравниваются по всему дереву. Внутри задачи данные между драйверами перемещает дешёвый локальный exchange без сети.

Exchange — это «кровеносная система» распределённого исполнения. Stage, task, driver, operator — это органы; exchange — то, что переносит между ними данные и регулирует поток. Понимание буферов и backpressure объясняет, почему Trino держит память под контролем, исполняя запрос потоково через весь кластер.


Попробуй сам

Эффекты exchange видны в EXPLAIN ANALYZE и Web UI:

  1. Выполните EXPLAIN (TYPE DISTRIBUTED) для запроса с GROUP BY. Каждый RemoteSource — точка удалённого exchange. Сколько их?
  2. Снимите EXPLAIN ANALYZE того же запроса. Найдите объёмы данных, переданных между фрагментами по сети, — это нагрузка на удалённый exchange.
  3. Сравните два join: маленькой и большой таблицы (broadcast) и двух больших таблиц (hash). Посмотрите, в каком случае по сети через exchange ушло больше данных.
  4. В Web UI на странице запроса поищите статистику буферов / output buffer стадий — признаки backpressure (заполненность буферов).
  5. Сформулируйте письменно, как ограниченный размер output buffer создаёт backpressure, и чем удалённый exchange принципиально дороже локального.

Проверка знанийKnowledge check
Как устроен удалённый exchange через output buffers и exchange clients, что такое backpressure и чем удалённый обмен отличается от локального?
ОтветAnswer
Удалённый exchange передаёт данные между стадиями — между задачами на разных воркерах, по сети. На стороне задачи-производителя есть output buffer — ограниченная по размеру область памяти, куда драйверы складывают готовые Page; производитель работает в своём темпе и просто наполняет буфер, не дожидаясь получателя. На стороне задачи-потребителя есть exchange client — компонент, который активно сам забирает Page из удалённых output buffers задач-производителей по сети и подаёт их в операторы. Модель pull-ориентированная: получатель тянет данные, а не отправитель толкает. Backpressure — это саморегуляция скорости через ограниченный буфер. Если производитель быстрее потребителя, output buffer заполняется до предела; когда буфер полон, производитель не может положить новый Page, и его драйверы приостанавливаются; как только потребитель забирает Page и место освобождается, драйверы продолжают. Никакого отдельного менеджера скоростей нет — сам конечный размер буфера заставляет быстрого производителя притормозить до темпа медленного потребителя, и это распространяется вверх по дереву стадий, так что память под буферы остаётся ограниченной. Локальный exchange — это другой масштаб: передача данных внутри одной задачи, между её драйверами, целиком в памяти одного воркера, без сети. Удалённый exchange дорог — данные сериализуются, идут по сети, десериализуются; локальный дёшев — это перемещение в памяти. Поэтому движок старается делать больше работы локально и минимизировать удалённый обмен.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Как устроена передача данных в удалённом exchange со стороны производителя и потребителя?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 7