UDF, UDAF и UDWF
DataFusion позволяет расширять SQL и DataFrame API пользовательскими функциями. Python API поддерживает три типа: скалярные UDF, агрегатные UDAF и оконные UDWF.
Скалярные UDF
Скалярный UDF принимает одну или несколько колонок и возвращает значение для каждой строки.
Создание UDF
from datafusion import udf, SessionContext, col
import pyarrow as pa
# Определяем Python-функцию
def fahrenheit_to_celsius(f_values: pa.Array) -> pa.Array:
"""Конвертация Fahrenheit в Celsius."""
result = [(float(v) - 32) * 5 / 9 if v is not None else None for v in f_values]
return pa.array(result, type=pa.float64())
# Регистрируем как UDF
f_to_c = udf(
fahrenheit_to_celsius, # Python-функция
[pa.float64()], # Типы входных аргументов
pa.float64(), # Тип возвращаемого значения
"stable", # Volatility (обязательный аргумент)
)
Четвёртый аргумент volatility обязателен. Пропуск вызовет ошибку. Допустимые значения: "immutable", "stable", "volatile".
Volatility: семантика
Volatility сообщает оптимизатору, можно ли кэшировать результат функции:
Использование UDF
ctx = SessionContext()
data = {"city": ["Moscow", "Berlin", "Tokyo"], "temp_f": [50.0, 68.0, 77.0]}
df = ctx.from_pydict(data, name="weather")
# В DataFrame API
result = df.select(col("city"), f_to_c(col("temp_f")).alias("temp_c"))
result.show()
# +--------+--------+
# | city | temp_c |
# +--------+--------+
# | Moscow | 10.0 |
# | Berlin | 20.0 |
# | Tokyo | 25.0 |
# +--------+--------+
Регистрация UDF для SQL
# Регистрация для использования в SQL-запросах
ctx.register_udf(f_to_c)
df = ctx.sql("SELECT city, fahrenheit_to_celsius(temp_f) AS temp_c FROM weather")
df.show()
Имя функции в SQL берётся из имени Python-функции. Для явного указания имени используйте параметр name в udf().
Агрегатные UDAF
Агрегатные функции обрабатывают набор строк и возвращают одно значение (как SUM или AVG). UDAF реализуется через класс Accumulator.
Класс Accumulator
from datafusion import Accumulator
import pyarrow as pa
class WeightedAvgAccumulator(Accumulator):
"""Средневзвешенное значение."""
def __init__(self):
self._sum_product = 0.0
self._sum_weights = 0.0
def state(self) -> list[pa.Scalar]:
"""Промежуточное состояние для параллельного выполнения."""
return [
pa.scalar(self._sum_product, type=pa.float64()),
pa.scalar(self._sum_weights, type=pa.float64()),
]
def update(self, values: pa.Array, weights: pa.Array) -> None:
"""Обработка новой порции данных."""
for v, w in zip(values, weights):
if v is not None and w is not None:
self._sum_product += float(v) * float(w)
self._sum_weights += float(w)
def merge(self, states: list[pa.Array]) -> None:
"""Слияние промежуточных состояний из разных партиций."""
self._sum_product += sum(float(s) for s in states[0] if s is not None)
self._sum_weights += sum(float(s) for s in states[1] if s is not None)
def evaluate(self) -> pa.Scalar:
"""Финальный результат."""
if self._sum_weights == 0:
return pa.scalar(None, type=pa.float64())
return pa.scalar(self._sum_product / self._sum_weights, type=pa.float64())
Регистрация UDAF
from datafusion import udaf
weighted_avg = udaf(
WeightedAvgAccumulator,
[pa.float64(), pa.float64()], # Типы входных аргументов
pa.float64(), # Тип результата
[pa.float64(), pa.float64()], # Типы промежуточного состояния
"immutable", # Volatility
)
ctx.register_udaf(weighted_avg)
Использование UDAF
data = {
"student": ["Alice", "Alice", "Bob", "Bob"],
"score": [90.0, 85.0, 70.0, 95.0],
"weight": [0.4, 0.6, 0.3, 0.7],
}
df = ctx.from_pydict(data, name="grades")
# В SQL
df = ctx.sql("""
SELECT student, weighted_avg_accumulator(score, weight) as weighted_score
FROM grades
GROUP BY student
""")
df.show()
Оконные UDWF
Оконные функции обрабатывают строки в рамках окна (OVER clause). UDWF реализуется через WindowEvaluator.
from datafusion import WindowEvaluator
import pyarrow as pa
class CumulativeSum(WindowEvaluator):
"""Кумулятивная сумма."""
def __init__(self):
self._running = 0.0
def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
"""Вычисление для всех строк окна."""
result = []
running = 0.0
for v in values[0]:
if v is not None:
running += float(v)
result.append(running)
return pa.array(result, type=pa.float64())
UDWF --- продвинутый механизм. Для большинства оконных вычислений достаточно встроенных функций (ROW_NUMBER, LAG, SUM OVER). UDWF нужен для специфической логики, не покрываемой стандартными оконными функциями.
Полный пример: система оценок
from datafusion import SessionContext, udf, col
import pyarrow as pa
ctx = SessionContext()
# UDF: буквенная оценка
def letter_grade(scores: pa.Array) -> pa.Array:
grades = []
for s in scores:
if s is None:
grades.append(None)
elif float(s) >= 90:
grades.append("A")
elif float(s) >= 80:
grades.append("B")
elif float(s) >= 70:
grades.append("C")
else:
grades.append("F")
return pa.array(grades, type=pa.utf8())
grade_udf = udf(letter_grade, [pa.float64()], pa.utf8(), "immutable")
ctx.register_udf(grade_udf)
# Данные
data = {
"student": ["Alice", "Bob", "Carol", "Dave"],
"score": [95.0, 82.0, 71.0, 58.0],
}
ctx.from_pydict(data, name="results")
# SQL с UDF
df = ctx.sql("""
SELECT student, score, letter_grade(score) as grade
FROM results
ORDER BY score DESC
""")
df.show()
Итоги
- UDF --- скалярные функции:
udf(func, input_types, return_type, volatility)с обязательным volatility - UDAF --- агрегатные функции через класс
Accumulator(state/update/merge/evaluate) - UDWF --- оконные функции через класс
WindowEvaluator - Volatility (
immutable/stable/volatile) влияет на оптимизацию - UDF/UDAF регистрируются для SQL через
ctx.register_udf()/ctx.register_udaf() - Вход и выход --- PyArrow Array/Scalar, обеспечивая zero-copy передачу данных �чу данных