Learning Platform
Глоссарий Troubleshooting
Урок 03.05 · 16 мин
Средний
Windowrow_numberrankdense_rankleadlagpartitionByorderByCumulative

Window-функции

Зачем нужны window-функции?

Агрегатные функции (groupBy + agg) схлопывают строки — результат содержит по одной строке на группу. Но что если вам нужно вычислить агрегат по группе, сохранив каждую исходную строку? Именно для этого существуют window-функции.

Типичные задачи:

  • Пронумеровать строки внутри каждой группы
  • Найти зарплату, которая на одну позицию выше в рейтинге
  • Посчитать накопительную сумму продаж по месяцам
  • Сравнить текущую строку с предыдущей/следующей

Window-функции вычисляют значение по окну строк относительно текущей строки, не уменьшая количество строк в результате.

Window Specification: partitionBy + orderBy

Любая window-функция требует спецификацию окна — набор правил, определяющих:

  1. partitionBy — по каким колонкам группировать (аналог GROUP BY, но без схлопывания)
  2. orderBy — в каком порядке обрабатывать строки внутри группы
from pyspark.sql import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

# Определяем спецификацию окна
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

# Применяем window-функцию
df_ranked = employees.withColumn("rank", rank().over(window_spec))

В SQL эквиваленте это выглядит так:

SELECT *,
  RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees
TIP

partitionBy в Window — это не GROUP BY! GROUP BY агрегирует строки в одну на группу. Window.partitionBy определяет рамки, в которых вычисляется функция, но каждая исходная строка остаётся в результате. Количество строк на входе = количеству строк на выходе.

Ранжирующие функции: row_number, rank, dense_rank

Три функции для нумерации строк внутри окна — отличия проявляются при одинаковых значениях (ties):

from pyspark.sql.functions import row_number, rank, dense_rank

window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

result = employees.select(
    "name", "department", "salary",
    row_number().over(window_spec).alias("row_num"),
    rank().over(window_spec).alias("rank"),
    dense_rank().over(window_spec).alias("dense_rank")
)
result.show()
+------+----------+------+-------+----+----------+
|  name|department|salary|row_num|rank|dense_rank|
+------+----------+------+-------+----+----------+
| Alice|     Sales| 80000|      1|   1|         1|
|   Bob|     Sales| 80000|      2|   1|         1|
| Carol|     Sales| 70000|      3|   3|         2|
|  Dave|     Sales| 60000|      4|   4|         3|
+------+----------+------+-------+----+----------+
ФункцияПоведение при tiesПропуски
row_number()Всегда уникальный номер (порядок при ties не определён)Нет
rank()Одинаковые значения получают одинаковый рангДа (1, 1, 3, 4)
dense_rank()Одинаковые значения получают одинаковый рангНет (1, 1, 2, 3)

Типичное применение: row_number() для дедупликации (оставить одну строку из группы), rank()/dense_rank() для рейтингов.

Проверка знанийKnowledge check
В чём разница между rank() и dense_rank() при наличии одинаковых значений?
ОтветAnswer
rank() присваивает одинаковый ранг строкам с равными значениями, но пропускает следующие номера -- например, 1, 1, 3 (пропущен 2). dense_rank() также даёт одинаковый ранг при ties, но не пропускает номера -- 1, 1, 2, 3. row_number() всегда возвращает уникальный номер, даже при ties (порядок между ties не гарантирован).

Аналитические функции: lead и lag

lead() и lag() позволяют сравнивать текущую строку с соседними внутри окна:

from pyspark.sql.functions import lead, lag

window_by_date = Window.partitionBy("product").orderBy("sale_date")

sales_analysis = sales.select(
    "product", "sale_date", "revenue",
    lag("revenue", 1).over(window_by_date).alias("prev_revenue"),
    lead("revenue", 1).over(window_by_date).alias("next_revenue")
).withColumn(
    "growth", col("revenue") - col("prev_revenue")
)
  • lag("column", N) — значение N строк назад (предыдущие)
  • lead("column", N) — значение N строк вперёд (следующие)
  • Третий аргумент — значение по умолчанию (если нет предыдущей/следующей строки): lag("revenue", 1, 0)

Классическое применение: сравнение с предыдущим периодом (month-over-month, week-over-week).

Агрегатные window-функции: running sum, moving average

Стандартные агрегатные функции (sum, avg, count) можно использовать как window-функции для накопительных и скользящих вычислений:

from pyspark.sql.functions import sum as _sum, avg as _avg

# Накопительная сумма (running sum)
window_cumulative = (
    Window.partitionBy("department")
    .orderBy("month")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("cumulative_revenue", _sum("revenue").over(window_cumulative))

# Скользящее среднее за 3 месяца
window_moving = (
    Window.partitionBy("department")
    .orderBy("month")
    .rowsBetween(-2, Window.currentRow)
)

df.withColumn("moving_avg_3m", _avg("revenue").over(window_moving))

Frame specification: rowsBetween и rangeBetween

По умолчанию при наличии orderBy окно включает строки от начала партиции до текущей строки. Вы можете явно задать рамку окна:

МетодЕдиница измеренияПример
rowsBetween(start, end)Физические строкиrowsBetween(-2, 0) — 3 строки
rangeBetween(start, end)Диапазон значенийrangeBetween(-1000, 0) — значения в пределах 1000

Специальные константы:

  • Window.unboundedPreceding — от начала партиции
  • Window.unboundedFollowing — до конца партиции
  • Window.currentRow — текущая строка (0)
# Все строки партиции (полный агрегат без схлопывания)
Window.partitionBy("dept").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)

Анти-паттерн: Window без partitionBy

# ОПАСНО: без partitionBy весь DataFrame -- одна партиция!
window_all = Window.orderBy("salary")
df.withColumn("global_rank", rank().over(window_all))

Без partitionBy Spark обрабатывает весь DataFrame как одну партицию. Все данные перемещаются на один executor, что приводит к:

  • OOM на executor при больших данных
  • Нулевому параллелизму — одна задача обрабатывает всё

Всегда указывайте partitionBy, если только вам действительно не нужен глобальный ранг по всему датасету. Для глобального ранга рассмотрите альтернативы: monotonically_increasing_id() или сэмплирование.

WARNING

Shuffle при window-операциях: Window операции с partitionBy вызывают shuffle для перегруппировки данных — все строки с одинаковым ключом партиции должны оказаться на одном executor. Это аналогично shuffle при groupBy, но с дополнительной сортировкой внутри каждой партиции по orderBy. Подробнее о механике shuffle — в уроке по архитектуре (M01, Shuffle Deep-Dive).

Проверка знанийKnowledge check
Почему использование Window.orderBy() без partitionBy() на большом DataFrame приводит к проблемам производительности?
ОтветAnswer
Без partitionBy() Spark рассматривает весь DataFrame как одну гигантскую партицию. Все данные перемещаются на один executor, что убивает параллелизм (одна задача вместо сотен) и может привести к OutOfMemoryError, если данные не помещаются в память одного executor. Всегда используйте partitionBy() для ограничения размера окна.

Что дальше?

В следующем уроке мы изучим Spark SQL и Catalog API — как выполнять SQL-запросы поверх DataFrame, регистрировать временные представления и работать с метаданными через spark.catalog.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. В чём ключевое отличие window-функций от groupBy + agg?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8