Лабораторная: DataFusion Python Analytics Engine
В этом уроке вы реализуете мини аналитический движок, спроектированный в предыдущем уроке. Работа ведётся в Docker-среде с тремя упражнениями нарастающей сложности.
Подготовка среды
Требования
- Docker Desktop 4.x или выше
- 4 GB оперативной памяти
Запуск
Лабораторные файлы расположены в labs/capstone/ относительно корня курса:
cd labs/capstone
docker compose up -d --build
Первая сборка занимает 1—2 минуты: Docker устанавливает Python-зависимости (datafusion, pyarrow, pandas) и генерирует тестовые данные.
Подключение к контейнеру:
docker compose exec lab bash
Проверка:
python -c "import datafusion; print(datafusion.__version__)"
Директория exercises/ смонтирована как volume --- изменения в файлах упражнений на хост-машине сразу видны в контейнере.
Структура среды
/app/
data/
sales_events.parquet # 10 000 записей о продажах
products.parquet # 200 продуктов
exercises/
01_explore_data.py # Упражнение 1
02_custom_udf.py # Упражнение 2
03_analytics_engine.py # Упражнение 3
Упражнение 1: Исследование данных
Файл: exercises/01_explore_data.py
Цель: научиться читать Parquet, выполнять SQL-запросы и использовать DataFrame API.
Шаг 1: Регистрация данных
Файл 01_explore_data.py содержит блоки TODO. Начните с регистрации данных:
from datafusion import SessionContext
ctx = SessionContext()
ctx.register_parquet("sales", "data/sales_events.parquet")
ctx.register_parquet("products", "data/products.parquet")
register_parquet создаёт таблицу в каталоге SessionContext. После регистрации файл доступен по имени в SQL. Подробно это описано в уроке 4.1.
Шаг 2: SQL-запросы
Выполните базовые аналитические запросы:
# Первые 10 записей
ctx.sql("SELECT * FROM sales LIMIT 10").show()
# Общее количество
ctx.sql("SELECT COUNT(*) AS total FROM sales").show()
# Топ-5 категорий по выручке
ctx.sql("""
SELECT category, SUM(amount) AS revenue
FROM sales
GROUP BY category
ORDER BY revenue DESC
LIMIT 5
""").show()
SQL в DataFusion Python --- это тот же SQL, который вы изучали в модуле 3. Запрос транслируется в LogicalPlan, оптимизируется и выполняется движком.
Шаг 3: DataFrame API
Те же операции через программный интерфейс:
from datafusion import col
from datafusion import functions as f
sales_df = ctx.read_parquet("data/sales_events.parquet")
# Фильтрация
big_sales = sales_df.filter(col("amount") > 10000)
big_sales.show()
# Группировка
regional = sales_df.aggregate(
[col("region")],
[f.avg(col("amount")).alias("avg_amount"),
f.count(col("event_id")).alias("cnt")]
)
regional.show()
DataFrame API описан в уроке 4.2. SQL и DataFrame транслируются в один и тот же LogicalPlan --- выбор интерфейса зависит от задачи.
Шаг 4: Сохранение результатов
regional.write_parquet("data/regional_report.parquet")
# Проверка
verify = ctx.read_parquet("data/regional_report.parquet")
verify.show()
Если write_parquet вызывает ошибку, убедитесь что директория существует. DataFusion Python создаёт файл, но не создаёт промежуточные директории.
Упражнение 2: Пользовательские функции
Файл: exercises/02_custom_udf.py
Цель: создать скалярные UDF и агрегатные UDAF для бизнес-логики.
Скалярный UDF
UDF --- функция, которая обрабатывает каждую строку. В уроке 4.4 вы изучали создание UDF. Здесь применим это к бизнес-задаче --- классификации сумм:
from datafusion import udf
import pyarrow as pa
def classify_amount(amounts: pa.Array) -> pa.Array:
result = []
for val in amounts:
v = val.as_py()
if v is None:
result.append(None)
elif v < 1000:
result.append("small")
elif v < 10000:
result.append("medium")
else:
result.append("large")
return pa.array(result, type=pa.string())
classify_udf = udf(
classify_amount,
[pa.float64()],
pa.string(),
"immutable"
)
ctx.register_udf(classify_udf)
Четвёртый аргумент volatility обязателен. Если пропустить, DataFusion вернёт ошибку. Допустимые значения: "immutable", "stable", "volatile".
Теперь UDF доступен в SQL:
ctx.sql("""
SELECT category,
classify_amount(amount) AS size_class,
COUNT(*) AS cnt
FROM sales
GROUP BY category, classify_amount(amount)
ORDER BY category, cnt DESC
""").show()
Агрегатный UDAF
UDAF --- функция, которая агрегирует группу строк в одно значение. Реализуется через Accumulator (урок 4.4):
from datafusion import udaf, Accumulator
class WeightedAvg(Accumulator):
def __init__(self):
self._sum_vw = 0.0
self._sum_w = 0.0
def state(self) -> list[pa.Scalar]:
return [pa.scalar(self._sum_vw), pa.scalar(self._sum_w)]
def update(self, values: pa.Array, weights: pa.Array) -> None:
for v, w in zip(values, weights):
if v.is_valid and w.is_valid:
self._sum_vw += v.as_py() * w.as_py()
self._sum_w += w.as_py()
def merge(self, states: list[pa.Array]) -> None:
self._sum_vw += states[0].as_py()
self._sum_w += states[1].as_py()
def evaluate(self) -> pa.Scalar:
if self._sum_w == 0:
return pa.scalar(None)
return pa.scalar(self._sum_vw / self._sum_w)
weighted_avg = udaf(
WeightedAvg,
[pa.float64(), pa.int32()],
pa.float64(),
[pa.float64(), pa.float64()],
"immutable"
)
ctx.register_udaf(weighted_avg)
Четыре метода Accumulator:
state()--- возвращает текущее состояние как список скаляровupdate()--- принимает новые данные, обновляет состояниеmerge()--- сливает состояния из разных партицийevaluate()--- вычисляет финальный результат
Комбинированный запрос
Объедините UDF и UDAF:
ctx.sql("""
SELECT
region,
classify_amount(amount) AS size_class,
weighted_avg(amount, quantity) AS wavg,
COUNT(*) AS cnt
FROM sales
GROUP BY region, classify_amount(amount)
ORDER BY region, wavg DESC
""").show()
Упражнение 3: Мини аналитический движок
Файл: exercises/03_analytics_engine.py
Цель: собрать все компоненты в единый аналитический конвейер.
Архитектура
Упражнение реализует класс AnalyticsEngine с четырьмя этапами:
Реализация
Откройте exercises/03_analytics_engine.py. Файл содержит каркас класса с блоками TODO. Вам нужно реализовать:
- load_sources --- автоматическая регистрация всех Parquet-файлов из
data/ - register_functions --- UDF
revenue_tierдля классификации выручки - run_sales_summary --- SQL-запрос: сводка по категориям и регионам
- run_channel_analysis --- SQL-запрос: анализ каналов продаж
- run_top_users --- SQL-запрос: топ пользователей по выручке
- save_report --- сохранение DataFrame в Parquet
Подсказки
load_sources: используйте os.listdir() для поиска .parquet файлов и ctx.register_parquet() для регистрации.
register_functions: аналогично classify_amount из упражнения 2, но с другими порогами. Используйте udf() и ctx.register_udf().
Аналитические запросы: комбинируйте GROUP BY, SUM, AVG, COUNT, ORDER BY. Используйте ctx.sql() для выполнения.
save_report: метод df.write_parquet(path) сохраняет DataFrame в файл.
Запуск
python exercises/03_analytics_engine.py
Ожидаемый результат: три отчёта в data/reports/:
sales_summary.parquet--- сводка продажchannel_analysis.parquet--- анализ каналовtop_users.parquet--- топ пользователей
Проверка результатов
После выполнения всех упражнений проверьте:
from datafusion import SessionContext
import os
ctx = SessionContext()
# Проверяем отчёты
reports_dir = "data/reports"
for f in sorted(os.listdir(reports_dir)):
if f.endswith(".parquet"):
path = os.path.join(reports_dir, f)
df = ctx.read_parquet(path)
print(f"\n=== {f} ===")
df.show(5)
Завершение работы
exit # выход из контейнера
docker compose down # остановка
docker compose down --rmi local # удаление образа
Итоги
В этой лабораторной вы:
- Развернули Docker-среду с DataFusion Python
- Освоили чтение Parquet и SQL/DataFrame-запросы в контексте реальной задачи
- Создали скалярный UDF и агрегатный UDAF для бизнес-логики
- Построили аналитический движок, объединив все компоненты курса
В следующем уроке --- обзор пройденного и рекомендации по дальнейшему изучению.