Table API и SQL в Python
В предыдущем уроке мы разбирали DataStream API: imperative программирование, явное управление state-ом и timers. Table API даёт declarative альтернативу — SQL-подобные операции над таблицами с автоматической оптимизацией через Calcite planner.
Для Python это особенно ценно. Большая часть пайплайна компилируется в Java-операторы и работает без Python overhead. Python нужен только там, где явно вызывается user-defined function. Это даёт почти-Java performance с Python-friendly API.
Pandas UDFs и Apache Arrow в PySparkTableEnvironment и Schema
Точка входа Table API в Python:
from pyflink.table import (
TableEnvironment,
EnvironmentSettings,
Schema,
DataTypes
)
settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(settings)
# Конфигурация
t_env.get_config().set("parallelism.default", "4")
t_env.get_config().set("python.fn-execution.bundle.size", "1000")
Schema объявляется через DataTypes — это аналог TypeInformation для Table API:
schema = Schema.new_builder() \
.column("user_id", DataTypes.STRING()) \
.column("amount", DataTypes.BIGINT()) \
.column("ts", DataTypes.TIMESTAMP_LTZ(3)) \
.watermark("ts", "ts - INTERVAL '5' SECOND") \
.build()
TIMESTAMP_LTZ(3) это TIMESTAMP с миллисекундной точностью и time zone. INTERVAL '5' SECOND — SQL-синтаксис для задержки watermark. Через Schema можно явно задать time attribute и watermark стратегию.
DDL: создание таблиц
Table API позволяет описать source/sink через SQL DDL:
t_env.execute_sql("""
CREATE TABLE orders (
user_id STRING,
amount BIGINT,
ts TIMESTAMP_LTZ(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'broker:9092',
'properties.group.id' = 'flink-group',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset'
)
""")
t_env.execute_sql("""
CREATE TABLE alerts (
user_id STRING,
level STRING,
amount BIGINT,
ts TIMESTAMP_LTZ(3)
) WITH (
'connector' = 'kafka',
'topic' = 'alerts',
'properties.bootstrap.servers' = 'broker:9092',
'format' = 'avro'
)
""")
DDL обрабатывается полностью на Java-стороне через Calcite. Python тут не работает. После создания таблицы вы можете использовать её в SQL queries.
SQL queries в Python
SQL — это hot path Table API. Запрос идёт прямо в Calcite, оптимизируется, компилируется в Java-операторы:
result = t_env.sql_query("""
SELECT user_id,
SUM(amount) AS total,
COUNT(*) AS event_count,
TUMBLE_START(ts, INTERVAL '1' HOUR) AS window_start
FROM orders
GROUP BY user_id, TUMBLE(ts, INTERVAL '1' HOUR)
""")
# Запись в sink
result.execute_insert("alerts")
Этот запрос работает полностью в Java — никакого Python в hot path. Python вызывается только если в SELECT/WHERE/JOIN явно используется UDF.
Чисто-SQL pipeline в PyFlink имеет такой же performance как чисто-SQL pipeline в Java. Это самый быстрый путь — рекомендуется для любой логики, которую можно выразить через SQL.
Scalar UDF: per-value transformation
Если нужна логика, которую SQL не выражает (custom parsing, ML inference, external API call) — пишется User-Defined Function (UDF):
from pyflink.table.udf import udf, ScalarFunction
from pyflink.table import DataTypes
@udf(result_type=DataTypes.STRING())
def normalize_phone(phone: str) -{'>'} str:
# Простая нормализация
digits = ''.join(c for c in phone if c.isdigit())
if len(digits) == 11 and digits.startswith('7'):
return '+7' + digits[1:]
return phone
# Регистрация
t_env.create_temporary_function("normalize_phone", normalize_phone)
# Использование в SQL
result = t_env.sql_query("""
SELECT user_id, normalize_phone(phone) AS phone_clean
FROM users
""")
При выполнении этого запроса для каждой row Java-сторона отправляет phone в Python harness через gRPC, ждёт результат, продолжает. Overhead — 5-30 мкс per call (с batching).
Для performance-critical UDF (миллионы calls/sec) этого overhead-а уже слишком много. Решение — Pandas UDF.
Pandas UDF: vectorized execution
Pandas UDF получает pandas.Series (batch значений), а не отдельное value. Это даёт vectorized execution через numpy/pandas, плюс резкое снижение per-event overhead:
import pandas as pd
from pyflink.table.udf import udf
from pyflink.table import DataTypes
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def calc_tax(amount: pd.Series, rate: pd.Series) -{'>'} pd.Series:
return amount * (1 + rate) # vectorized
t_env.create_temporary_function("calc_tax", calc_tax)
result = t_env.sql_query("""
SELECT user_id, calc_tax(amount, tax_rate) AS total
FROM orders_with_tax
""")
Под капотом:
- Java аккумулирует batch (1000-10000 events).
- Конвертирует batch в Apache Arrow columnar format.
- Передаёт Arrow batch в Python harness через shared memory (zero-copy).
- Python harness конвертирует Arrow в pandas DataFrame.
- UDF вызывается ONCE для всего batch.
- Результат конвертируется обратно в Arrow.
Performance vs scalar UDF: 10-50x speedup для numeric operations, 5-20x для object operations. Это близко к native Java performance.
# Пример с numpy
import numpy as np
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def cosine_similarity(vec1: pd.Series, vec2: pd.Series) -{'>'} pd.Series:
# vec1, vec2 — Series of list[float]
a = np.array(vec1.tolist())
b = np.array(vec2.tolist())
norm_a = np.linalg.norm(a, axis=1)
norm_b = np.linalg.norm(b, axis=1)
dot = np.sum(a * b, axis=1)
return pd.Series(dot / (norm_a * norm_b))
Этот UDF делает векторизованную similarity на batch из тысяч vectors. Без Pandas UDF это были бы тысячи Python calls. С Pandas UDF — один call с numpy computation.
Любой numeric UDF в production должен быть Pandas UDF. Скалярный UDF используйте только для одиночных compute (вызов внешнего API, complex if-then логика без vectorization). Pandas UDF — default choice, scalar — exception.
Table UDF: один-ко-многим
Иногда нужно из одного row сгенерировать несколько rows — например parsing JSON array, explode collections. Это Table UDF (UDTF):
from pyflink.table.udf import udtf
from pyflink.table import DataTypes
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def split_tags(tags: str):
for tag in tags.split(','):
tag = tag.strip()
if tag:
yield tag, len(tag).__str__()
t_env.create_temporary_function("split_tags", split_tags)
result = t_env.sql_query("""
SELECT user_id, tag, tag_length
FROM orders, LATERAL TABLE(split_tags(tags)) AS T(tag, tag_length)
""")
LATERAL TABLE — это SQL способ выразить “развернуть UDTF result в дополнительные rows”. Каждый input row может породить 0..N output rows.
Под капотом UDTF использует тот же gRPC mechanism что и scalar UDF, но stream-style: Python yield-ит multiple results, Java-сторона эмитит каждый downstream.
Aggregate UDF: per-group aggregation
UDAF — это custom aggregation, аналог SUM(), AVG(), но с user logic. Например — медиана, или perсentile, или весовая агрегация:
from pyflink.table.udf import udaf, AggregateFunction
from pyflink.table import DataTypes
from pyflink.common import Row
class WeightedAvg(AggregateFunction):
def get_value(self, accumulator):
total_sum, total_weight = accumulator[0], accumulator[1]
if total_weight == 0:
return None
return total_sum / total_weight
def create_accumulator(self):
return Row(0.0, 0.0) # (sum, weight)
def accumulate(self, accumulator, value, weight):
accumulator[0] += value * weight
accumulator[1] += weight
def retract(self, accumulator, value, weight):
accumulator[0] -= value * weight
accumulator[1] -= weight
def merge(self, accumulator, others):
for other in others:
accumulator[0] += other[0]
accumulator[1] += other[1]
def get_accumulator_type(self):
return DataTypes.ROW([
DataTypes.FIELD("sum", DataTypes.DOUBLE()),
DataTypes.FIELD("weight", DataTypes.DOUBLE())
])
def get_result_type(self):
return DataTypes.DOUBLE()
weighted_avg = udaf(WeightedAvg())
t_env.create_temporary_function("weighted_avg", weighted_avg)
result = t_env.sql_query("""
SELECT user_id,
weighted_avg(price, quantity) AS weighted_price
FROM orders
GROUP BY user_id
""")
Особенности UDAF в Python:
create_accumulatorсоздаёт пустой accumulator на старте group.accumulateвызывается per event в group.retractдля retract-streams (SQL with updates/deletes).mergeдля session/sliding window merging.get_valueвозвращает финальный aggregate.
Performance: UDAF тоже идёт через gRPC. Каждое accumulate — это RPC. Для high-cardinality groupBy это дорого. Pandas UDAF (vectorized) даёт speedup, но требует Flink 2.x.
Mixing DataStream и Table API
В одном PyFlink job можно смешивать DataStream и Table API. Это полезно когда часть logic естественнее в SQL, а часть — в imperative Python:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
# Создать DataStream
stream = env.from_collection([(1, 'a'), (2, 'b'), (3, 'c')],
type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
# Конвертация в Table
table = t_env.from_data_stream(stream, Schema.new_builder()
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.build()
)
# SQL операция
filtered = t_env.sql_query("SELECT * FROM " + table + " WHERE id > 1")
# Обратно в DataStream для imperative logic
back_to_stream = t_env.to_data_stream(filtered)
# Processing на DataStream
back_to_stream \
.key_by(lambda r: r[0]) \
.process(MyKeyedProcessFunction()) \
.print()
Mix позволяет:
- Использовать DDL для source/sink (Table API проще).
- SQL для большинства transformations (Calcite оптимизирует).
- DataStream для специфических stateful logic (полный контроль).
Real production patterns
Pattern 1: SQL backbone + scalar UDF. 95% pipeline в SQL, scalar UDF для специфических transformations (parsing, custom format). Python harness работает мало, performance близок к Java.
Pattern 2: SQL + Pandas UDF для ML. Pre-aggregation в SQL, ML inference как Pandas UDF на batch. Это standard для ML-pipeline-ов: SQL делает feature engineering, Pandas UDF — model.predict() vectorized.
Pattern 3: DDL-only configuration. Все sources/sinks через DDL в external config file. Job code — только SQL queries. Это максимально декларативно, легко модифицируется без code changes.
Antipattern: scalar UDF в hot path с numeric logic. Это типичная ошибка — простая formula amount * 1.13 как scalar UDF добавляет 5-30 мкс на event. На больших throughput это убивает job. Всегда Pandas UDF для numeric.
# ПЛОХО: scalar UDF для simple math
@udf(result_type=DataTypes.DOUBLE())
def add_tax(amount: float) -{'>'} float:
return amount * 1.13
# ХОРОШО: Pandas UDF
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def add_tax(amount: pd.Series) -{'>'} pd.Series:
return amount * 1.13
# ИДЕАЛЬНО: чистый SQL без UDF
# SELECT amount * 1.13 AS total FROM orders
Configuration knobs
Полезные конфиги для Python Table API:
# Bundle size для UDF batching (events per gRPC call)
python.fn-execution.bundle.size: 1000
# Max bundle time (ms before flushing partial bundle)
python.fn-execution.bundle.time: 1000
# Pandas UDF: rows per Arrow batch
python.fn-execution.arrow.batch.size: 10000
# Python process memory
python.fn-execution.memory.managed: true
# Python state caching
python.state.cache-size: 1000
python.map-state.read-cache-size: 1000
python.map-state.write-cache-size: 1000
# Использовать internal mini-batch для GROUP BY
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.mini-batch.size: 5000
Mini-batch для GROUP BY — большой win. Без него каждое event обновляет aggregated state отдельно, что дорого даже в Java. С mini-batch обновления буферизуются и применяются в один pass.
Попробуй сам
-
SQL + scalar UDF. Создайте source table (Kafka), напишите SQL с GROUP BY и одним scalar UDF (custom format function). Измерьте throughput.
-
Конверт scalar в Pandas UDF. Возьмите UDF из шага 1, конвертируйте в Pandas UDF. Сравните throughput — ожидайте 5-20x улучшение.
-
UDAF для percentile. Реализуйте custom AggregateFunction для approximate quantile (используйте
numpy.quantileчерез accumulator). Тестируйте на streaming Kafka source.