GroupBy и агрегации
groupBy — это wide трансформация, которая всегда вызывает shuffle. Данные с одинаковыми ключами группировки должны оказаться на одном executor для вычисления агрегатов. Понимание этого факта критично для производительности (подробнее о shuffle — см. M01 урок 06).
Исходные партиции
P0
P1
P2
P3
Результат (в тех же партициях)
P0
P1
P2
P3
Основы: groupBy + agg
Простая агрегация
from pyspark.sql.functions import count, sum, avg, min, max
df = spark.createDataFrame([
("Alice", "Engineering", 75000),
("Bob", "Marketing", 55000),
("Carol", "Engineering", 80000),
("Dave", "Marketing", 60000),
("Eve", "Engineering", 70000),
], ["name", "department", "salary"])
# Количество сотрудников по отделам
df.groupBy("department").count().show()
+-----------+-----+
| department|count|
+-----------+-----+
|Engineering| 3|
| Marketing| 2|
+-----------+-----+
-- Spark SQL эквивалент
SELECT department, COUNT(*) as count
FROM employees
GROUP BY department
Несколько агрегатов через agg()
agg() позволяет вычислить несколько агрегатов за один проход:
from pyspark.sql.functions import count, sum, avg, min, max, round
df.groupBy("department").agg(
count("*").alias("employee_count"),
round(avg("salary"), 2).alias("avg_salary"),
min("salary").alias("min_salary"),
max("salary").alias("max_salary"),
sum("salary").alias("total_salary"),
).show()
+-----------+--------------+----------+----------+----------+------------+
| department|employee_count|avg_salary|min_salary|max_salary|total_salary|
+-----------+--------------+----------+----------+----------+------------+
|Engineering| 3| 75000.0 | 70000| 80000| 225000|
| Marketing| 2| 57500.0 | 55000| 60000| 115000|
+-----------+--------------+----------+----------+----------+------------+
-- Spark SQL эквивалент
SELECT
department,
COUNT(*) AS employee_count,
ROUND(AVG(salary), 2) AS avg_salary,
MIN(salary) AS min_salary,
MAX(salary) AS max_salary,
SUM(salary) AS total_salary
FROM employees
GROUP BY department
countDistinct и collect_list / collect_set
from pyspark.sql.functions import countDistinct, collect_list, collect_set
orders = spark.createDataFrame([
("Alice", "laptop", "Electronics"),
("Alice", "phone", "Electronics"),
("Bob", "laptop", "Electronics"),
("Bob", "book", "Books"),
("Carol", "phone", "Electronics"),
], ["customer", "product", "category"])
orders.groupBy("customer").agg(
countDistinct("category").alias("unique_categories"),
collect_list("product").alias("all_products"),
collect_set("product").alias("unique_products"),
).show(truncate=False)
+--------+-----------------+------------------+---------------+
|customer|unique_categories|all_products |unique_products|
+--------+-----------------+------------------+---------------+
|Alice |1 |[laptop, phone] |[laptop, phone]|
|Bob |2 |[laptop, book] |[laptop, book] |
|Carol |1 |[phone] |[phone] |
+--------+-----------------+------------------+---------------+
Анти-паттерн: collect_list() на high-cardinality колонках
collect_list() и collect_set() собирают все значения группы в один массив в памяти одного executor. Для группы с миллионами значений это вызывает OOM:
# ОПАСНО: если у пользователя 10 миллионов событий
events.groupBy("user_id").agg(
collect_list("event_id").alias("all_events") # OOM!
)
# БЕЗОПАСНО: ограничьте количество или используйте approx
events.groupBy("user_id").agg(
count("event_id").alias("event_count"), # скаляр
countDistinct("event_type").alias("unique_types") # скаляр
)Используйте collect_list/collect_set только для групп с десятками-сотнями значений (например, продукты в заказе), а не для миллионов.
HAVING: фильтрация после агрегации
В PySpark фильтрация результатов агрегации — это обычный filter() после groupBy().agg():
df.groupBy("department").agg(
count("*").alias("emp_count"),
avg("salary").alias("avg_salary"),
).filter(col("emp_count") > 2).show()
-- Spark SQL эквивалент
SELECT department, COUNT(*) AS emp_count, AVG(salary) AS avg_salary
FROM employees
GROUP BY department
HAVING COUNT(*) > 2
KnowledgeCheck: Чем отличается filter() до groupBy от filter() после groupBy().agg()?
Ответ: filter() до groupBy фильтрует исходные строки перед агрегацией (аналог WHERE в SQL). Это уменьшает объём данных для shuffle. filter() после groupBy().agg() фильтрует результаты агрегации (аналог HAVING в SQL). Для производительности всегда фильтруйте как можно раньше: df.filter(condition).groupBy().agg() передаёт меньше данных через shuffle, чем df.groupBy().agg().filter(condition).
pivot(): кросс-табуляция
pivot() преобразует значения колонки в отдельные столбцы:
sales = spark.createDataFrame([
("Alice", "Q1", 100), ("Alice", "Q2", 150),
("Bob", "Q1", 200), ("Bob", "Q2", 120),
("Carol", "Q1", 180), ("Carol", "Q2", 160),
], ["name", "quarter", "amount"])
sales.groupBy("name").pivot("quarter").sum("amount").show()
+-----+---+---+
| name| Q1| Q2|
+-----+---+---+
|Alice|100|150|
| Bob|200|120|
|Carol|180|160|
+-----+---+---+
-- Spark SQL эквивалент
SELECT * FROM sales
PIVOT (SUM(amount) FOR quarter IN ('Q1', 'Q2'))
Оптимизация pivot: Если вы знаете возможные значения, передайте их явно. Без этого Spark делает дополнительный проход по данным для определения уникальных значений:
# БЫСТРЕЕ: явный список значений
sales.groupBy("name").pivot("quarter", ["Q1", "Q2", "Q3", "Q4"]).sum("amount")
# МЕДЛЕННЕЕ: Spark сканирует данные для определения уникальных значений quarter
sales.groupBy("name").pivot("quarter").sum("amount")
cube() и rollup(): OLAP-агрегации
rollup(): иерархическая агрегация
rollup() вычисляет агрегаты для иерархии группировок, от самой детализированной до общего итога:
sales = spark.createDataFrame([
("Russia", "Moscow", 1000),
("Russia", "SPb", 800),
("USA", "NYC", 1200),
("USA", "SF", 900),
], ["country", "city", "revenue"])
sales.rollup("country", "city").sum("revenue").orderBy("country", "city").show()
+-------+------+------------+
|country| city|sum(revenue)|
+-------+------+------------+
| NULL| NULL| 3900| -- общий итог
| Russia| NULL| 1800| -- итог по России
| Russia|Moscow| 1000| -- детализация
| Russia| SPb| 800|
| USA| NULL| 2100| -- итог по USA
| USA| NYC| 1200|
| USA| SF| 900|
+-------+------+------------+
-- Spark SQL эквивалент
SELECT country, city, SUM(revenue)
FROM sales
GROUP BY ROLLUP(country, city)
cube(): все комбинации
cube() вычисляет агрегаты для всех возможных комбинаций группировочных колонок:
sales.cube("country", "city").sum("revenue").orderBy("country", "city").show()
В отличие от rollup, cube добавляет агрегаты по каждому отдельному измерению (например, итог по городу Moscow вне зависимости от страны).
-- Spark SQL эквивалент
SELECT country, city, SUM(revenue)
FROM sales
GROUP BY CUBE(country, city)
cube() и rollup() особенно полезны для OLAP-аналитики и построения отчётов с промежуточными итогами. Они заменяют множественные UNION ALL запросов с разными уровнями GROUP BY.
Анти-паттерн: groupBy на skewed ключах
Если данные неравномерно распределены (data skew), одна партиция получит непропорционально много данных:
# ПРОБЛЕМА: 90% заказов имеют country="USA"
# Один executor обрабатывает 90% данных, остальные простаивают
orders.groupBy("country").agg(sum("amount"))
Решение — salting (добавление случайного суффикса к ключу):
from pyspark.sql.functions import col, lit, rand, floor, concat
# Шаг 1: добавляем salt (случайное число 0-9)
salted = orders.withColumn("salt", floor(rand() * 10).cast("string"))
salted = salted.withColumn("salted_key", concat(col("country"), lit("_"), col("salt")))
# Шаг 2: агрегация по salted key (распределяет нагрузку)
partial = salted.groupBy("salted_key").agg(sum("amount").alias("partial_sum"))
# Шаг 3: убираем salt и финальная агрегация
result = partial.withColumn("country", col("salted_key").substr(1, 3)) # упрощённо
result = result.groupBy("country").agg(sum("partial_sum").alias("total"))
Этот паттерн называется two-stage aggregation — первый groupBy распределяет нагрузку равномерно, второй собирает финальный результат. AQE в Spark 3.2+ может автоматически обнаруживать skew в некоторых случаях.
KnowledgeCheck: Почему groupBy — wide трансформация, а filter — narrow?
Ответ: filter() обрабатывает каждую строку независимо внутри своей партиции: проверяет условие и либо оставляет строку, либо отбрасывает. Данные не перемещаются между executors. groupBy() требует, чтобы все строки с одинаковым ключом оказались на одном executor для вычисления агрегата. Это означает перераспределение данных через сеть (shuffle). Shuffle — самая дорогая операция в Spark (см. M01 урок 06): сериализация, запись на диск, сетевой трансфер, десериализация, merge-сортировка.
Что дальше?
В следующем уроке мы разберём window functions — мощный инструмент для аналитических вычислений, позволяющий вычислять ранги, скользящие средние и кумулятивные суммы без потери детализации строк.