DataFrame API в Python
Python DataFrame API в DataFusion повторяет Rust API, сохраняя ленивую модель выполнения. Каждый метод возвращает новый DataFrame, формируя цепочку трансформаций. Данные обрабатываются только при вызове terminal-операции.
Построение выражений
Основа DataFrame API --- функции col() и lit() из модуля datafusion:
from datafusion import SessionContext, col, lit
ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")
# col --- ссылка на колонку
# lit --- литеральное значение
filtered = df.filter(col("amount") > lit(100))
Фильтрация: filter()
from datafusion import SessionContext, col, lit
ctx = SessionContext()
df = ctx.read_parquet("data/orders.parquet")
# Простое условие
completed = df.filter(col("status").eq(lit("completed")))
# Составные условия
large_completed = df.filter(
col("status").eq(lit("completed")) & (col("amount") > lit(1000))
)
# Python-операторы для сравнения
recent = df.filter(col("amount") >= lit(500))
Используйте & (AND) и | (OR) для комбинирования условий. Каждое условие оборачивайте в скобки из-за приоритета операторов Python.
Проекция: select()
# Выбор колонок
subset = df.select(col("region"), col("amount"))
# Вычисляемые колонки
with_tax = df.select(
col("region"),
col("amount"),
(col("amount") * lit(0.2)).alias("tax"),
)
Метод select_columns() принимает строковые имена для простых случаев:
subset = df.select_columns("region", "amount", "status")
Агрегация: aggregate()
from datafusion import functions as f
# Группировка и агрегация
summary = df.aggregate(
[col("region")], # GROUP BY
[
f.count(col("order_id")).alias("order_count"),
f.sum(col("amount")).alias("total_amount"),
f.avg(col("amount")).alias("avg_amount"),
],
)
Доступные агрегатные функции в модуле datafusion.functions:
from datafusion import functions as f
f.count(expr) # Количество
f.sum(expr) # Сумма
f.avg(expr) # Среднее
f.min(expr) # Минимум
f.max(expr) # Максимум
f.count_distinct(expr) # Уникальные значения
Соединения: join()
orders = ctx.read_parquet("data/orders.parquet")
products = ctx.read_parquet("data/products.parquet")
# INNER JOIN
joined = orders.join(
products,
join_keys=(["product_id"], ["id"]),
how="inner",
)
# LEFT JOIN
left_joined = orders.join(
products,
join_keys=(["product_id"], ["id"]),
how="left",
)
Параметр join_keys принимает кортеж из двух списков: колонки левой и правой таблицы. Параметр how поддерживает: inner, left, right, full, semi, anti.
Сортировка и ограничение
# Сортировка по убыванию
sorted_df = df.sort(col("amount").sort(ascending=False))
# Ограничение количества строк
top_10 = df.sort(col("amount").sort(ascending=False)).limit(10)
# Пропуск строк (offset)
page_2 = df.sort(col("amount").sort(ascending=False)).limit(10, offset=10)
Terminal-операции
Terminal-операции запускают выполнение плана и возвращают результат:
collect()
# Возвращает список RecordBatch (PyArrow)
batches = df.collect()
for batch in batches:
print(f"Rows: {batch.num_rows}, Columns: {batch.num_columns}")
show()
# Печатает результат в табличном формате
df.show()
# С ограничением количества строк
df.show(num=5)
to_pandas()
# Конвертация в pandas DataFrame
pandas_df = df.to_pandas()
print(pandas_df.describe())
to_arrow_table()
# Конвертация в единую PyArrow Table
table = df.to_arrow_table()
print(f"Rows: {table.num_rows}")
print(table.schema)
Метаданные и отладка
schema()
# Схема DataFrame (без выполнения запроса)
schema = df.schema()
for field in schema:
print(f"{field.name}: {field.type}")
describe()
# Статистика по числовым колонкам
stats = df.describe()
stats.show()
schema() и describe() не запускают полное вычисление. schema() возвращает схему из LogicalPlan, describe() вычисляет только агрегатные статистики.
Jupyter HTML-рендеринг
В Jupyter Notebook DataFrame автоматически отображается как HTML-таблица:
# В ячейке Jupyter --- просто вернуть DataFrame
df = ctx.sql("SELECT * FROM orders LIMIT 5")
df # Отобразится как HTML-таблица
Полный пример: аналитический pipeline
from datafusion import SessionContext, col, lit
from datafusion import functions as f
ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")
ctx.register_parquet("products", "data/products.parquet")
# Аналитический pipeline
result = (
ctx.table("orders")
.join(
ctx.table("products"),
join_keys=(["product_id"], ["id"]),
how="inner",
)
.filter(col("status").eq(lit("completed")))
.aggregate(
[col("category")],
[
f.count(col("order_id")).alias("orders"),
f.sum(col("amount")).alias("revenue"),
f.avg(col("amount")).alias("avg_order"),
],
)
.sort(col("revenue").sort(ascending=False))
.limit(5)
)
# Вывод результата
result.show()
# Или конвертация в pandas для дальнейшего анализа
pandas_df = result.to_pandas()
Итоги
col()иlit()--- базовые строительные блоки выраженийfilter(),select(),aggregate(),join(),sort(),limit()--- ленивые трансформацииcollect(),show(),to_pandas(),to_arrow_table()--- terminal-операцииschema(),describe()--- метаданные без полного вычисления- Цепочка методов формирует LogicalPlan, исполняемый при terminal-вызове