Learning Platform
Глоссарий Troubleshooting
Урок 03.08 · 16 мин
Продвинутый
pandas APIpyspark.pandasps.DataFrameMigrationKoalasDistributed pandas

pandas API on Spark

Вы уже знаете pandas — вот как его масштабировать

Если вы работали с данными на Python, вы почти наверняка знаете pandas. Его API интуитивен, документация превосходна, экосистема огромна. Единственная проблема — pandas работает на одной машине. Когда данные перестают помещаться в RAM, pandas бессилен.

pandas API on Spark (модуль pyspark.pandas) позволяет использовать знакомый pandas-синтаксис для распределённой обработки данных. Ваш код выглядит как pandas, но выполняется на кластере Spark:

import pyspark.pandas as ps

# Выглядит как pandas, но обрабатывает терабайты
df = ps.read_parquet("/data/events/2024/")
result = df.groupby("user_id")["revenue"].sum()
result.to_parquet("/output/user_revenue/")

За кулисами pyspark.pandas транслирует pandas-операции в Spark DataFrame API, который проходит через Catalyst optimizer и выполняется распределённо.

Decision Matrix: pandas vs pandas-on-Spark vs PySpark

Выбор API зависит от объёма данных и требований к контролю:

Критерийpandaspandas on Spark (ps)Native PySpark
Размер данных< 1 ГБ (помещается в RAM)1 ГБ — 1 ТБЛюбой (ТБ+)
Среда выполненияОдна машинаКластер SparkКластер Spark
APIpd.DataFrameps.DataFramespark.DataFrame
ЗнакомостьСтандарт индустрии~90% совместимость с pandasСвой API (select, filter, groupBy)
ПроизводительностьМаксимальная для single-nodeХорошая, но overhead трансляцииМаксимальная для распределённых данных
КонтрольПолныйЧастичный (нет гарантий порядка)Полный (партиционирование, broadcast, cache)
ОптимизацияРучная (vectorized NumPy)Catalyst optimizerCatalyst + ручная оптимизация
Когда выбиратьПрототипирование, небольшие данныеМиграция pandas-кода на кластерProduction ETL, сложные пайплайны
TIP

Практическое правило выбора API:

  • Данные помещаются в память ноутбука? -> pandas
  • Данные 1-100 ГБ, и у вас есть pandas-код? -> pyspark.pandas
  • Данные > 100 ГБ, production ETL, нужен полный контроль? -> Native PySpark DataFrame API
  • Прототип на pandas, потом масштабирование? -> Начните с pandas, переключитесь на ps или PySpark

Начало работы: import pyspark.pandas as ps

import pyspark.pandas as ps

# Создание из Python-данных (аналог pd.DataFrame)
df = ps.DataFrame({
    "name": ["Alice", "Bob", "Carol", "Dave"],
    "city": ["Moscow", "SPb", "Moscow", "Kazan"],
    "salary": [80000, 65000, 75000, 70000]
})

# Знакомые pandas-операции
df.head()
df.describe()
df["salary"].mean()
df.groupby("city")["salary"].agg(["mean", "count"])

Совместимость API: что работает, а что нет

pyspark.pandas поддерживает ~90% pandas API, но есть важные отличия:

КатегорияПоддержкаПримеры
ИндексированиеЧастичнаяdf["col"], df.loc[], df.iloc[] — работают; .at[], .iat[] — ограниченно
ГруппировкаПолнаяgroupby(), agg(), transform(), apply()
Merge/JoinПолнаяpd.merge(), df.merge(), df.join()
Pivot/MeltПолнаяdf.pivot_table(), df.melt()
String opsПолнаяdf["col"].str.upper(), .str.contains(), .str.split()
DateTime opsЧастичнаяdf["col"].dt.year, .dt.month — работают; некоторые методы нет
PlottingОграниченнаяdf.plot() работает через matplotlib, но не все типы графиков
MultiIndexНе поддержанИспользуйте reset_index() для работы с flat index
Custom applyОграниченнаяdf.apply(func) работает, но медленнее чем в pandas из-за распределённого выполнения
Проверка знанийKnowledge check
Почему pyspark.pandas не поддерживает MultiIndex?
ОтветAnswer
Spark DataFrame не имеет концепции многоуровневого индекса -- данные хранятся как плоские строки с колонками. pyspark.pandas транслирует pandas-операции в Spark DataFrame API, и MultiIndex не имеет естественного маппинга в эту модель. Workaround: используйте reset_index() для превращения уровней индекса в обычные колонки, или работайте с flat index.

Миграция: от pandas к pyspark.pandas

Шаг 1: Замена импорта

# Было (pandas)
import pandas as pd

df = pd.read_csv("data.csv")

# Стало (pyspark.pandas)
import pyspark.pandas as ps

df = ps.read_csv("data.csv")

Шаг 2: Конвертация между форматами

import pandas as pd
import pyspark.pandas as ps

# pandas -> pyspark.pandas
pandas_df = pd.DataFrame({"a": [1, 2, 3]})
ps_df = ps.from_pandas(pandas_df)

# pyspark.pandas -> pandas (ОСТОРОЖНО с большими данными!)
back_to_pandas = ps_df.to_pandas()

# pyspark.pandas -> PySpark DataFrame
spark_df = ps_df.to_spark()

# PySpark DataFrame -> pyspark.pandas
ps_df_again = ps_df.pandas_api()
# или
ps_df_again = spark_df.to_pandas_on_spark()

Шаг 3: Адаптация кода

# pandas (работает на одной машине)
def process_pandas(df: pd.DataFrame) -> pd.DataFrame:
    df = df[df["amount"] > 0]                    # фильтрация
    df["amount_eur"] = df["amount"] * 0.92        # новая колонка
    summary = df.groupby("category")["amount_eur"].agg(["sum", "mean"])
    return summary.sort_values("sum", ascending=False)

# pyspark.pandas (работает на кластере, тот же синтаксис)
def process_spark_pandas(df: ps.DataFrame) -> ps.DataFrame:
    df = df[df["amount"] > 0]                    # тот же синтаксис!
    df["amount_eur"] = df["amount"] * 0.92
    summary = df.groupby("category")["amount_eur"].agg(["sum", "mean"])
    return summary.sort_values("sum", ascending=False)

Сравнение производительности: одна операция — три API

# Задача: средняя зарплата по городу для сотрудников > 30 лет

# === pandas ===
import pandas as pd

df = pd.read_parquet("employees.parquet")
result = df[df["age"] > 30].groupby("city")["salary"].mean()

# === pyspark.pandas ===
import pyspark.pandas as ps

df = ps.read_parquet("employees.parquet")
result = df[df["age"] > 30].groupby("city")["salary"].mean()

# === Native PySpark ===
from pyspark.sql.functions import col, avg
df = spark.read.parquet("employees.parquet")
result = (
    df.filter(col("age") > 30)
    .groupBy("city")
    .agg(avg("salary").alias("avg_salary"))
)

Код pandas и pyspark.pandas почти идентичен. PySpark API отличается синтаксически, но даёт больше контроля: вы явно видите трансформации, которые проходят через Catalyst.

Ограничения pyspark.pandas

1. Нет гарантии порядка строк

# pandas: порядок строк гарантирован
df = pd.DataFrame({"a": [3, 1, 2]})
print(df["a"].values)  # [3, 1, 2] -- всегда

# pyspark.pandas: порядок НЕ гарантирован
df = ps.DataFrame({"a": [3, 1, 2]})
print(df["a"].to_list())  # может быть [1, 2, 3] или [3, 1, 2]

Данные распределены по партициям — локальный порядок внутри партиции сохраняется, но глобальный порядок не определён без явной сортировки.

2. Нет RangeIndex по умолчанию

# pandas: автоматический RangeIndex (0, 1, 2, ...)
# pyspark.pandas: по умолчанию distributed sequence (медленнее)
ps.set_option("compute.default_index_type", "distributed-sequence")

3. .to_pandas() собирает все данные на driver

# ОПАСНО на больших данных!
big_df = ps.read_parquet("/data/100tb_dataset/")
pandas_df = big_df.to_pandas()  # OOM! 100 ТБ -> память одной машины

Правило: используйте .to_pandas() только после агрегации, когда результат гарантированно мал.

4. Некоторые операции медленнее

Операции, требующие глобального порядка (sort_values, iloc[n], head(n)) в pyspark.pandas медленнее, чем в pandas, так как требуют координации между партициями.

Проверка знанийKnowledge check
Почему вызов .to_pandas() на большом ps.DataFrame опасен?
ОтветAnswer
.to_pandas() собирает все данные со всех executors на driver -- одну машину. Если ps.DataFrame содержит 100 ГБ данных, все 100 ГБ будут перемещены в память driver. Типичный driver имеет 1-8 ГБ RAM -- результат: OutOfMemoryError и крах приложения. Используйте .to_pandas() только после агрегации (groupby().sum()), когда результат содержит тысячи строк, а не миллионы.

История: от Koalas к pyspark.pandas

Проект Koalas был создан Databricks в 2019 году как отдельная библиотека для pandas-совместимого API поверх Spark. В PySpark 3.2 (2021) Koalas был интегрирован в PySpark как модуль pyspark.pandas.

# Устаревший импорт (Koalas как отдельный пакет)
# import databricks.koalas as ks  # НЕ ИСПОЛЬЗУЙТЕ

# Текущий импорт (PySpark 3.2+)
import pyspark.pandas as ps

Если вы встречаете код с import databricks.koalas, замените на import pyspark.pandas as ps — API практически идентичен.

Анти-паттерн: .to_pandas() на больших данных

# ОПАСНО: collect всех данных на driver
big_result = ps_df.groupby("user_id").agg({"revenue": "sum"})
pandas_df = big_result.to_pandas()  # Если 10M пользователей -- OOM

# БЕЗОПАСНО: агрегируйте до малого результата
top_users = (
    ps_df.groupby("user_id")["revenue"]
    .sum()
    .nlargest(100)
    .to_pandas()  # Только 100 строк -- безопасно
)

Что дальше?

Поздравляем! Вы завершили модуль DataFrames + Spark SQL — от создания DataFrame до продвинутых window-функций, UDF и pandas API. Далее вас ждут квизы и практические задания для закрепления материала, а затем мы перейдём к модулю Performance и оптимизация.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Чем ps.DataFrame (pyspark.pandas) отличается от pd.DataFrame (pandas)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 8