Window-функции
Зачем нужны window-функции?
Агрегатные функции (groupBy + agg) схлопывают строки — результат содержит по одной строке на группу. Но что если вам нужно вычислить агрегат по группе, сохранив каждую исходную строку? Именно для этого существуют window-функции.
Типичные задачи:
- Пронумеровать строки внутри каждой группы
- Найти зарплату, которая на одну позицию выше в рейтинге
- Посчитать накопительную сумму продаж по месяцам
- Сравнить текущую строку с предыдущей/следующей
Window-функции вычисляют значение по окну строк относительно текущей строки, не уменьшая количество строк в результате.
Window Specification: partitionBy + orderBy
Любая window-функция требует спецификацию окна — набор правил, определяющих:
- partitionBy — по каким колонкам группировать (аналог GROUP BY, но без схлопывания)
- orderBy — в каком порядке обрабатывать строки внутри группы
from pyspark.sql import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col
# Определяем спецификацию окна
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
# Применяем window-функцию
df_ranked = employees.withColumn("rank", rank().over(window_spec))
В SQL эквиваленте это выглядит так:
SELECT *,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees
partitionBy в Window — это не GROUP BY! GROUP BY агрегирует строки в одну на группу. Window.partitionBy определяет рамки, в которых вычисляется функция, но каждая исходная строка остаётся в результате. Количество строк на входе = количеству строк на выходе.
Ранжирующие функции: row_number, rank, dense_rank
Три функции для нумерации строк внутри окна — отличия проявляются при одинаковых значениях (ties):
from pyspark.sql.functions import row_number, rank, dense_rank
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())
result = employees.select(
"name", "department", "salary",
row_number().over(window_spec).alias("row_num"),
rank().over(window_spec).alias("rank"),
dense_rank().over(window_spec).alias("dense_rank")
)
result.show()
+------+----------+------+-------+----+----------+
| name|department|salary|row_num|rank|dense_rank|
+------+----------+------+-------+----+----------+
| Alice| Sales| 80000| 1| 1| 1|
| Bob| Sales| 80000| 2| 1| 1|
| Carol| Sales| 70000| 3| 3| 2|
| Dave| Sales| 60000| 4| 4| 3|
+------+----------+------+-------+----+----------+
| Функция | Поведение при ties | Пропуски |
|---|---|---|
row_number() | Всегда уникальный номер (порядок при ties не определён) | Нет |
rank() | Одинаковые значения получают одинаковый ранг | Да (1, 1, 3, 4) |
dense_rank() | Одинаковые значения получают одинаковый ранг | Нет (1, 1, 2, 3) |
Типичное применение: row_number() для дедупликации (оставить одну строку из группы), rank()/dense_rank() для рейтингов.
Аналитические функции: lead и lag
lead() и lag() позволяют сравнивать текущую строку с соседними внутри окна:
from pyspark.sql.functions import lead, lag
window_by_date = Window.partitionBy("product").orderBy("sale_date")
sales_analysis = sales.select(
"product", "sale_date", "revenue",
lag("revenue", 1).over(window_by_date).alias("prev_revenue"),
lead("revenue", 1).over(window_by_date).alias("next_revenue")
).withColumn(
"growth", col("revenue") - col("prev_revenue")
)
lag("column", N)— значение N строк назад (предыдущие)lead("column", N)— значение N строк вперёд (следующие)- Третий аргумент — значение по умолчанию (если нет предыдущей/следующей строки):
lag("revenue", 1, 0)
Классическое применение: сравнение с предыдущим периодом (month-over-month, week-over-week).
Агрегатные window-функции: running sum, moving average
Стандартные агрегатные функции (sum, avg, count) можно использовать как window-функции для накопительных и скользящих вычислений:
from pyspark.sql.functions import sum as _sum, avg as _avg
# Накопительная сумма (running sum)
window_cumulative = (
Window.partitionBy("department")
.orderBy("month")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df.withColumn("cumulative_revenue", _sum("revenue").over(window_cumulative))
# Скользящее среднее за 3 месяца
window_moving = (
Window.partitionBy("department")
.orderBy("month")
.rowsBetween(-2, Window.currentRow)
)
df.withColumn("moving_avg_3m", _avg("revenue").over(window_moving))
Frame specification: rowsBetween и rangeBetween
По умолчанию при наличии orderBy окно включает строки от начала партиции до текущей строки. Вы можете явно задать рамку окна:
| Метод | Единица измерения | Пример |
|---|---|---|
rowsBetween(start, end) | Физические строки | rowsBetween(-2, 0) — 3 строки |
rangeBetween(start, end) | Диапазон значений | rangeBetween(-1000, 0) — значения в пределах 1000 |
Специальные константы:
Window.unboundedPreceding— от начала партицииWindow.unboundedFollowing— до конца партицииWindow.currentRow— текущая строка (0)
# Все строки партиции (полный агрегат без схлопывания)
Window.partitionBy("dept").rowsBetween(
Window.unboundedPreceding, Window.unboundedFollowing
)
Анти-паттерн: Window без partitionBy
# ОПАСНО: без partitionBy весь DataFrame -- одна партиция!
window_all = Window.orderBy("salary")
df.withColumn("global_rank", rank().over(window_all))
Без partitionBy Spark обрабатывает весь DataFrame как одну партицию. Все данные перемещаются на один executor, что приводит к:
- OOM на executor при больших данных
- Нулевому параллелизму — одна задача обрабатывает всё
Всегда указывайте partitionBy, если только вам действительно не нужен глобальный ранг по всему датасету. Для глобального ранга рассмотрите альтернативы: monotonically_increasing_id() или сэмплирование.
Shuffle при window-операциях: Window операции с partitionBy вызывают shuffle для перегруппировки данных — все строки с одинаковым ключом партиции должны оказаться на одном executor. Это аналогично shuffle при groupBy, но с дополнительной сортировкой внутри каждой партиции по orderBy. Подробнее о механике shuffle — в уроке по архитектуре (M01, Shuffle Deep-Dive).
Что дальше?
В следующем уроке мы изучим Spark SQL и Catalog API — как выполнять SQL-запросы поверх DataFrame, регистрировать временные представления и работать с метаданными через spark.catalog.