SQL через Python
DataFusion Python предоставляет полноценный SQL-интерфейс через метод ctx.sql(). Вы пишете SQL-запросы как строки и получаете DataFrame, который можно дальше обрабатывать через DataFrame API.
Выполнение SQL
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_csv("orders", "data/orders.csv")
# SQL-запрос возвращает DataFrame
df = ctx.sql("SELECT region, SUM(amount) as total FROM orders GROUP BY region")
df.show()
Метод ctx.sql() возвращает не результат, а DataFrame. Это значит, что запрос не выполняется до вызова terminal-операции:
# Запрос НЕ выполняется --- создаётся только план
df = ctx.sql("SELECT * FROM orders WHERE amount > 1000")
# Запрос выполняется здесь
df.show()
Регистрация таблиц
Перед использованием SQL таблицы должны быть зарегистрированы в каталоге SessionContext:
Из файлов
ctx = SessionContext()
# CSV
ctx.register_csv("orders", "data/orders.csv")
# Parquet
ctx.register_parquet("events", "data/events.parquet")
# JSON (NDJSON)
ctx.register_json("logs", "data/logs.json")
Из существующего DataFrame
# Создаём DataFrame из Python-данных
data = {"name": ["Alice", "Bob"], "score": [95, 82]}
df = ctx.from_pydict(data)
# Регистрируем как таблицу для SQL
ctx.register_record_batches("students", [df.collect()])
df_sql = ctx.sql("SELECT * FROM students WHERE score > 90")
df_sql.show()
Из PyArrow Table
import pyarrow as pa
table = pa.table({
"city": ["Moscow", "Berlin", "Tokyo"],
"population": [12_600_000, 3_700_000, 14_000_000],
})
ctx.register_record_batches("cities", [table.to_batches()])
df = ctx.sql("SELECT city FROM cities WHERE population > 10000000")
df.show()
INFORMATION_SCHEMA
DataFusion поддерживает INFORMATION_SCHEMA для инспекции зарегистрированных таблиц:
ctx = SessionContext()
ctx.register_csv("orders", "data/orders.csv")
ctx.register_parquet("products", "data/products.parquet")
# Список таблиц
tables = ctx.sql("""
SELECT table_catalog, table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'public'
""")
tables.show()
# Колонки таблицы
columns = ctx.sql("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'orders'
""")
columns.show()
INFORMATION_SCHEMA включён по умолчанию в Python API. В Rust API его нужно включать явно через SessionConfig::with_information_schema(true).
Комбинирование SQL и DataFrame API
Одна из сильных сторон DataFusion --- возможность переключаться между SQL и DataFrame API в одном pipeline:
from datafusion import SessionContext, col, lit
from datafusion import functions as f
ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")
# Начинаем с SQL (удобно для сложных JOIN)
df = ctx.sql("""
SELECT region, status, amount, order_date
FROM orders
WHERE order_date >= '2024-01-01'
""")
# Продолжаем через DataFrame API (удобно для программной логики)
result = (
df
.filter(col("status").eq(lit("completed")))
.aggregate(
[col("region")],
[
f.sum(col("amount")).alias("total"),
f.count(col("region")).alias("count"),
],
)
.sort(col("total").sort(ascending=False))
.limit(10)
)
result.show()
Обратное направление: DataFrame в SQL
Если у вас DataFrame и нужно использовать его в SQL-запросе, зарегистрируйте его как таблицу:
# DataFrame из Python-данных
thresholds = ctx.from_pydict(
{"region": ["EU", "US", "APAC"], "min_amount": [100, 200, 150]},
name="thresholds",
)
# Теперь thresholds доступен в SQL
df = ctx.sql("""
SELECT o.region, o.amount
FROM orders o
JOIN thresholds t ON o.region = t.region
WHERE o.amount >= t.min_amount
""")
df.show()
Jupyter-рабочий процесс
В Jupyter Notebook SQL-запросы удобны для интерактивного исследования:
# Ячейка 1: Загрузка данных
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")
ctx.register_parquet("products", "data/products.parquet")
# Ячейка 2: Быстрый обзор
ctx.sql("SELECT * FROM orders LIMIT 5")
# Ячейка 3: Структура таблицы
ctx.sql("""
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'orders'
""")
# Ячейка 4: Аналитика
ctx.sql("""
SELECT region,
COUNT(*) as orders,
ROUND(AVG(amount), 2) as avg_amount
FROM orders
GROUP BY region
ORDER BY avg_amount DESC
""")
В Jupyter последнее выражение в ячейке автоматически отображается как HTML-таблица. Не нужно вызывать .show() --- достаточно вернуть DataFrame.
Множественные операторы
ctx.sql() принимает один SQL-оператор. Для выполнения нескольких запросов вызывайте ctx.sql() несколько раз:
# Создание таблицы через SQL
ctx.sql("CREATE EXTERNAL TABLE logs STORED AS CSV LOCATION 'data/logs.csv'")
# Запрос к созданной таблице
df = ctx.sql("SELECT * FROM logs WHERE level = 'ERROR'")
df.show()
Итоги
ctx.sql()выполняет SQL и возвращает DataFrame --- это мост между SQL и Pythonregister_csv,register_parquet,register_json--- регистрация файлов как SQL-таблицregister_record_batches--- регистрация данных из Python/PyArrowINFORMATION_SCHEMA--- инспекция зарегистрированных таблиц и колонок- SQL и DataFrame API свободно комбинируются: SQL для сложных JOIN, DataFrame для программной логики
- В Jupyter DataFrame автоматически рендерится как HTML-таблица