Learning Platform
Глоссарий Troubleshooting
Урок 14.02 · 30 мин
Продвинутый
labDockerDataFusion PythonUDFUDAFParquetSQLDataFrameanalytics

Лабораторная: DataFusion Python Analytics Engine

В этом уроке вы реализуете мини аналитический движок, спроектированный в предыдущем уроке. Работа ведётся в Docker-среде с тремя упражнениями нарастающей сложности.

Подготовка среды

Требования

  • Docker Desktop 4.x или выше
  • 4 GB оперативной памяти

Запуск

Лабораторные файлы расположены в labs/capstone/ относительно корня курса:

cd labs/capstone
docker compose up -d --build

Первая сборка занимает 1—2 минуты: Docker устанавливает Python-зависимости (datafusion, pyarrow, pandas) и генерирует тестовые данные.

Подключение к контейнеру:

docker compose exec lab bash

Проверка:

python -c "import datafusion; print(datafusion.__version__)"
NOTE

Директория exercises/ смонтирована как volume --- изменения в файлах упражнений на хост-машине сразу видны в контейнере.

Структура среды

/app/
  data/
    sales_events.parquet   # 10 000 записей о продажах
    products.parquet        # 200 продуктов
  exercises/
    01_explore_data.py     # Упражнение 1
    02_custom_udf.py       # Упражнение 2
    03_analytics_engine.py # Упражнение 3

Упражнение 1: Исследование данных

Файл: exercises/01_explore_data.py

Цель: научиться читать Parquet, выполнять SQL-запросы и использовать DataFrame API.

Шаг 1: Регистрация данных

Файл 01_explore_data.py содержит блоки TODO. Начните с регистрации данных:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_parquet("sales", "data/sales_events.parquet")
ctx.register_parquet("products", "data/products.parquet")

register_parquet создаёт таблицу в каталоге SessionContext. После регистрации файл доступен по имени в SQL. Подробно это описано в уроке 4.1.

Шаг 2: SQL-запросы

Выполните базовые аналитические запросы:

# Первые 10 записей
ctx.sql("SELECT * FROM sales LIMIT 10").show()

# Общее количество
ctx.sql("SELECT COUNT(*) AS total FROM sales").show()

# Топ-5 категорий по выручке
ctx.sql("""
    SELECT category, SUM(amount) AS revenue
    FROM sales
    GROUP BY category
    ORDER BY revenue DESC
    LIMIT 5
""").show()

SQL в DataFusion Python --- это тот же SQL, который вы изучали в модуле 3. Запрос транслируется в LogicalPlan, оптимизируется и выполняется движком.

Шаг 3: DataFrame API

Те же операции через программный интерфейс:

from datafusion import col
from datafusion import functions as f

sales_df = ctx.read_parquet("data/sales_events.parquet")

# Фильтрация
big_sales = sales_df.filter(col("amount") > 10000)
big_sales.show()

# Группировка
regional = sales_df.aggregate(
    [col("region")],
    [f.avg(col("amount")).alias("avg_amount"),
     f.count(col("event_id")).alias("cnt")]
)
regional.show()

DataFrame API описан в уроке 4.2. SQL и DataFrame транслируются в один и тот же LogicalPlan --- выбор интерфейса зависит от задачи.

Шаг 4: Сохранение результатов

regional.write_parquet("data/regional_report.parquet")

# Проверка
verify = ctx.read_parquet("data/regional_report.parquet")
verify.show()
TIP

Если write_parquet вызывает ошибку, убедитесь что директория существует. DataFusion Python создаёт файл, но не создаёт промежуточные директории.

Упражнение 2: Пользовательские функции

Файл: exercises/02_custom_udf.py

Цель: создать скалярные UDF и агрегатные UDAF для бизнес-логики.

Скалярный UDF

UDF --- функция, которая обрабатывает каждую строку. В уроке 4.4 вы изучали создание UDF. Здесь применим это к бизнес-задаче --- классификации сумм:

from datafusion import udf
import pyarrow as pa

def classify_amount(amounts: pa.Array) -> pa.Array:
    result = []
    for val in amounts:
        v = val.as_py()
        if v is None:
            result.append(None)
        elif v < 1000:
            result.append("small")
        elif v < 10000:
            result.append("medium")
        else:
            result.append("large")
    return pa.array(result, type=pa.string())

classify_udf = udf(
    classify_amount,
    [pa.float64()],
    pa.string(),
    "immutable"
)
ctx.register_udf(classify_udf)
WARNING

Четвёртый аргумент volatility обязателен. Если пропустить, DataFusion вернёт ошибку. Допустимые значения: "immutable", "stable", "volatile".

Теперь UDF доступен в SQL:

ctx.sql("""
    SELECT category,
           classify_amount(amount) AS size_class,
           COUNT(*) AS cnt
    FROM sales
    GROUP BY category, classify_amount(amount)
    ORDER BY category, cnt DESC
""").show()

Агрегатный UDAF

UDAF --- функция, которая агрегирует группу строк в одно значение. Реализуется через Accumulator (урок 4.4):

from datafusion import udaf, Accumulator

class WeightedAvg(Accumulator):
    def __init__(self):
        self._sum_vw = 0.0
        self._sum_w = 0.0

    def state(self) -> list[pa.Scalar]:
        return [pa.scalar(self._sum_vw), pa.scalar(self._sum_w)]

    def update(self, values: pa.Array, weights: pa.Array) -> None:
        for v, w in zip(values, weights):
            if v.is_valid and w.is_valid:
                self._sum_vw += v.as_py() * w.as_py()
                self._sum_w += w.as_py()

    def merge(self, states: list[pa.Array]) -> None:
        self._sum_vw += states[0].as_py()
        self._sum_w += states[1].as_py()

    def evaluate(self) -> pa.Scalar:
        if self._sum_w == 0:
            return pa.scalar(None)
        return pa.scalar(self._sum_vw / self._sum_w)

weighted_avg = udaf(
    WeightedAvg,
    [pa.float64(), pa.int32()],
    pa.float64(),
    [pa.float64(), pa.float64()],
    "immutable"
)
ctx.register_udaf(weighted_avg)

Четыре метода Accumulator:

  • state() --- возвращает текущее состояние как список скаляров
  • update() --- принимает новые данные, обновляет состояние
  • merge() --- сливает состояния из разных партиций
  • evaluate() --- вычисляет финальный результат

Комбинированный запрос

Объедините UDF и UDAF:

ctx.sql("""
    SELECT
        region,
        classify_amount(amount) AS size_class,
        weighted_avg(amount, quantity) AS wavg,
        COUNT(*) AS cnt
    FROM sales
    GROUP BY region, classify_amount(amount)
    ORDER BY region, wavg DESC
""").show()

Упражнение 3: Мини аналитический движок

Файл: exercises/03_analytics_engine.py

Цель: собрать все компоненты в единый аналитический конвейер.

Архитектура

Упражнение реализует класс AnalyticsEngine с четырьмя этапами:

Конвейер AnalyticsEngine
[1] load_sources()Автоматическая регистрация всех Parquet-файлов из директории data/
[2] register_functions()Регистрация скалярных UDF и агрегатных UDAF для бизнес-логики
[3] run_sales_summary() / run_channel_analysis() / run_top_users()SQL-запросы для аналитических отчётов — сводки, каналы, топ пользователей
[4] save_report() --- Parquet-отчётыСохранение DataFrame-результатов в Parquet-файлы для дальнейшего использования

Реализация

Откройте exercises/03_analytics_engine.py. Файл содержит каркас класса с блоками TODO. Вам нужно реализовать:

  1. load_sources --- автоматическая регистрация всех Parquet-файлов из data/
  2. register_functions --- UDF revenue_tier для классификации выручки
  3. run_sales_summary --- SQL-запрос: сводка по категориям и регионам
  4. run_channel_analysis --- SQL-запрос: анализ каналов продаж
  5. run_top_users --- SQL-запрос: топ пользователей по выручке
  6. save_report --- сохранение DataFrame в Parquet

Подсказки

load_sources: используйте os.listdir() для поиска .parquet файлов и ctx.register_parquet() для регистрации.

register_functions: аналогично classify_amount из упражнения 2, но с другими порогами. Используйте udf() и ctx.register_udf().

Аналитические запросы: комбинируйте GROUP BY, SUM, AVG, COUNT, ORDER BY. Используйте ctx.sql() для выполнения.

save_report: метод df.write_parquet(path) сохраняет DataFrame в файл.

Запуск

python exercises/03_analytics_engine.py

Ожидаемый результат: три отчёта в data/reports/:

  • sales_summary.parquet --- сводка продаж
  • channel_analysis.parquet --- анализ каналов
  • top_users.parquet --- топ пользователей

Проверка результатов

После выполнения всех упражнений проверьте:

from datafusion import SessionContext
import os

ctx = SessionContext()

# Проверяем отчёты
reports_dir = "data/reports"
for f in sorted(os.listdir(reports_dir)):
    if f.endswith(".parquet"):
        path = os.path.join(reports_dir, f)
        df = ctx.read_parquet(path)
        print(f"\n=== {f} ===")
        df.show(5)

Завершение работы

exit                          # выход из контейнера
docker compose down           # остановка
docker compose down --rmi local  # удаление образа

Итоги

В этой лабораторной вы:

  • Развернули Docker-среду с DataFusion Python
  • Освоили чтение Parquet и SQL/DataFrame-запросы в контексте реальной задачи
  • Создали скалярный UDF и агрегатный UDAF для бизнес-логики
  • Построили аналитический движок, объединив все компоненты курса

В следующем уроке --- обзор пройденного и рекомендации по дальнейшему изучению.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 5. UDF classify_amount в лабораторной принимает pa.Array и возвращает pa.Array. Почему DataFusion передаёт в UDF массив значений, а не отдельные скаляры?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 3