pandas API on Spark
Вы уже знаете pandas — вот как его масштабировать
Если вы работали с данными на Python, вы почти наверняка знаете pandas. Его API интуитивен, документация превосходна, экосистема огромна. Единственная проблема — pandas работает на одной машине. Когда данные перестают помещаться в RAM, pandas бессилен.
pandas API on Spark (модуль pyspark.pandas) позволяет использовать знакомый pandas-синтаксис для распределённой обработки данных. Ваш код выглядит как pandas, но выполняется на кластере Spark:
import pyspark.pandas as ps
# Выглядит как pandas, но обрабатывает терабайты
df = ps.read_parquet("/data/events/2024/")
result = df.groupby("user_id")["revenue"].sum()
result.to_parquet("/output/user_revenue/")
За кулисами pyspark.pandas транслирует pandas-операции в Spark DataFrame API, который проходит через Catalyst optimizer и выполняется распределённо.
Decision Matrix: pandas vs pandas-on-Spark vs PySpark
Выбор API зависит от объёма данных и требований к контролю:
| Критерий | pandas | pandas on Spark (ps) | Native PySpark |
|---|---|---|---|
| Размер данных | < 1 ГБ (помещается в RAM) | 1 ГБ — 1 ТБ | Любой (ТБ+) |
| Среда выполнения | Одна машина | Кластер Spark | Кластер Spark |
| API | pd.DataFrame | ps.DataFrame | spark.DataFrame |
| Знакомость | Стандарт индустрии | ~90% совместимость с pandas | Свой API (select, filter, groupBy) |
| Производительность | Максимальная для single-node | Хорошая, но overhead трансляции | Максимальная для распределённых данных |
| Контроль | Полный | Частичный (нет гарантий порядка) | Полный (партиционирование, broadcast, cache) |
| Оптимизация | Ручная (vectorized NumPy) | Catalyst optimizer | Catalyst + ручная оптимизация |
| Когда выбирать | Прототипирование, небольшие данные | Миграция pandas-кода на кластер | Production ETL, сложные пайплайны |
Практическое правило выбора API:
- Данные помещаются в память ноутбука? -> pandas
- Данные 1-100 ГБ, и у вас есть pandas-код? ->
pyspark.pandas - Данные > 100 ГБ, production ETL, нужен полный контроль? -> Native PySpark DataFrame API
- Прототип на pandas, потом масштабирование? -> Начните с pandas, переключитесь на
psили PySpark
Начало работы: import pyspark.pandas as ps
import pyspark.pandas as ps
# Создание из Python-данных (аналог pd.DataFrame)
df = ps.DataFrame({
"name": ["Alice", "Bob", "Carol", "Dave"],
"city": ["Moscow", "SPb", "Moscow", "Kazan"],
"salary": [80000, 65000, 75000, 70000]
})
# Знакомые pandas-операции
df.head()
df.describe()
df["salary"].mean()
df.groupby("city")["salary"].agg(["mean", "count"])
Совместимость API: что работает, а что нет
pyspark.pandas поддерживает ~90% pandas API, но есть важные отличия:
| Категория | Поддержка | Примеры |
|---|---|---|
| Индексирование | Частичная | df["col"], df.loc[], df.iloc[] — работают; .at[], .iat[] — ограниченно |
| Группировка | Полная | groupby(), agg(), transform(), apply() |
| Merge/Join | Полная | pd.merge(), df.merge(), df.join() |
| Pivot/Melt | Полная | df.pivot_table(), df.melt() |
| String ops | Полная | df["col"].str.upper(), .str.contains(), .str.split() |
| DateTime ops | Частичная | df["col"].dt.year, .dt.month — работают; некоторые методы нет |
| Plotting | Ограниченная | df.plot() работает через matplotlib, но не все типы графиков |
| MultiIndex | Не поддержан | Используйте reset_index() для работы с flat index |
| Custom apply | Ограниченная | df.apply(func) работает, но медленнее чем в pandas из-за распределённого выполнения |
Миграция: от pandas к pyspark.pandas
Шаг 1: Замена импорта
# Было (pandas)
import pandas as pd
df = pd.read_csv("data.csv")
# Стало (pyspark.pandas)
import pyspark.pandas as ps
df = ps.read_csv("data.csv")
Шаг 2: Конвертация между форматами
import pandas as pd
import pyspark.pandas as ps
# pandas -> pyspark.pandas
pandas_df = pd.DataFrame({"a": [1, 2, 3]})
ps_df = ps.from_pandas(pandas_df)
# pyspark.pandas -> pandas (ОСТОРОЖНО с большими данными!)
back_to_pandas = ps_df.to_pandas()
# pyspark.pandas -> PySpark DataFrame
spark_df = ps_df.to_spark()
# PySpark DataFrame -> pyspark.pandas
ps_df_again = ps_df.pandas_api()
# или
ps_df_again = spark_df.to_pandas_on_spark()
Шаг 3: Адаптация кода
# pandas (работает на одной машине)
def process_pandas(df: pd.DataFrame) -> pd.DataFrame:
df = df[df["amount"] > 0] # фильтрация
df["amount_eur"] = df["amount"] * 0.92 # новая колонка
summary = df.groupby("category")["amount_eur"].agg(["sum", "mean"])
return summary.sort_values("sum", ascending=False)
# pyspark.pandas (работает на кластере, тот же синтаксис)
def process_spark_pandas(df: ps.DataFrame) -> ps.DataFrame:
df = df[df["amount"] > 0] # тот же синтаксис!
df["amount_eur"] = df["amount"] * 0.92
summary = df.groupby("category")["amount_eur"].agg(["sum", "mean"])
return summary.sort_values("sum", ascending=False)
Сравнение производительности: одна операция — три API
# Задача: средняя зарплата по городу для сотрудников > 30 лет
# === pandas ===
import pandas as pd
df = pd.read_parquet("employees.parquet")
result = df[df["age"] > 30].groupby("city")["salary"].mean()
# === pyspark.pandas ===
import pyspark.pandas as ps
df = ps.read_parquet("employees.parquet")
result = df[df["age"] > 30].groupby("city")["salary"].mean()
# === Native PySpark ===
from pyspark.sql.functions import col, avg
df = spark.read.parquet("employees.parquet")
result = (
df.filter(col("age") > 30)
.groupBy("city")
.agg(avg("salary").alias("avg_salary"))
)
Код pandas и pyspark.pandas почти идентичен. PySpark API отличается синтаксически, но даёт больше контроля: вы явно видите трансформации, которые проходят через Catalyst.
Ограничения pyspark.pandas
1. Нет гарантии порядка строк
# pandas: порядок строк гарантирован
df = pd.DataFrame({"a": [3, 1, 2]})
print(df["a"].values) # [3, 1, 2] -- всегда
# pyspark.pandas: порядок НЕ гарантирован
df = ps.DataFrame({"a": [3, 1, 2]})
print(df["a"].to_list()) # может быть [1, 2, 3] или [3, 1, 2]
Данные распределены по партициям — локальный порядок внутри партиции сохраняется, но глобальный порядок не определён без явной сортировки.
2. Нет RangeIndex по умолчанию
# pandas: автоматический RangeIndex (0, 1, 2, ...)
# pyspark.pandas: по умолчанию distributed sequence (медленнее)
ps.set_option("compute.default_index_type", "distributed-sequence")
3. .to_pandas() собирает все данные на driver
# ОПАСНО на больших данных!
big_df = ps.read_parquet("/data/100tb_dataset/")
pandas_df = big_df.to_pandas() # OOM! 100 ТБ -> память одной машины
Правило: используйте .to_pandas() только после агрегации, когда результат гарантированно мал.
4. Некоторые операции медленнее
Операции, требующие глобального порядка (sort_values, iloc[n], head(n)) в pyspark.pandas медленнее, чем в pandas, так как требуют координации между партициями.
История: от Koalas к pyspark.pandas
Проект Koalas был создан Databricks в 2019 году как отдельная библиотека для pandas-совместимого API поверх Spark. В PySpark 3.2 (2021) Koalas был интегрирован в PySpark как модуль pyspark.pandas.
# Устаревший импорт (Koalas как отдельный пакет)
# import databricks.koalas as ks # НЕ ИСПОЛЬЗУЙТЕ
# Текущий импорт (PySpark 3.2+)
import pyspark.pandas as ps
Если вы встречаете код с import databricks.koalas, замените на import pyspark.pandas as ps — API практически идентичен.
Анти-паттерн: .to_pandas() на больших данных
# ОПАСНО: collect всех данных на driver
big_result = ps_df.groupby("user_id").agg({"revenue": "sum"})
pandas_df = big_result.to_pandas() # Если 10M пользователей -- OOM
# БЕЗОПАСНО: агрегируйте до малого результата
top_users = (
ps_df.groupby("user_id")["revenue"]
.sum()
.nlargest(100)
.to_pandas() # Только 100 строк -- безопасно
)
Что дальше?
Поздравляем! Вы завершили модуль DataFrames + Spark SQL — от создания DataFrame до продвинутых window-функций, UDF и pandas API. Далее вас ждут квизы и практические задания для закрепления материала, а затем мы перейдём к модулю Performance и оптимизация.