Learning Platform
Глоссарий Troubleshooting
Урок 05.04 · 15 мин
Средний
UDFUDAFUDWFAccumulatorvolatilityudfWindowEvaluator

UDF, UDAF и UDWF

DataFusion позволяет расширять SQL и DataFrame API пользовательскими функциями. Python API поддерживает три типа: скалярные UDF, агрегатные UDAF и оконные UDWF.

Скалярные UDF

Скалярный UDF принимает одну или несколько колонок и возвращает значение для каждой строки.

Создание UDF

from datafusion import udf, SessionContext, col
import pyarrow as pa

# Определяем Python-функцию
def fahrenheit_to_celsius(f_values: pa.Array) -> pa.Array:
    """Конвертация Fahrenheit в Celsius."""
    result = [(float(v) - 32) * 5 / 9 if v is not None else None for v in f_values]
    return pa.array(result, type=pa.float64())

# Регистрируем как UDF
f_to_c = udf(
    fahrenheit_to_celsius,              # Python-функция
    [pa.float64()],                     # Типы входных аргументов
    pa.float64(),                       # Тип возвращаемого значения
    "stable",                           # Volatility (обязательный аргумент)
)
WARNING

Четвёртый аргумент volatility обязателен. Пропуск вызовет ошибку. Допустимые значения: "immutable", "stable", "volatile".

Volatility: семантика

Volatility сообщает оптимизатору, можно ли кэшировать результат функции:

Volatility: уровни
immutableНеизменяемая функция — при одинаковом входе всегда одинаковый результат, оптимизатор может кэшировать
stableСтабильная функция — результат постоянен в рамках одного запроса, но может меняться между запросами
volatileВолатильная функция — результат может меняться при каждом вызове, кэширование невозможно

Использование UDF

ctx = SessionContext()

data = {"city": ["Moscow", "Berlin", "Tokyo"], "temp_f": [50.0, 68.0, 77.0]}
df = ctx.from_pydict(data, name="weather")

# В DataFrame API
result = df.select(col("city"), f_to_c(col("temp_f")).alias("temp_c"))
result.show()
# +--------+--------+
# | city   | temp_c |
# +--------+--------+
# | Moscow | 10.0   |
# | Berlin | 20.0   |
# | Tokyo  | 25.0   |
# +--------+--------+

Регистрация UDF для SQL

# Регистрация для использования в SQL-запросах
ctx.register_udf(f_to_c)

df = ctx.sql("SELECT city, fahrenheit_to_celsius(temp_f) AS temp_c FROM weather")
df.show()
NOTE

Имя функции в SQL берётся из имени Python-функции. Для явного указания имени используйте параметр name в udf().

Агрегатные UDAF

Агрегатные функции обрабатывают набор строк и возвращают одно значение (как SUM или AVG). UDAF реализуется через класс Accumulator.

Класс Accumulator

from datafusion import Accumulator
import pyarrow as pa

class WeightedAvgAccumulator(Accumulator):
    """Средневзвешенное значение."""

    def __init__(self):
        self._sum_product = 0.0
        self._sum_weights = 0.0

    def state(self) -> list[pa.Scalar]:
        """Промежуточное состояние для параллельного выполнения."""
        return [
            pa.scalar(self._sum_product, type=pa.float64()),
            pa.scalar(self._sum_weights, type=pa.float64()),
        ]

    def update(self, values: pa.Array, weights: pa.Array) -> None:
        """Обработка новой порции данных."""
        for v, w in zip(values, weights):
            if v is not None and w is not None:
                self._sum_product += float(v) * float(w)
                self._sum_weights += float(w)

    def merge(self, states: list[pa.Array]) -> None:
        """Слияние промежуточных состояний из разных партиций."""
        self._sum_product += sum(float(s) for s in states[0] if s is not None)
        self._sum_weights += sum(float(s) for s in states[1] if s is not None)

    def evaluate(self) -> pa.Scalar:
        """Финальный результат."""
        if self._sum_weights == 0:
            return pa.scalar(None, type=pa.float64())
        return pa.scalar(self._sum_product / self._sum_weights, type=pa.float64())

Регистрация UDAF

from datafusion import udaf

weighted_avg = udaf(
    WeightedAvgAccumulator,
    [pa.float64(), pa.float64()],   # Типы входных аргументов
    pa.float64(),                    # Тип результата
    [pa.float64(), pa.float64()],   # Типы промежуточного состояния
    "immutable",                     # Volatility
)

ctx.register_udaf(weighted_avg)

Использование UDAF

data = {
    "student": ["Alice", "Alice", "Bob", "Bob"],
    "score": [90.0, 85.0, 70.0, 95.0],
    "weight": [0.4, 0.6, 0.3, 0.7],
}
df = ctx.from_pydict(data, name="grades")

# В SQL
df = ctx.sql("""
    SELECT student, weighted_avg_accumulator(score, weight) as weighted_score
    FROM grades
    GROUP BY student
""")
df.show()

Оконные UDWF

Оконные функции обрабатывают строки в рамках окна (OVER clause). UDWF реализуется через WindowEvaluator.

from datafusion import WindowEvaluator
import pyarrow as pa

class CumulativeSum(WindowEvaluator):
    """Кумулятивная сумма."""

    def __init__(self):
        self._running = 0.0

    def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
        """Вычисление для всех строк окна."""
        result = []
        running = 0.0
        for v in values[0]:
            if v is not None:
                running += float(v)
            result.append(running)
        return pa.array(result, type=pa.float64())
TIP

UDWF --- продвинутый механизм. Для большинства оконных вычислений достаточно встроенных функций (ROW_NUMBER, LAG, SUM OVER). UDWF нужен для специфической логики, не покрываемой стандартными оконными функциями.

Полный пример: система оценок

from datafusion import SessionContext, udf, col
import pyarrow as pa

ctx = SessionContext()

# UDF: буквенная оценка
def letter_grade(scores: pa.Array) -> pa.Array:
    grades = []
    for s in scores:
        if s is None:
            grades.append(None)
        elif float(s) >= 90:
            grades.append("A")
        elif float(s) >= 80:
            grades.append("B")
        elif float(s) >= 70:
            grades.append("C")
        else:
            grades.append("F")
    return pa.array(grades, type=pa.utf8())

grade_udf = udf(letter_grade, [pa.float64()], pa.utf8(), "immutable")
ctx.register_udf(grade_udf)

# Данные
data = {
    "student": ["Alice", "Bob", "Carol", "Dave"],
    "score": [95.0, 82.0, 71.0, 58.0],
}
ctx.from_pydict(data, name="results")

# SQL с UDF
df = ctx.sql("""
    SELECT student, score, letter_grade(score) as grade
    FROM results
    ORDER BY score DESC
""")
df.show()

Итоги

  • UDF --- скалярные функции: udf(func, input_types, return_type, volatility) с обязательным volatility
  • UDAF --- агрегатные функции через класс Accumulator (state/update/merge/evaluate)
  • UDWF --- оконные функции через класс WindowEvaluator
  • Volatility (immutable/stable/volatile) влияет на оптимизацию
  • UDF/UDAF регистрируются для SQL через ctx.register_udf() / ctx.register_udaf()
  • Вход и выход --- PyArrow Array/Scalar, обеспечивая zero-copy передачу данных �чу данных

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 5. Какие четыре аргумента принимает функция udf() для создания скалярной UDF в DataFusion Python?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6