Learning Platform
Глоссарий Troubleshooting
Урок 05.03 · 15 мин
Средний
ctx.sqlregister_csvregister_parquetINFORMATION_SCHEMASQL + DataFrame

SQL через Python

DataFusion Python предоставляет полноценный SQL-интерфейс через метод ctx.sql(). Вы пишете SQL-запросы как строки и получаете DataFrame, который можно дальше обрабатывать через DataFrame API.

Выполнение SQL

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("orders", "data/orders.csv")

# SQL-запрос возвращает DataFrame
df = ctx.sql("SELECT region, SUM(amount) as total FROM orders GROUP BY region")
df.show()

Метод ctx.sql() возвращает не результат, а DataFrame. Это значит, что запрос не выполняется до вызова terminal-операции:

# Запрос НЕ выполняется --- создаётся только план
df = ctx.sql("SELECT * FROM orders WHERE amount > 1000")

# Запрос выполняется здесь
df.show()

Регистрация таблиц

Перед использованием SQL таблицы должны быть зарегистрированы в каталоге SessionContext:

Из файлов

ctx = SessionContext()

# CSV
ctx.register_csv("orders", "data/orders.csv")

# Parquet
ctx.register_parquet("events", "data/events.parquet")

# JSON (NDJSON)
ctx.register_json("logs", "data/logs.json")

Из существующего DataFrame

# Создаём DataFrame из Python-данных
data = {"name": ["Alice", "Bob"], "score": [95, 82]}
df = ctx.from_pydict(data)

# Регистрируем как таблицу для SQL
ctx.register_record_batches("students", [df.collect()])

df_sql = ctx.sql("SELECT * FROM students WHERE score > 90")
df_sql.show()

Из PyArrow Table

import pyarrow as pa

table = pa.table({
    "city": ["Moscow", "Berlin", "Tokyo"],
    "population": [12_600_000, 3_700_000, 14_000_000],
})

ctx.register_record_batches("cities", [table.to_batches()])

df = ctx.sql("SELECT city FROM cities WHERE population > 10000000")
df.show()

INFORMATION_SCHEMA

DataFusion поддерживает INFORMATION_SCHEMA для инспекции зарегистрированных таблиц:

ctx = SessionContext()
ctx.register_csv("orders", "data/orders.csv")
ctx.register_parquet("products", "data/products.parquet")

# Список таблиц
tables = ctx.sql("""
    SELECT table_catalog, table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
""")
tables.show()

# Колонки таблицы
columns = ctx.sql("""
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = 'orders'
""")
columns.show()
NOTE

INFORMATION_SCHEMA включён по умолчанию в Python API. В Rust API его нужно включать явно через SessionConfig::with_information_schema(true).

Комбинирование SQL и DataFrame API

Одна из сильных сторон DataFusion --- возможность переключаться между SQL и DataFrame API в одном pipeline:

from datafusion import SessionContext, col, lit
from datafusion import functions as f

ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")

# Начинаем с SQL (удобно для сложных JOIN)
df = ctx.sql("""
    SELECT region, status, amount, order_date
    FROM orders
    WHERE order_date >= '2024-01-01'
""")

# Продолжаем через DataFrame API (удобно для программной логики)
result = (
    df
    .filter(col("status").eq(lit("completed")))
    .aggregate(
        [col("region")],
        [
            f.sum(col("amount")).alias("total"),
            f.count(col("region")).alias("count"),
        ],
    )
    .sort(col("total").sort(ascending=False))
    .limit(10)
)

result.show()
SQL + DataFrame: комбинированный pipeline
ctx.sql(“SELECT … FROM orders WHERE …“)SQL-запрос через ctx.sql() — возвращает ленивый DataFrame, а не результат
DataFrame
.filter(col(“status”).eq(lit(“completed”)))Дополнительная фильтрация через DataFrame API поверх SQL-результата
.aggregate([…], […])Агрегация — программное добавление GROUP BY к SQL-плану
.sort(…).limit(10)Сортировка и ограничение — финальные трансформации плана
.show()
РезультатВывод результата — терминальная операция запускает выполнение всего плана

Обратное направление: DataFrame в SQL

Если у вас DataFrame и нужно использовать его в SQL-запросе, зарегистрируйте его как таблицу:

# DataFrame из Python-данных
thresholds = ctx.from_pydict(
    {"region": ["EU", "US", "APAC"], "min_amount": [100, 200, 150]},
    name="thresholds",
)

# Теперь thresholds доступен в SQL
df = ctx.sql("""
    SELECT o.region, o.amount
    FROM orders o
    JOIN thresholds t ON o.region = t.region
    WHERE o.amount >= t.min_amount
""")
df.show()

Jupyter-рабочий процесс

В Jupyter Notebook SQL-запросы удобны для интерактивного исследования:

# Ячейка 1: Загрузка данных
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet("orders", "data/orders.parquet")
ctx.register_parquet("products", "data/products.parquet")

# Ячейка 2: Быстрый обзор
ctx.sql("SELECT * FROM orders LIMIT 5")

# Ячейка 3: Структура таблицы
ctx.sql("""
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = 'orders'
""")

# Ячейка 4: Аналитика
ctx.sql("""
    SELECT region,
           COUNT(*) as orders,
           ROUND(AVG(amount), 2) as avg_amount
    FROM orders
    GROUP BY region
    ORDER BY avg_amount DESC
""")
TIP

В Jupyter последнее выражение в ячейке автоматически отображается как HTML-таблица. Не нужно вызывать .show() --- достаточно вернуть DataFrame.

Множественные операторы

ctx.sql() принимает один SQL-оператор. Для выполнения нескольких запросов вызывайте ctx.sql() несколько раз:

# Создание таблицы через SQL
ctx.sql("CREATE EXTERNAL TABLE logs STORED AS CSV LOCATION 'data/logs.csv'")

# Запрос к созданной таблице
df = ctx.sql("SELECT * FROM logs WHERE level = 'ERROR'")
df.show()

Итоги

  • ctx.sql() выполняет SQL и возвращает DataFrame --- это мост между SQL и Python
  • register_csv, register_parquet, register_json --- регистрация файлов как SQL-таблиц
  • register_record_batches --- регистрация данных из Python/PyArrow
  • INFORMATION_SCHEMA --- инспекция зарегистрированных таблиц и колонок
  • SQL и DataFrame API свободно комбинируются: SQL для сложных JOIN, DataFrame для программной логики
  • В Jupyter DataFrame автоматически рендерится как HTML-таблица

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Что возвращает метод ctx.sql() при выполнении SQL-запроса?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6