Learning Platform
Глоссарий Troubleshooting
Урок 05.02 · 15 мин
Средний
DataFramefilterselectaggregatejoincollitcollectto_pandas

DataFrame API в Python

Python DataFrame API в DataFusion повторяет Rust API, сохраняя ленивую модель выполнения. Каждый метод возвращает новый DataFrame, формируя цепочку трансформаций. Данные обрабатываются только при вызове terminal-операции.

Построение выражений

Основа DataFrame API --- функции col() и lit() из модуля datafusion:

from datafusion import SessionContext, col, lit

ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")

# col --- ссылка на колонку
# lit --- литеральное значение
filtered = df.filter(col("amount") > lit(100))
DataFrame: цепочка трансформаций в Python
ctx.read_parquet(“orders.parquet”)Чтение Parquet-файла — создаёт ленивый DataFrame без загрузки данных
.filter()
DataFrame (LogicalPlan: Filter)Фильтрация — добавляет узел Filter в LogicalPlan
.select()
DataFrame (LogicalPlan: Projection)Проекция — выбор и вычисление колонок в плане
.aggregate()
DataFrame (LogicalPlan: Aggregate)Агрегация — группировка и вычисление итогов (SUM, AVG, COUNT)
.collect()
List[RecordBatch]Список PyArrow RecordBatch — результат выполнения всего плана

Фильтрация: filter()

from datafusion import SessionContext, col, lit

ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")

# Простое условие
completed = df.filter(col("status").eq(lit("completed")))

# Составные условия
large_completed = df.filter(
    col("status").eq(lit("completed")) & (col("amount") > lit(1000))
)

# Python-операторы для сравнения
recent = df.filter(col("amount") >= lit(500))
NOTE

Используйте & (AND) и | (OR) для комбинирования условий. Каждое условие оборачивайте в скобки из-за приоритета операторов Python.

Проекция: select()

# Выбор колонок
subset = df.select(col("region"), col("amount"))

# Вычисляемые колонки
with_tax = df.select(
    col("region"),
    col("amount"),
    (col("amount") * lit(0.2)).alias("tax"),
)

Метод select_columns() принимает строковые имена для простых случаев:

subset = df.select_columns("region", "amount", "status")

Агрегация: aggregate()

from datafusion import functions as f

# Группировка и агрегация
summary = df.aggregate(
    [col("region")],                    # GROUP BY
    [
        f.count(col("order_id")).alias("order_count"),
        f.sum(col("amount")).alias("total_amount"),
        f.avg(col("amount")).alias("avg_amount"),
    ],
)

Доступные агрегатные функции в модуле datafusion.functions:

from datafusion import functions as f

f.count(expr)       # Количество
f.sum(expr)         # Сумма
f.avg(expr)         # Среднее
f.min(expr)         # Минимум
f.max(expr)         # Максимум
f.count_distinct(expr)  # Уникальные значения

Соединения: join()

orders = ctx.read_parquet("data/orders.parquet")
products = ctx.read_parquet("data/products.parquet")

# INNER JOIN
joined = orders.join(
    products,
    join_keys=(["product_id"], ["id"]),
    how="inner",
)

# LEFT JOIN
left_joined = orders.join(
    products,
    join_keys=(["product_id"], ["id"]),
    how="left",
)

Параметр join_keys принимает кортеж из двух списков: колонки левой и правой таблицы. Параметр how поддерживает: inner, left, right, full, semi, anti.

Сортировка и ограничение

# Сортировка по убыванию
sorted_df = df.sort(col("amount").sort(ascending=False))

# Ограничение количества строк
top_10 = df.sort(col("amount").sort(ascending=False)).limit(10)

# Пропуск строк (offset)
page_2 = df.sort(col("amount").sort(ascending=False)).limit(10, offset=10)

Terminal-операции

Terminal-операции запускают выполнение плана и возвращают результат:

collect()

# Возвращает список RecordBatch (PyArrow)
batches = df.collect()

for batch in batches:
    print(f"Rows: {batch.num_rows}, Columns: {batch.num_columns}")

show()

# Печатает результат в табличном формате
df.show()

# С ограничением количества строк
df.show(num=5)

to_pandas()

# Конвертация в pandas DataFrame
pandas_df = df.to_pandas()
print(pandas_df.describe())

to_arrow_table()

# Конвертация в единую PyArrow Table
table = df.to_arrow_table()
print(f"Rows: {table.num_rows}")
print(table.schema)

Метаданные и отладка

schema()

# Схема DataFrame (без выполнения запроса)
schema = df.schema()
for field in schema:
    print(f"{field.name}: {field.type}")

describe()

# Статистика по числовым колонкам
stats = df.describe()
stats.show()
TIP

schema() и describe() не запускают полное вычисление. schema() возвращает схему из LogicalPlan, describe() вычисляет только агрегатные статистики.

Jupyter HTML-рендеринг

В Jupyter Notebook DataFrame автоматически отображается как HTML-таблица:

# В ячейке Jupyter --- просто вернуть DataFrame
df = ctx.sql("SELECT * FROM orders LIMIT 5")
df  # Отобразится как HTML-таблица

Полный пример: аналитический pipeline

from datafusion import SessionContext, col, lit
from datafusion import functions as f

ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")
ctx.register_parquet("products", "data/products.parquet")

# Аналитический pipeline
result = (
    ctx.table("orders")
    .join(
        ctx.table("products"),
        join_keys=(["product_id"], ["id"]),
        how="inner",
    )
    .filter(col("status").eq(lit("completed")))
    .aggregate(
        [col("category")],
        [
            f.count(col("order_id")).alias("orders"),
            f.sum(col("amount")).alias("revenue"),
            f.avg(col("amount")).alias("avg_order"),
        ],
    )
    .sort(col("revenue").sort(ascending=False))
    .limit(5)
)

# Вывод результата
result.show()

# Или конвертация в pandas для дальнейшего анализа
pandas_df = result.to_pandas()

Итоги

  • col() и lit() --- базовые строительные блоки выражений
  • filter(), select(), aggregate(), join(), sort(), limit() --- ленивые трансформации
  • collect(), show(), to_pandas(), to_arrow_table() --- terminal-операции
  • schema(), describe() --- метаданные без полного вычисления
  • Цепочка методов формирует LogicalPlan, исполняемый при terminal-вызове
PyArrow: Python ↔ Arrow мост Decision matrix: pandas / Polars / DuckDB / DataFusion-Python

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие две функции из модуля datafusion являются основой для построения выражений в DataFrame API?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6