Learning Platform
Глоссарий Troubleshooting
Урок 19.03 · 25 мин
Продвинутый
PyFlink Table APISQLUDFUDAFUDTFPandas UDF

Table API и SQL в Python

В предыдущем уроке мы разбирали DataStream API: imperative программирование, явное управление state-ом и timers. Table API даёт declarative альтернативу — SQL-подобные операции над таблицами с автоматической оптимизацией через Calcite planner.

Для Python это особенно ценно. Большая часть пайплайна компилируется в Java-операторы и работает без Python overhead. Python нужен только там, где явно вызывается user-defined function. Это даёт почти-Java performance с Python-friendly API.

Pandas UDFs и Apache Arrow в PySpark

TableEnvironment и Schema

Точка входа Table API в Python:

from pyflink.table import (
    TableEnvironment,
    EnvironmentSettings,
    Schema,
    DataTypes
)

settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)

# Конфигурация
t_env.get_config().set("parallelism.default", "4")
t_env.get_config().set("python.fn-execution.bundle.size", "1000")

Schema объявляется через DataTypes — это аналог TypeInformation для Table API:

schema = Schema.new_builder() \
    .column("user_id", DataTypes.STRING()) \
    .column("amount", DataTypes.BIGINT()) \
    .column("ts", DataTypes.TIMESTAMP_LTZ(3)) \
    .watermark("ts", "ts - INTERVAL '5' SECOND") \
    .build()

TIMESTAMP_LTZ(3) это TIMESTAMP с миллисекундной точностью и time zone. INTERVAL '5' SECOND — SQL-синтаксис для задержки watermark. Через Schema можно явно задать time attribute и watermark стратегию.


DDL: создание таблиц

Table API позволяет описать source/sink через SQL DDL:

t_env.execute_sql("""
    CREATE TABLE orders (
        user_id STRING,
        amount BIGINT,
        ts TIMESTAMP_LTZ(3),
        WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders',
        'properties.bootstrap.servers' = 'broker:9092',
        'properties.group.id' = 'flink-group',
        'format' = 'json',
        'scan.startup.mode' = 'earliest-offset'
    )
""")

t_env.execute_sql("""
    CREATE TABLE alerts (
        user_id STRING,
        level STRING,
        amount BIGINT,
        ts TIMESTAMP_LTZ(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'alerts',
        'properties.bootstrap.servers' = 'broker:9092',
        'format' = 'avro'
    )
""")

DDL обрабатывается полностью на Java-стороне через Calcite. Python тут не работает. После создания таблицы вы можете использовать её в SQL queries.


SQL queries в Python

SQL — это hot path Table API. Запрос идёт прямо в Calcite, оптимизируется, компилируется в Java-операторы:

result = t_env.sql_query("""
    SELECT user_id,
           SUM(amount) AS total,
           COUNT(*) AS event_count,
           TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start
    FROM orders
    GROUP BY user_id, TUMBLE(ts, INTERVAL '1' HOUR)
""")

# Запись в sink
result.execute_insert("alerts")

Этот запрос работает полностью в Java — никакого Python в hot path. Python вызывается только если в SELECT/WHERE/JOIN явно используется UDF.

Чисто-SQL pipeline в PyFlink имеет такой же performance как чисто-SQL pipeline в Java. Это самый быстрый путь — рекомендуется для любой логики, которую можно выразить через SQL.


Scalar UDF: per-value transformation

Если нужна логика, которую SQL не выражает (custom parsing, ML inference, external API call) — пишется User-Defined Function (UDF):

from pyflink.table.udf import udf, ScalarFunction
from pyflink.table import DataTypes

@udf(result_type=DataTypes.STRING())
def normalize_phone(phone: str) -{'>'} str:
    # Простая нормализация
    digits = ''.join(c for c in phone if c.isdigit())
    if len(digits) == 11 and digits.startswith('7'):
        return '+7' + digits[1:]
    return phone

# Регистрация
t_env.create_temporary_function("normalize_phone", normalize_phone)

# Использование в SQL
result = t_env.sql_query("""
    SELECT user_id, normalize_phone(phone) AS phone_clean
    FROM users
""")

При выполнении этого запроса для каждой row Java-сторона отправляет phone в Python harness через gRPC, ждёт результат, продолжает. Overhead — 5-30 мкс per call (с batching).

Для performance-critical UDF (миллионы calls/sec) этого overhead-а уже слишком много. Решение — Pandas UDF.


Pandas UDF: vectorized execution

Pandas UDF получает pandas.Series (batch значений), а не отдельное value. Это даёт vectorized execution через numpy/pandas, плюс резкое снижение per-event overhead:

import pandas as pd
from pyflink.table.udf import udf
from pyflink.table import DataTypes

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def calc_tax(amount: pd.Series, rate: pd.Series) -{'>'} pd.Series:
    return amount * (1 + rate)  # vectorized

t_env.create_temporary_function("calc_tax", calc_tax)

result = t_env.sql_query("""
    SELECT user_id, calc_tax(amount, tax_rate) AS total
    FROM orders_with_tax
""")

Под капотом:

  1. Java аккумулирует batch (1000-10000 events).
  2. Конвертирует batch в Apache Arrow columnar format.
  3. Передаёт Arrow batch в Python harness через shared memory (zero-copy).
  4. Python harness конвертирует Arrow в pandas DataFrame.
  5. UDF вызывается ONCE для всего batch.
  6. Результат конвертируется обратно в Arrow.

Performance vs scalar UDF: 10-50x speedup для numeric operations, 5-20x для object operations. Это близко к native Java performance.

# Пример с numpy
import numpy as np

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def cosine_similarity(vec1: pd.Series, vec2: pd.Series) -{'>'} pd.Series:
    # vec1, vec2 — Series of list[float]
    a = np.array(vec1.tolist())
    b = np.array(vec2.tolist())
    norm_a = np.linalg.norm(a, axis=1)
    norm_b = np.linalg.norm(b, axis=1)
    dot = np.sum(a * b, axis=1)
    return pd.Series(dot / (norm_a * norm_b))

Этот UDF делает векторизованную similarity на batch из тысяч vectors. Без Pandas UDF это были бы тысячи Python calls. С Pandas UDF — один call с numpy computation.

TIP

Любой numeric UDF в production должен быть Pandas UDF. Скалярный UDF используйте только для одиночных compute (вызов внешнего API, complex if-then логика без vectorization). Pandas UDF — default choice, scalar — exception.


Scalar UDF vs Pandas UDF: cost breakdown
Sliding window 1M events/sec, simple math UDF (amount * 1.13)
Scalar UDF: 1M calls/secScalar UDF: каждое значение пересылается отдельно через gRPC (с batching внутри). Per-event overhead 5-30 мкс. На 1M events/sec — CPU spend 50-100% one core just для marshalling.
cost: 50-100% core
Python harness: bottleneckPython harness: spent в большей части в gRPC deserialize + serialize. User function calls — лишь fraction. Performance ceiling около 500K-1M events/sec per UDF на slot.
Pandas UDF: 100 calls/sec (batches)Pandas UDF: Java аккумулирует batch до 10K events. Один gRPC call передаёт всё в Arrow shared memory. Python harness получает pandas DataFrame, обрабатывает vectorized, возвращает.
cost: 5-10% core
Numpy vectorizedPython harness: 99% spent в numpy/pandas native code. gRPC overhead amortized по 10K events. Per-event cost снижается с 5-30 мкс до 0.5-3 мкс.

Table UDF: один-ко-многим

Иногда нужно из одного row сгенерировать несколько rows — например parsing JSON array, explode collections. Это Table UDF (UDTF):

from pyflink.table.udf import udtf
from pyflink.table import DataTypes

@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split_tags(tags: str):
    for tag in tags.split(','):
        tag = tag.strip()
        if tag:
            yield tag, len(tag).__str__()

t_env.create_temporary_function("split_tags", split_tags)

result = t_env.sql_query("""
    SELECT user_id, tag, tag_length
    FROM orders, LATERAL TABLE(split_tags(tags)) AS T(tag, tag_length)
""")

LATERAL TABLE — это SQL способ выразить “развернуть UDTF result в дополнительные rows”. Каждый input row может породить 0..N output rows.

Под капотом UDTF использует тот же gRPC mechanism что и scalar UDF, но stream-style: Python yield-ит multiple results, Java-сторона эмитит каждый downstream.


Aggregate UDF: per-group aggregation

UDAF — это custom aggregation, аналог SUM(), AVG(), но с user logic. Например — медиана, или perсentile, или весовая агрегация:

from pyflink.table.udf import udaf, AggregateFunction
from pyflink.table import DataTypes
from pyflink.common import Row

class WeightedAvg(AggregateFunction):
    def get_value(self, accumulator):
        total_sum, total_weight = accumulator[0], accumulator[1]
        if total_weight == 0:
            return None
        return total_sum / total_weight

    def create_accumulator(self):
        return Row(0.0, 0.0)  # (sum, weight)

    def accumulate(self, accumulator, value, weight):
        accumulator[0] += value * weight
        accumulator[1] += weight

    def retract(self, accumulator, value, weight):
        accumulator[0] -= value * weight
        accumulator[1] -= weight

    def merge(self, accumulator, others):
        for other in others:
            accumulator[0] += other[0]
            accumulator[1] += other[1]

    def get_accumulator_type(self):
        return DataTypes.ROW([
            DataTypes.FIELD("sum", DataTypes.DOUBLE()),
            DataTypes.FIELD("weight", DataTypes.DOUBLE())
        ])

    def get_result_type(self):
        return DataTypes.DOUBLE()

weighted_avg = udaf(WeightedAvg())
t_env.create_temporary_function("weighted_avg", weighted_avg)

result = t_env.sql_query("""
    SELECT user_id,
           weighted_avg(price, quantity) AS weighted_price
    FROM orders
    GROUP BY user_id
""")

Особенности UDAF в Python:

  • create_accumulator создаёт пустой accumulator на старте group.
  • accumulate вызывается per event в group.
  • retract для retract-streams (SQL with updates/deletes).
  • merge для session/sliding window merging.
  • get_value возвращает финальный aggregate.

Performance: UDAF тоже идёт через gRPC. Каждое accumulate — это RPC. Для high-cardinality groupBy это дорого. Pandas UDAF (vectorized) даёт speedup, но требует Flink 2.x.


Mixing DataStream и Table API

В одном PyFlink job можно смешивать DataStream и Table API. Это полезно когда часть logic естественнее в SQL, а часть — в imperative Python:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Создать DataStream
stream = env.from_collection([(1, 'a'), (2, 'b'), (3, 'c')],
                              type_info=Types.TUPLE([Types.INT(), Types.STRING()]))

# Конвертация в Table
table = t_env.from_data_stream(stream, Schema.new_builder()
    .column("id", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .build()
)

# SQL операция
filtered = t_env.sql_query("SELECT * FROM " + table + " WHERE id > 1")

# Обратно в DataStream для imperative logic
back_to_stream = t_env.to_data_stream(filtered)

# Processing на DataStream
back_to_stream \
    .key_by(lambda r: r[0]) \
    .process(MyKeyedProcessFunction()) \
    .print()

Mix позволяет:

  • Использовать DDL для source/sink (Table API проще).
  • SQL для большинства transformations (Calcite оптимизирует).
  • DataStream для специфических stateful logic (полный контроль).

Real production patterns

Pattern 1: SQL backbone + scalar UDF. 95% pipeline в SQL, scalar UDF для специфических transformations (parsing, custom format). Python harness работает мало, performance близок к Java.

Pattern 2: SQL + Pandas UDF для ML. Pre-aggregation в SQL, ML inference как Pandas UDF на batch. Это standard для ML-pipeline-ов: SQL делает feature engineering, Pandas UDF — model.predict() vectorized.

Pattern 3: DDL-only configuration. Все sources/sinks через DDL в external config file. Job code — только SQL queries. Это максимально декларативно, легко модифицируется без code changes.

Antipattern: scalar UDF в hot path с numeric logic. Это типичная ошибка — простая formula amount * 1.13 как scalar UDF добавляет 5-30 мкс на event. На больших throughput это убивает job. Всегда Pandas UDF для numeric.

# ПЛОХО: scalar UDF для simple math
@udf(result_type=DataTypes.DOUBLE())
def add_tax(amount: float) -{'>'} float:
    return amount * 1.13

# ХОРОШО: Pandas UDF
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def add_tax(amount: pd.Series) -{'>'} pd.Series:
    return amount * 1.13

# ИДЕАЛЬНО: чистый SQL без UDF
# SELECT amount * 1.13 AS total FROM orders

Configuration knobs

Полезные конфиги для Python Table API:

# Bundle size для UDF batching (events per gRPC call)
python.fn-execution.bundle.size: 1000

# Max bundle time (ms before flushing partial bundle)
python.fn-execution.bundle.time: 1000

# Pandas UDF: rows per Arrow batch
python.fn-execution.arrow.batch.size: 10000

# Python process memory
python.fn-execution.memory.managed: true

# Python state caching
python.state.cache-size: 1000
python.map-state.read-cache-size: 1000
python.map-state.write-cache-size: 1000

# Использовать internal mini-batch для GROUP BY
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.mini-batch.size: 5000

Mini-batch для GROUP BY — большой win. Без него каждое event обновляет aggregated state отдельно, что дорого даже в Java. С mini-batch обновления буферизуются и применяются в один pass.


Попробуй сам

  1. SQL + scalar UDF. Создайте source table (Kafka), напишите SQL с GROUP BY и одним scalar UDF (custom format function). Измерьте throughput.

  2. Конверт scalar в Pandas UDF. Возьмите UDF из шага 1, конвертируйте в Pandas UDF. Сравните throughput — ожидайте 5-20x улучшение.

  3. UDAF для percentile. Реализуйте custom AggregateFunction для approximate quantile (используйте numpy.quantile через accumulator). Тестируйте на streaming Kafka source.

Проверка знанийKnowledge check
У вас PyFlink Table API job, который делает ML inference. Pipeline: Kafka source -> SELECT с pandas UDF model.predict() -> Kafka sink. Throughput ожидаемый 100K events/sec, фактический 8K events/sec. Профилирование показывает что 70% времени в Python, и только 20% из этого — собственно model.predict(). Остальные 50% — где-то в Arrow conversion и gRPC. Что можно оптимизировать?
ОтветAnswer
Несколько направлений: (1) Bundle size — поднять python.fn-execution.arrow.batch.size с дефолта (10K) до 50K-100K. Больше events на один Arrow batch — меньше Arrow conversion overhead per event. Trade-off — latency растёт на 100-500ms. (2) Bundle time — поднять python.fn-execution.bundle.time до 5-10 секунд. Это даёт harness больше времени накопить batch перед обработкой. (3) Pre-process features в SQL — если model.predict() получает derived features, попробовать вычислять их в SQL (CAST, ROUND, простые формулы), оставляя UDF только для финального .predict(). Меньше columns в Arrow batch = меньше memory bandwidth. (4) Model batching — внутри UDF делать model.predict() сразу на всю pandas DataFrame, а не итерировать по rows. Большинство ML libraries (sklearn, xgboost, tensorflow) поддерживают batch prediction нативно. (5) Простые модели — если модель maленькая (linear, tree, GBM), стоит переписать её inference на чистый numpy/pandas без model object — это убирает Python object overhead. (6) Если ничего не помогло — сериализовать model в ONNX, использовать ONNX Runtime через Java-side UDF — Python harness вообще убираем, model.predict() работает в JVM native code. Это требует больше работы, но даёт 5-10x speedup для production-critical workload.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Почему Table API + SQL в PyFlink часто работает быстрее DataStream API даже с теми же business operations?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4