Learning Platform
Глоссарий Troubleshooting
Урок 03.02 · 14 мин
Начальный
selectfilterwherewithColumnwithColumnRenameddropNarrow Transformations

Трансформации: select, filter, withColumn

Трансформации — это основа работы с DataFrame. В этом уроке мы разберём операции, которые вы будете использовать в каждом Spark-приложении: выбор колонок, фильтрация строк, добавление и переименование колонок.

Все трансформации в этом уроке — narrow: они не вызывают shuffle и обрабатываются внутри каждой партиции независимо (подробнее о narrow vs wide трансформациях — см. M01 урок 06).

select(): выбор колонок

select() возвращает DataFrame с указанными колонками. Работает аналогично SELECT в SQL.

По имени строкой

df.select("name", "salary").show()
-- Spark SQL эквивалент
SELECT name, salary FROM employees

Через col() выражения

col() позволяет строить выражения:

from pyspark.sql.functions import col

df.select(
    col("name"),
    col("salary"),
    (col("salary") * 0.13).alias("tax")
).show()
-- Spark SQL эквивалент
SELECT name, salary, salary * 0.13 AS tax FROM employees

Через Column объект DataFrame

df.select(df.name, df.salary, (df.salary * 12).alias("annual")).show()
WARNING

Анти-паттерн: df.select("*")

Не используйте select("*") или select(df["*"]) — это загружает все колонки. В таблице с 200 колонками это означает чтение лишних данных с диска (особенно в columnar форматах как Parquet, где column pruning пропускает ненужные колонки). Всегда выбирайте только нужные колонки:

# ПЛОХО: читает все 200 колонок
df.select("*").filter(df.status == "active")

# ХОРОШО: читает только 3 колонки (Parquet column pruning)
df.select("id", "name", "status").filter(df.status == "active")

filter() / where(): фильтрация строк

filter() и where()полностью идентичны. where() существует для разработчиков, привыкших к SQL-синтаксису.

Условие через Column

# Оба варианта идентичны:
df.filter(col("salary") > 60000).show()
df.where(col("salary") > 60000).show()

# Через Column объект DataFrame:
df.filter(df.salary > 60000).show()
-- Spark SQL эквивалент
SELECT * FROM employees WHERE salary > 60000

Составные условия

from pyspark.sql.functions import col

# AND -- оператор &
df.filter(
    (col("salary") > 60000) & (col("department") == "Engineering")
).show()

# OR -- оператор |
df.filter(
    (col("salary") > 80000) | (col("department") == "Marketing")
).show()

# NOT -- оператор ~
df.filter(~col("name").startswith("A")).show()
-- Spark SQL эквивалент
SELECT * FROM employees
WHERE salary > 60000 AND department = 'Engineering'

SELECT * FROM employees
WHERE salary > 80000 OR department = 'Marketing'

SELECT * FROM employees
WHERE name NOT LIKE 'A%'
TIP

KnowledgeCheck: Почему в PySpark используется & вместо and для логического И?

Ответ: В Python and — это оператор для скалярных значений (bool). Column-выражения PySpark — это объекты, а не булевы значения. Оператор & перегружен в классе Column для создания выражений, которые Catalyst optimizer может оптимизировать. Попытка использовать and вызовет ошибку ValueError: The truth value of a Column is ambiguous. Не забывайте скобки: (cond1) & (cond2) — приоритет & выше, чем ==.

Фильтрация с isin, between, like

# isin -- проверка вхождения в список
df.filter(col("department").isin("Engineering", "Marketing")).show()

# between -- диапазон значений
df.filter(col("salary").between(50000, 80000)).show()

# like -- паттерн LIKE из SQL
df.filter(col("name").like("A%")).show()

# isNull / isNotNull
df.filter(col("bonus").isNotNull()).show()

withColumn(): добавление и изменение колонок

withColumn() добавляет новую колонку или заменяет существующую:

from pyspark.sql.functions import col, lit, when

# Добавить колонку
df2 = df.withColumn("tax", col("salary") * 0.13)

# Изменить существующую колонку (тот же синтаксис)
df3 = df2.withColumn("salary", col("salary") * 1.1)  # повышение на 10%

# Условная колонка с when/otherwise
df4 = df.withColumn(
    "level",
    when(col("salary") > 80000, "senior")
    .when(col("salary") > 50000, "middle")
    .otherwise("junior")
)
-- Spark SQL эквивалент
SELECT *,
    salary * 0.13 AS tax,
    CASE
        WHEN salary > 80000 THEN 'senior'
        WHEN salary > 50000 THEN 'middle'
        ELSE 'junior'
    END AS level
FROM employees
WARNING

Анти-паттерн: цепочка withColumn() вместо одного select()

Каждый вызов withColumn() создаёт новый DataFrame с копированием метаданных всех колонок. Для 10+ новых колонок используйте один select():

# ПЛОХО: 5 промежуточных DataFrame
df = df.withColumn("tax", col("salary") * 0.13)
df = df.withColumn("net", col("salary") - col("tax"))
df = df.withColumn("annual", col("salary") * 12)
df = df.withColumn("bonus_pct", col("bonus") / col("salary"))
df = df.withColumn("total", col("salary") + col("bonus"))

# ХОРОШО: один select с вычисленными колонками
df = df.select(
    "*",
    (col("salary") * 0.13).alias("tax"),
    (col("salary") - col("salary") * 0.13).alias("net"),
    (col("salary") * 12).alias("annual"),
    (col("bonus") / col("salary")).alias("bonus_pct"),
    (col("salary") + col("bonus")).alias("total"),
)

Catalyst optimizer может обработать обе формы одинаково, но при десятках колонок цепочка withColumn создаёт глубокий логический план, замедляя анализ.

withColumnRenamed() и drop()

Переименование колонок

df.withColumnRenamed("salary", "base_salary").show()
-- Spark SQL эквивалент
SELECT name, department, salary AS base_salary FROM employees

Удаление колонок

df.drop("department", "bonus").show()

Цепочка трансформаций

Трансформации возвращают новый DataFrame, поэтому их можно объединять в цепочку:

result = (
    df
    .filter(col("salary") > 50000)
    .select("name", "department", "salary")
    .withColumn("tax", col("salary") * 0.13)
    .withColumnRenamed("salary", "base_salary")
    .orderBy(col("base_salary").desc())
)

result.show()
-- Spark SQL эквивалент
SELECT
    name,
    department,
    salary AS base_salary,
    salary * 0.13 AS tax
FROM employees
WHERE salary > 50000
ORDER BY base_salary DESC

Благодаря lazy evaluation, вся цепочка анализируется Catalyst optimizer как единый план, а не выполняется пошагово. Catalyst может переставить операции для оптимальности (например, перенести фильтр ближе к источнику данных).

TIP

KnowledgeCheck: Трансформации select и filter — narrow операции, не вызывающие shuffle (см. M01 урок 06). Почему это важно для производительности?

Ответ: Narrow трансформации обрабатывают данные внутри каждой партиции независимо, без обмена данными между executors. Это означает: (1) нет сетевого трафика, (2) нет записи shuffle-файлов на диск, (3) задачи выполняются параллельно без координации. Wide трансформации (groupBy, join) требуют shuffle — самой дорогой операции в Spark. Поэтому цепочка select -> filter -> withColumn выполняется значительно быстрее, чем groupBy -> join.

Что дальше?

В следующем уроке мы разберём joins — одну из самых критичных и потенциально дорогих операций в Spark. Вы узнаете все типы join (inner, left, outer, cross, semi, anti), и главное — какую стратегию join выберет Spark и как на это повлиять.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какой вариант правильно описывает разницу между select() и selectExpr() в PySpark?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8