Трансформации: select, filter, withColumn
Трансформации — это основа работы с DataFrame. В этом уроке мы разберём операции, которые вы будете использовать в каждом Spark-приложении: выбор колонок, фильтрация строк, добавление и переименование колонок.
Все трансформации в этом уроке — narrow: они не вызывают shuffle и обрабатываются внутри каждой партиции независимо (подробнее о narrow vs wide трансформациях — см. M01 урок 06).
select(): выбор колонок
select() возвращает DataFrame с указанными колонками. Работает аналогично SELECT в SQL.
По имени строкой
df.select("name", "salary").show()
-- Spark SQL эквивалент
SELECT name, salary FROM employees
Через col() выражения
col() позволяет строить выражения:
from pyspark.sql.functions import col
df.select(
col("name"),
col("salary"),
(col("salary") * 0.13).alias("tax")
).show()
-- Spark SQL эквивалент
SELECT name, salary, salary * 0.13 AS tax FROM employees
Через Column объект DataFrame
df.select(df.name, df.salary, (df.salary * 12).alias("annual")).show()
Анти-паттерн: df.select("*")
Не используйте select("*") или select(df["*"]) — это загружает все колонки. В таблице с 200 колонками это означает чтение лишних данных с диска (особенно в columnar форматах как Parquet, где column pruning пропускает ненужные колонки). Всегда выбирайте только нужные колонки:
# ПЛОХО: читает все 200 колонок
df.select("*").filter(df.status == "active")
# ХОРОШО: читает только 3 колонки (Parquet column pruning)
df.select("id", "name", "status").filter(df.status == "active")filter() / where(): фильтрация строк
filter() и where() — полностью идентичны. where() существует для разработчиков, привыкших к SQL-синтаксису.
Условие через Column
# Оба варианта идентичны:
df.filter(col("salary") > 60000).show()
df.where(col("salary") > 60000).show()
# Через Column объект DataFrame:
df.filter(df.salary > 60000).show()
-- Spark SQL эквивалент
SELECT * FROM employees WHERE salary > 60000
Составные условия
from pyspark.sql.functions import col
# AND -- оператор &
df.filter(
(col("salary") > 60000) & (col("department") == "Engineering")
).show()
# OR -- оператор |
df.filter(
(col("salary") > 80000) | (col("department") == "Marketing")
).show()
# NOT -- оператор ~
df.filter(~col("name").startswith("A")).show()
-- Spark SQL эквивалент
SELECT * FROM employees
WHERE salary > 60000 AND department = 'Engineering'
SELECT * FROM employees
WHERE salary > 80000 OR department = 'Marketing'
SELECT * FROM employees
WHERE name NOT LIKE 'A%'
KnowledgeCheck: Почему в PySpark используется & вместо and для логического И?
Ответ: В Python and — это оператор для скалярных значений (bool). Column-выражения PySpark — это объекты, а не булевы значения. Оператор & перегружен в классе Column для создания выражений, которые Catalyst optimizer может оптимизировать. Попытка использовать and вызовет ошибку ValueError: The truth value of a Column is ambiguous. Не забывайте скобки: (cond1) & (cond2) — приоритет & выше, чем ==.
Фильтрация с isin, between, like
# isin -- проверка вхождения в список
df.filter(col("department").isin("Engineering", "Marketing")).show()
# between -- диапазон значений
df.filter(col("salary").between(50000, 80000)).show()
# like -- паттерн LIKE из SQL
df.filter(col("name").like("A%")).show()
# isNull / isNotNull
df.filter(col("bonus").isNotNull()).show()
withColumn(): добавление и изменение колонок
withColumn() добавляет новую колонку или заменяет существующую:
from pyspark.sql.functions import col, lit, when
# Добавить колонку
df2 = df.withColumn("tax", col("salary") * 0.13)
# Изменить существующую колонку (тот же синтаксис)
df3 = df2.withColumn("salary", col("salary") * 1.1) # повышение на 10%
# Условная колонка с when/otherwise
df4 = df.withColumn(
"level",
when(col("salary") > 80000, "senior")
.when(col("salary") > 50000, "middle")
.otherwise("junior")
)
-- Spark SQL эквивалент
SELECT *,
salary * 0.13 AS tax,
CASE
WHEN salary > 80000 THEN 'senior'
WHEN salary > 50000 THEN 'middle'
ELSE 'junior'
END AS level
FROM employees
Анти-паттерн: цепочка withColumn() вместо одного select()
Каждый вызов withColumn() создаёт новый DataFrame с копированием метаданных всех колонок. Для 10+ новых колонок используйте один select():
# ПЛОХО: 5 промежуточных DataFrame
df = df.withColumn("tax", col("salary") * 0.13)
df = df.withColumn("net", col("salary") - col("tax"))
df = df.withColumn("annual", col("salary") * 12)
df = df.withColumn("bonus_pct", col("bonus") / col("salary"))
df = df.withColumn("total", col("salary") + col("bonus"))
# ХОРОШО: один select с вычисленными колонками
df = df.select(
"*",
(col("salary") * 0.13).alias("tax"),
(col("salary") - col("salary") * 0.13).alias("net"),
(col("salary") * 12).alias("annual"),
(col("bonus") / col("salary")).alias("bonus_pct"),
(col("salary") + col("bonus")).alias("total"),
)Catalyst optimizer может обработать обе формы одинаково, но при десятках колонок цепочка withColumn создаёт глубокий логический план, замедляя анализ.
withColumnRenamed() и drop()
Переименование колонок
df.withColumnRenamed("salary", "base_salary").show()
-- Spark SQL эквивалент
SELECT name, department, salary AS base_salary FROM employees
Удаление колонок
df.drop("department", "bonus").show()
Цепочка трансформаций
Трансформации возвращают новый DataFrame, поэтому их можно объединять в цепочку:
result = (
df
.filter(col("salary") > 50000)
.select("name", "department", "salary")
.withColumn("tax", col("salary") * 0.13)
.withColumnRenamed("salary", "base_salary")
.orderBy(col("base_salary").desc())
)
result.show()
-- Spark SQL эквивалент
SELECT
name,
department,
salary AS base_salary,
salary * 0.13 AS tax
FROM employees
WHERE salary > 50000
ORDER BY base_salary DESC
Благодаря lazy evaluation, вся цепочка анализируется Catalyst optimizer как единый план, а не выполняется пошагово. Catalyst может переставить операции для оптимальности (например, перенести фильтр ближе к источнику данных).
KnowledgeCheck: Трансформации select и filter — narrow операции, не вызывающие shuffle (см. M01 урок 06). Почему это важно для производительности?
Ответ: Narrow трансформации обрабатывают данные внутри каждой партиции независимо, без обмена данными между executors. Это означает: (1) нет сетевого трафика, (2) нет записи shuffle-файлов на диск, (3) задачи выполняются параллельно без координации. Wide трансформации (groupBy, join) требуют shuffle — самой дорогой операции в Spark. Поэтому цепочка select -> filter -> withColumn выполняется значительно быстрее, чем groupBy -> join.
Что дальше?
В следующем уроке мы разберём joins — одну из самых критичных и потенциально дорогих операций в Spark. Вы узнаете все типы join (inner, left, outer, cross, semi, anti), и главное — какую стратегию join выберет Spark и как на это повлиять.