Learning Platform
Глоссарий Troubleshooting
Урок 03.04 · 14 мин
Средний
groupByaggcountsumavgminmaxpivotcuberollup

GroupBy и агрегации

groupBy — это wide трансформация, которая всегда вызывает shuffle. Данные с одинаковыми ключами группировки должны оказаться на одном executor для вычисления агрегатов. Понимание этого факта критично для производительности (подробнее о shuffle — см. M01 урок 06).

Визуализация Shuffle
df.filter(col("city") == "Moscow") → Narrow: данные остаются в своих партициях

Исходные партиции

P0

Moscow
SPb
Kazan

P1

SPb
Moscow
Moscow

P2

Kazan
Kazan
SPb

P3

Moscow
SPb
Kazan

Результат (в тех же партициях)

P0

Moscow

P1

Moscow
Moscow

P2

пусто

P3

Moscow
Строк до12
Строк после4
Партиций4 → 4
MoscowSPbKazan

Основы: groupBy + agg

Простая агрегация

from pyspark.sql.functions import count, sum, avg, min, max

df = spark.createDataFrame([
    ("Alice", "Engineering", 75000),
    ("Bob", "Marketing", 55000),
    ("Carol", "Engineering", 80000),
    ("Dave", "Marketing", 60000),
    ("Eve", "Engineering", 70000),
], ["name", "department", "salary"])

# Количество сотрудников по отделам
df.groupBy("department").count().show()
+-----------+-----+
| department|count|
+-----------+-----+
|Engineering|    3|
|  Marketing|    2|
+-----------+-----+
-- Spark SQL эквивалент
SELECT department, COUNT(*) as count
FROM employees
GROUP BY department

Несколько агрегатов через agg()

agg() позволяет вычислить несколько агрегатов за один проход:

from pyspark.sql.functions import count, sum, avg, min, max, round

df.groupBy("department").agg(
    count("*").alias("employee_count"),
    round(avg("salary"), 2).alias("avg_salary"),
    min("salary").alias("min_salary"),
    max("salary").alias("max_salary"),
    sum("salary").alias("total_salary"),
).show()
+-----------+--------------+----------+----------+----------+------------+
| department|employee_count|avg_salary|min_salary|max_salary|total_salary|
+-----------+--------------+----------+----------+----------+------------+
|Engineering|             3|  75000.0 |     70000|     80000|      225000|
|  Marketing|             2|  57500.0 |     55000|     60000|      115000|
+-----------+--------------+----------+----------+----------+------------+
-- Spark SQL эквивалент
SELECT
    department,
    COUNT(*) AS employee_count,
    ROUND(AVG(salary), 2) AS avg_salary,
    MIN(salary) AS min_salary,
    MAX(salary) AS max_salary,
    SUM(salary) AS total_salary
FROM employees
GROUP BY department

countDistinct и collect_list / collect_set

from pyspark.sql.functions import countDistinct, collect_list, collect_set

orders = spark.createDataFrame([
    ("Alice", "laptop", "Electronics"),
    ("Alice", "phone", "Electronics"),
    ("Bob", "laptop", "Electronics"),
    ("Bob", "book", "Books"),
    ("Carol", "phone", "Electronics"),
], ["customer", "product", "category"])

orders.groupBy("customer").agg(
    countDistinct("category").alias("unique_categories"),
    collect_list("product").alias("all_products"),
    collect_set("product").alias("unique_products"),
).show(truncate=False)
+--------+-----------------+------------------+---------------+
|customer|unique_categories|all_products      |unique_products|
+--------+-----------------+------------------+---------------+
|Alice   |1                |[laptop, phone]   |[laptop, phone]|
|Bob     |2                |[laptop, book]    |[laptop, book] |
|Carol   |1                |[phone]           |[phone]        |
+--------+-----------------+------------------+---------------+
WARNING

Анти-паттерн: collect_list() на high-cardinality колонках

collect_list() и collect_set() собирают все значения группы в один массив в памяти одного executor. Для группы с миллионами значений это вызывает OOM:

# ОПАСНО: если у пользователя 10 миллионов событий
events.groupBy("user_id").agg(
    collect_list("event_id").alias("all_events")  # OOM!
)

# БЕЗОПАСНО: ограничьте количество или используйте approx
events.groupBy("user_id").agg(
    count("event_id").alias("event_count"),          # скаляр
    countDistinct("event_type").alias("unique_types") # скаляр
)

Используйте collect_list/collect_set только для групп с десятками-сотнями значений (например, продукты в заказе), а не для миллионов.

HAVING: фильтрация после агрегации

В PySpark фильтрация результатов агрегации — это обычный filter() после groupBy().agg():

df.groupBy("department").agg(
    count("*").alias("emp_count"),
    avg("salary").alias("avg_salary"),
).filter(col("emp_count") > 2).show()
-- Spark SQL эквивалент
SELECT department, COUNT(*) AS emp_count, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
HAVING COUNT(*) > 2
TIP

KnowledgeCheck: Чем отличается filter() до groupBy от filter() после groupBy().agg()?

Ответ: filter() до groupBy фильтрует исходные строки перед агрегацией (аналог WHERE в SQL). Это уменьшает объём данных для shuffle. filter() после groupBy().agg() фильтрует результаты агрегации (аналог HAVING в SQL). Для производительности всегда фильтруйте как можно раньше: df.filter(condition).groupBy().agg() передаёт меньше данных через shuffle, чем df.groupBy().agg().filter(condition).

pivot(): кросс-табуляция

pivot() преобразует значения колонки в отдельные столбцы:

sales = spark.createDataFrame([
    ("Alice", "Q1", 100), ("Alice", "Q2", 150),
    ("Bob", "Q1", 200), ("Bob", "Q2", 120),
    ("Carol", "Q1", 180), ("Carol", "Q2", 160),
], ["name", "quarter", "amount"])

sales.groupBy("name").pivot("quarter").sum("amount").show()
+-----+---+---+
| name| Q1| Q2|
+-----+---+---+
|Alice|100|150|
|  Bob|200|120|
|Carol|180|160|
+-----+---+---+
-- Spark SQL эквивалент
SELECT * FROM sales
PIVOT (SUM(amount) FOR quarter IN ('Q1', 'Q2'))

Оптимизация pivot: Если вы знаете возможные значения, передайте их явно. Без этого Spark делает дополнительный проход по данным для определения уникальных значений:

# БЫСТРЕЕ: явный список значений
sales.groupBy("name").pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]).sum("amount")

# МЕДЛЕННЕЕ: Spark сканирует данные для определения уникальных значений quarter
sales.groupBy("name").pivot("quarter").sum("amount")

cube() и rollup(): OLAP-агрегации

rollup(): иерархическая агрегация

rollup() вычисляет агрегаты для иерархии группировок, от самой детализированной до общего итога:

sales = spark.createDataFrame([
    ("Russia", "Moscow", 1000),
    ("Russia", "SPb", 800),
    ("USA", "NYC", 1200),
    ("USA", "SF", 900),
], ["country", "city", "revenue"])

sales.rollup("country", "city").sum("revenue").orderBy("country", "city").show()
+-------+------+------------+
|country|  city|sum(revenue)|
+-------+------+------------+
|   NULL|  NULL|        3900|  -- общий итог
| Russia|  NULL|        1800|  -- итог по России
| Russia|Moscow|        1000|  -- детализация
| Russia|   SPb|         800|
|    USA|  NULL|        2100|  -- итог по USA
|    USA|   NYC|        1200|
|    USA|    SF|         900|
+-------+------+------------+
-- Spark SQL эквивалент
SELECT country, city, SUM(revenue)
FROM sales
GROUP BY ROLLUP(country, city)

cube(): все комбинации

cube() вычисляет агрегаты для всех возможных комбинаций группировочных колонок:

sales.cube("country", "city").sum("revenue").orderBy("country", "city").show()

В отличие от rollup, cube добавляет агрегаты по каждому отдельному измерению (например, итог по городу Moscow вне зависимости от страны).

-- Spark SQL эквивалент
SELECT country, city, SUM(revenue)
FROM sales
GROUP BY CUBE(country, city)

cube() и rollup() особенно полезны для OLAP-аналитики и построения отчётов с промежуточными итогами. Они заменяют множественные UNION ALL запросов с разными уровнями GROUP BY.

Анти-паттерн: groupBy на skewed ключах

Если данные неравномерно распределены (data skew), одна партиция получит непропорционально много данных:

# ПРОБЛЕМА: 90% заказов имеют country="USA"
# Один executor обрабатывает 90% данных, остальные простаивают
orders.groupBy("country").agg(sum("amount"))

Решение — salting (добавление случайного суффикса к ключу):

from pyspark.sql.functions import col, lit, rand, floor, concat

# Шаг 1: добавляем salt (случайное число 0-9)
salted = orders.withColumn("salt", floor(rand() * 10).cast("string"))
salted = salted.withColumn("salted_key", concat(col("country"), lit("_"), col("salt")))

# Шаг 2: агрегация по salted key (распределяет нагрузку)
partial = salted.groupBy("salted_key").agg(sum("amount").alias("partial_sum"))

# Шаг 3: убираем salt и финальная агрегация
result = partial.withColumn("country", col("salted_key").substr(1, 3))  # упрощённо
result = result.groupBy("country").agg(sum("partial_sum").alias("total"))

Этот паттерн называется two-stage aggregation — первый groupBy распределяет нагрузку равномерно, второй собирает финальный результат. AQE в Spark 3.2+ может автоматически обнаруживать skew в некоторых случаях.

TIP

KnowledgeCheck: Почему groupBy — wide трансформация, а filter — narrow?

Ответ: filter() обрабатывает каждую строку независимо внутри своей партиции: проверяет условие и либо оставляет строку, либо отбрасывает. Данные не перемещаются между executors. groupBy() требует, чтобы все строки с одинаковым ключом оказались на одном executor для вычисления агрегата. Это означает перераспределение данных через сеть (shuffle). Shuffle — самая дорогая операция в Spark (см. M01 урок 06): сериализация, запись на диск, сетевой трансфер, десериализация, merge-сортировка.

Что дальше?

В следующем уроке мы разберём window functions — мощный инструмент для аналитических вычислений, позволяющий вычислять ранги, скользящие средние и кумулятивные суммы без потери детализации строк.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 5. Какой метод позволяет вычислить несколько агрегатов (count, avg, max) за один проход данных через shuffle?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8