PyFlink performance
PyFlink не быстрее Java Flink. Это must accept truth. Вопрос только в том — насколько медленнее, и какой ценой можно догнать. Этот урок про конкретные числа, профилирование, и решения, которые работают в production.
Мы разберём, где конкретно теряется производительность, какие техники её возвращают, и где границы — после которых даже самый агрессивно оптимизированный PyFlink уступит Java по фундаментальным причинам.
Pandas UDF в Spark: Arrow-based векторизацияГде теряются микросекунды
Полный путь одного event через PyFlink stateful UDF выглядит так:
- Java operator получает event — 50-100 нс.
- Сериализация Java -> bytes для отправки в Python — 100-500 нс.
- gRPC writeAndFlush в Python harness — 1-3 мкс (батчится).
- Python harness deserialize bytes -> Python object — 1-3 мкс.
- State read через gRPC обратно в Java (если есть) — 50-200 мкс.
- User function execution — variable, обычно 1-10 мкс для простых, больше для compute.
- State update через gRPC (если есть) — 50-200 мкс.
- Сериализация Python result -> bytes — 1-3 мкс.
- gRPC отправка обратно в Java — 1-3 мкс.
- Java deserialize bytes -> Java object — 100-500 нс.
Без state access: 5-15 мкс per event. Со state access (read + write): 105-415 мкс per event.
Сравнение с Java:
- Без state: 0.5-2 мкс per event.
- Со state: 2-10 мкс per event.
Multiplier: 5-20x для stateless, 10-50x для stateful. Это — стартовая точка. Дальше оптимизации могут вернуть часть этого разрыва.
Beam portability overhead
Beam framework (см. урок 18.1) добавляет 10-30% overhead vs гипотетическому “native Python в JVM”. Конкретные источники:
- gRPC serialization — protobuf encoding для control messages, framing для data channel.
- Bundle accumulation — Java side ждёт N events перед отправкой (бат вытаскивается из контекста).
- Per-bundle setup — Python harness инициализирует state, counters, side outputs для каждого bundle.
- Backpressure signaling — Python должен говорить Java о готовности принять next bundle, что добавляет round-trip.
Этот overhead — фундаментальный, не убирается без перепроектирования портabiltity layer. В Flink 2.x команда работает над более прямой интеграцией без full Beam stack, но это experimental.
Конфигурации, которые снижают Beam overhead:
# Больше events на bundle — меньше per-bundle setup
python.fn-execution.bundle.size: 5000 # default 1000
# Дольше bundle accumulation — больше batching benefit
python.fn-execution.bundle.time: 5000 # ms, default 1000
# Используем Arrow вместо вариативной сериализации
python.fn-execution.arrow.batch.size: 50000
# Per-slot process: больше memory, но изоляция
python.fn-execution.process-mode: per-slot
Trade-offs: больше batching — меньше overhead, но больше latency. Если ваш latency budget 100ms — bundle time 5000 ms неприемлем. Подбирайте под workload.
Vectorization: Pandas UDF benchmarks
Pandas UDF — главная техника возврата performance. Конкретные benchmarks:
| Операция | Scalar UDF | Pandas UDF | Speedup |
|---|---|---|---|
| amount * 1.13 (numeric) | 280K ops/s | 14M ops/s | 50x |
| str.upper() (string) | 220K ops/s | 4M ops/s | 18x |
| Custom regex parsing | 80K ops/s | 800K ops/s | 10x |
| sklearn predict() (10K rows) | 50K ops/s | 1.2M ops/s | 24x |
| numpy correlation | 30K ops/s | 8M ops/s | 270x |
| JSON parsing (orjson) | 120K ops/s | 600K ops/s | 5x |
Закономерности:
- Numeric операции: огромный speedup (10-300x). Numpy/pandas работают на native C, scalar UDF делает Python interpreter loop.
- String операции: средний speedup (5-20x). Pandas string operations используют C extensions, но не настолько fast как numeric.
- Object operations (JSON, custom): умеренный speedup (3-10x). Здесь основное время в логике, batching даёт меньше benefit.
- ML inference: большой speedup (10-50x) — model.predict() на batch это native vectorized operation.
Cython и Numba для hot UDF
Если Pandas UDF не дотягивает, можно компилировать Python в native code через Cython или Numba.
Cython — pre-compile Python в C extension. Требует write-once compile-step:
# fraud.pyx (Cython source)
import numpy as np
cimport numpy as np
def detect_fraud_pattern(np.ndarray[double, ndim=2] features, double threshold):
cdef int n = features.shape[0]
cdef np.ndarray[int, ndim=1] result = np.zeros(n, dtype=np.int32)
cdef int i
cdef double score
for i in range(n):
score = features[i, 0] * 0.3 + features[i, 1] * 0.7
if score {'>'} threshold:
result[i] = 1
return result
Компиляция:
cythonize -i fraud.pyx # создаёт fraud.cpython-XX.so
Использование в Pandas UDF:
from fraud import detect_fraud_pattern
@udf(result_type=DataTypes.INT(), func_type="pandas")
def fraud_udf(features: pd.Series, threshold: pd.Series) -{'>'} pd.Series:
features_arr = np.array(features.tolist())
return pd.Series(detect_fraud_pattern(features_arr, threshold[0]))
Speedup vs pure Python: 10-50x для CPU-bound logic. Vs Pandas UDF без Cython: 2-5x.
Numba — JIT compilation Python в native code в runtime. Не требует compile step:
from numba import njit
import numpy as np
@njit(cache=True)
def detect_fraud_pattern_jit(features, threshold):
n = features.shape[0]
result = np.zeros(n, dtype=np.int32)
for i in range(n):
score = features[i, 0] * 0.3 + features[i, 1] * 0.7
if score {'>'} threshold:
result[i] = 1
return result
@udf(result_type=DataTypes.INT(), func_type="pandas")
def fraud_udf(features: pd.Series, threshold: pd.Series) -{'>'} pd.Series:
features_arr = np.array(features.tolist())
return pd.Series(detect_fraud_pattern_jit(features_arr, threshold[0]))
Numba компилирует функцию при первом вызове (warmup 100-500ms), потом работает на native speed. Cache pickle-сохраняет compiled code между runs.
Numba проще в deployment (никаких .so файлов), но имеет ограничения:
- Не все Python features поддерживаются (только подмножество numpy operations).
- Нет поддержки object types — только numeric arrays.
- JIT overhead на cold start.
Сначала Pandas UDF — она даёт 80% speedup без compilation. Если этого мало — Numba (легче в deployment). Только если и этого мало — Cython (требует build step, но даёт максимум). Не начинайте с Cython.
Профилирование Python harness
Чтобы понять, где конкретно теряется время в Python harness, используем стандартные Python profilers + Flink metrics.
py-spy — sampling profiler, не требует instrumentation:
# Запускаем Flink job, находим PID Python harness
ps aux | grep python | grep sdk_worker
# Запускаем py-spy на этот PID
py-spy record -d 30 -o profile.svg --pid 12345
Output — flame graph (SVG), показывает где Python тратит CPU. Типичные observations:
- Если 40%+ в
_pickle.dumps/loads— сериализация bottleneck. Увеличьте bundle size или используйте Arrow. - Если 30%+ в
grpc._channel.*— RPC overhead. Снижайте число state RPC. - Если 20%+ в user UDF — это normal, вы pinpoint-нули user logic.
- Если 50%+ в
pandas.*— vectorized код работает, всё ок. Можете попробовать Cython/Numba для дальнейшего ускорения.
cProfile — встроенный Python profiler, требует instrumentation:
import cProfile
import pstats
class ProfiledMapFunction(MapFunction):
def open(self, runtime_context):
self.profiler = cProfile.Profile()
self.event_count = 0
def map(self, value):
self.event_count += 1
if self.event_count == 1000: # profile только первый bundle
self.profiler.enable()
result = self._actual_logic(value)
if self.event_count == 2000:
self.profiler.disable()
with open('/tmp/profile.txt', 'w') as f:
pstats.Stats(self.profiler, stream=f).sort_stats('cumulative').print_stats(50)
return result
cProfile точнее py-spy, но имеет measurement overhead (10-30%).
Flink metrics — для high-level observability:
class MyFunction(MapFunction):
def open(self, runtime_context):
self.processed_events = runtime_context.get_metrics_group().counter("processed")
self.processing_time_ms = runtime_context.get_metrics_group().histogram("processing_ms")
def map(self, value):
import time
start = time.perf_counter()
result = self._actual_logic(value)
elapsed = (time.perf_counter() - start) * 1000
self.processed_events.inc()
self.processing_time_ms.update(int(elapsed))
return result
Метрики экспортируются в Prometheus/InfluxDB через Flink metrics reporter. Дают long-term тренды и alerting.
State access optimization
Stateful Python UDF — самый дорогой случай. Каждый state read/write это RPC обратно в Java (50-200 мкс). На потоке 100K events/sec с 3 state operations per event — 30K RPC/sec. Это съест CPU.
Техники оптимизации:
1. Python state cache. Включается в config, кэширует last value per key в Python harness. Hits — без RPC, misses — fetch + cache:
python.state.cache-size: 10000 # max number of cached keys
Cache hit rate можно мониторить через metric pythonStateCache_hitRate. Хорошее значение — >70%. Если меньше — workload имеет low key locality (события одного key встречаются редко), cache не помогает.
2. Buffering в local memory. Накапливать state updates в Python-side variable, flush раз в N events:
class BufferedAggregator(KeyedProcessFunction):
def open(self, runtime_context):
self.aggregate = runtime_context.get_state(
ValueStateDescriptor("agg", Types.DOUBLE())
)
self.local_buffer = 0.0
self.flush_count = 0
def process_element(self, value, ctx):
self.local_buffer += value.amount
self.flush_count += 1
if self.flush_count {'>'}= 100:
current = self.aggregate.value() or 0.0
self.aggregate.update(current + self.local_buffer)
self.local_buffer = 0.0
self.flush_count = 0
Проблема: local_buffer не survives task restart. Подходит только для approximate aggregations (где недосчёт 100 events приемлем).
3. ValueState с complex value. Вместо нескольких ValueState — один с complex value (Row, Tuple). Один RPC доставляет всё нужное:
# ПЛОХО: 4 RPC per event
self.total = runtime_context.get_state(ValueStateDescriptor("total", Types.LONG()))
self.count = runtime_context.get_state(ValueStateDescriptor("count", Types.LONG()))
self.max = runtime_context.get_state(ValueStateDescriptor("max", Types.LONG()))
self.last_ts = runtime_context.get_state(ValueStateDescriptor("last_ts", Types.LONG()))
# ХОРОШО: 1 RPC per event
self.agg = runtime_context.get_state(
ValueStateDescriptor("agg", Types.ROW([
Types.LONG(), Types.LONG(), Types.LONG(), Types.LONG()
]))
)
Когда PyFlink, когда Java
Решение PyFlink vs Java строится на нескольких осях:
Migration path: PyFlink -> Java
Часто прототип строится на PyFlink, дальше критичные части переписываются в Java. Это не “all or nothing” — Flink поддерживает gradual migration.
Step 1: Identify hot operators. Через профилирование найдите операторы, которые тратят больше всего CPU или дают backpressure.
Step 2: Извлеките UDF в Java JAR. Напишите Java implementation того же UDF, упакуйте в JAR, добавьте в classpath PyFlink job.
# PyFlink job всё ещё в Python
t_env.create_temporary_function("fraud_detect",
JavaUserDefinedScalarFunctions("com.example.FraudDetectFunction"))
result = t_env.sql_query("""
SELECT user_id, fraud_detect(features) AS score FROM events
""")
Этот UDF работает 100% в Java. Performance восстанавливается, остальной pipeline остаётся в Python.
Step 3: Если нужно — переписывайте всё в Java. Когда стало ясно, что PyFlink не покрывает требования, переписывайте job полностью. Сохраните DDL/SQL — они одинаковы между PyFlink и Java. Imperative parts (DataStream API) — переписываются 1:1.
Real production benchmarks
Конкретные числа из реального production-кейса (fintech transaction processing):
| Version | Throughput per slot | CPU per slot | Memory per slot | P99 latency |
|---|---|---|---|---|
| Java DataStream API | 200K events/s | 100% (1 core) | 1.5 GB | 25 ms |
| PyFlink scalar UDF | 25K events/s | 100% | 2.5 GB | 180 ms |
| PyFlink Pandas UDF | 85K events/s | 80% | 2.2 GB | 60 ms |
| PyFlink + Java UDF | 180K events/s | 100% | 1.8 GB | 30 ms |
Выводы:
- PyFlink scalar UDF — 8x медленнее Java.
- Pandas UDF — 2.4x медленнее (acceptable для большинства use case).
- Java UDF в PyFlink job — почти равно Java (5-10% разница).
Для этой команды правильным выбором стало “PyFlink с Java UDF для hot path”. Они сохранили Python developer experience для большинства кода, но критичные операторы (state-heavy aggregations) написаны в Java.
Попробуй сам
-
Benchmark scalar vs Pandas UDF. Возьмите свой UDF, напишите версию scalar и Pandas. Запустите на 100K test events. Сравните throughput.
-
py-spy профилирование. Запустите PyFlink job, найдите Python harness PID, прогоните py-spy на 30 секунд. Посмотрите flame graph — что съедает CPU?
-
State cache impact. Запустите stateful job с
python.state.cache-size=0(disabled). Замерьте throughput. Поднимите до 10000. Сравните — это покажет реальный impact state caching на ваш workload.