Learning Platform
Глоссарий Troubleshooting
Урок 19.04 · 25 мин
Продвинутый
Beam OverheadPandas VectorizationCythonProfilingPerformance Tuning

PyFlink performance

PyFlink не быстрее Java Flink. Это must accept truth. Вопрос только в том — насколько медленнее, и какой ценой можно догнать. Этот урок про конкретные числа, профилирование, и решения, которые работают в production.

Мы разберём, где конкретно теряется производительность, какие техники её возвращают, и где границы — после которых даже самый агрессивно оптимизированный PyFlink уступит Java по фундаментальным причинам.

Pandas UDF в Spark: Arrow-based векторизация

Где теряются микросекунды

Полный путь одного event через PyFlink stateful UDF выглядит так:

  1. Java operator получает event — 50-100 нс.
  2. Сериализация Java -> bytes для отправки в Python — 100-500 нс.
  3. gRPC writeAndFlush в Python harness — 1-3 мкс (батчится).
  4. Python harness deserialize bytes -> Python object — 1-3 мкс.
  5. State read через gRPC обратно в Java (если есть) — 50-200 мкс.
  6. User function execution — variable, обычно 1-10 мкс для простых, больше для compute.
  7. State update через gRPC (если есть) — 50-200 мкс.
  8. Сериализация Python result -> bytes — 1-3 мкс.
  9. gRPC отправка обратно в Java — 1-3 мкс.
  10. Java deserialize bytes -> Java object — 100-500 нс.

Без state access: 5-15 мкс per event. Со state access (read + write): 105-415 мкс per event.

Сравнение с Java:

  • Без state: 0.5-2 мкс per event.
  • Со state: 2-10 мкс per event.

Multiplier: 5-20x для stateless, 10-50x для stateful. Это — стартовая точка. Дальше оптимизации могут вернуть часть этого разрыва.


Beam portability overhead

Beam framework (см. урок 18.1) добавляет 10-30% overhead vs гипотетическому “native Python в JVM”. Конкретные источники:

  • gRPC serialization — protobuf encoding для control messages, framing для data channel.
  • Bundle accumulation — Java side ждёт N events перед отправкой (бат вытаскивается из контекста).
  • Per-bundle setup — Python harness инициализирует state, counters, side outputs для каждого bundle.
  • Backpressure signaling — Python должен говорить Java о готовности принять next bundle, что добавляет round-trip.

Этот overhead — фундаментальный, не убирается без перепроектирования портabiltity layer. В Flink 2.x команда работает над более прямой интеграцией без full Beam stack, но это experimental.

Конфигурации, которые снижают Beam overhead:

# Больше events на bundle — меньше per-bundle setup
python.fn-execution.bundle.size: 5000  # default 1000

# Дольше bundle accumulation — больше batching benefit
python.fn-execution.bundle.time: 5000  # ms, default 1000

# Используем Arrow вместо вариативной сериализации
python.fn-execution.arrow.batch.size: 50000

# Per-slot process: больше memory, но изоляция
python.fn-execution.process-mode: per-slot

Trade-offs: больше batching — меньше overhead, но больше latency. Если ваш latency budget 100ms — bundle time 5000 ms неприемлем. Подбирайте под workload.


Vectorization: Pandas UDF benchmarks

Pandas UDF — главная техника возврата performance. Конкретные benchmarks:

ОперацияScalar UDFPandas UDFSpeedup
amount * 1.13 (numeric)280K ops/s14M ops/s50x
str.upper() (string)220K ops/s4M ops/s18x
Custom regex parsing80K ops/s800K ops/s10x
sklearn predict() (10K rows)50K ops/s1.2M ops/s24x
numpy correlation30K ops/s8M ops/s270x
JSON parsing (orjson)120K ops/s600K ops/s5x

Закономерности:

  • Numeric операции: огромный speedup (10-300x). Numpy/pandas работают на native C, scalar UDF делает Python interpreter loop.
  • String операции: средний speedup (5-20x). Pandas string operations используют C extensions, но не настолько fast как numeric.
  • Object operations (JSON, custom): умеренный speedup (3-10x). Здесь основное время в логике, batching даёт меньше benefit.
  • ML inference: большой speedup (10-50x) — model.predict() на batch это native vectorized operation.

Cython и Numba для hot UDF

Если Pandas UDF не дотягивает, можно компилировать Python в native code через Cython или Numba.

Cython — pre-compile Python в C extension. Требует write-once compile-step:

# fraud.pyx (Cython source)
import numpy as np
cimport numpy as np

def detect_fraud_pattern(np.ndarray[double, ndim=2] features, double threshold):
    cdef int n = features.shape[0]
    cdef np.ndarray[int, ndim=1] result = np.zeros(n, dtype=np.int32)
    cdef int i
    cdef double score
    for i in range(n):
        score = features[i, 0] * 0.3 + features[i, 1] * 0.7
        if score {'>'} threshold:
            result[i] = 1
    return result

Компиляция:

cythonize -i fraud.pyx  # создаёт fraud.cpython-XX.so

Использование в Pandas UDF:

from fraud import detect_fraud_pattern

@udf(result_type=DataTypes.INT(), func_type="pandas")
def fraud_udf(features: pd.Series, threshold: pd.Series) -{'>'} pd.Series:
    features_arr = np.array(features.tolist())
    return pd.Series(detect_fraud_pattern(features_arr, threshold[0]))

Speedup vs pure Python: 10-50x для CPU-bound logic. Vs Pandas UDF без Cython: 2-5x.

Numba — JIT compilation Python в native code в runtime. Не требует compile step:

from numba import njit
import numpy as np

@njit(cache=True)
def detect_fraud_pattern_jit(features, threshold):
    n = features.shape[0]
    result = np.zeros(n, dtype=np.int32)
    for i in range(n):
        score = features[i, 0] * 0.3 + features[i, 1] * 0.7
        if score {'>'} threshold:
            result[i] = 1
    return result

@udf(result_type=DataTypes.INT(), func_type="pandas")
def fraud_udf(features: pd.Series, threshold: pd.Series) -{'>'} pd.Series:
    features_arr = np.array(features.tolist())
    return pd.Series(detect_fraud_pattern_jit(features_arr, threshold[0]))

Numba компилирует функцию при первом вызове (warmup 100-500ms), потом работает на native speed. Cache pickle-сохраняет compiled code между runs.

Numba проще в deployment (никаких .so файлов), но имеет ограничения:

  • Не все Python features поддерживаются (только подмножество numpy operations).
  • Нет поддержки object types — только numeric arrays.
  • JIT overhead на cold start.
TIP

Сначала Pandas UDF — она даёт 80% speedup без compilation. Если этого мало — Numba (легче в deployment). Только если и этого мало — Cython (требует build step, но даёт максимум). Не начинайте с Cython.


Профилирование Python harness

Чтобы понять, где конкретно теряется время в Python harness, используем стандартные Python profilers + Flink metrics.

py-spy — sampling profiler, не требует instrumentation:

# Запускаем Flink job, находим PID Python harness
ps aux | grep python | grep sdk_worker

# Запускаем py-spy на этот PID
py-spy record -d 30 -o profile.svg --pid 12345

Output — flame graph (SVG), показывает где Python тратит CPU. Типичные observations:

  • Если 40%+ в _pickle.dumps/loads — сериализация bottleneck. Увеличьте bundle size или используйте Arrow.
  • Если 30%+ в grpc._channel.* — RPC overhead. Снижайте число state RPC.
  • Если 20%+ в user UDF — это normal, вы pinpoint-нули user logic.
  • Если 50%+ в pandas.* — vectorized код работает, всё ок. Можете попробовать Cython/Numba для дальнейшего ускорения.

cProfile — встроенный Python profiler, требует instrumentation:

import cProfile
import pstats

class ProfiledMapFunction(MapFunction):
    def open(self, runtime_context):
        self.profiler = cProfile.Profile()
        self.event_count = 0

    def map(self, value):
        self.event_count += 1
        if self.event_count == 1000:  # profile только первый bundle
            self.profiler.enable()
        result = self._actual_logic(value)
        if self.event_count == 2000:
            self.profiler.disable()
            with open('/tmp/profile.txt', 'w') as f:
                pstats.Stats(self.profiler, stream=f).sort_stats('cumulative').print_stats(50)
        return result

cProfile точнее py-spy, но имеет measurement overhead (10-30%).

Flink metrics — для high-level observability:

class MyFunction(MapFunction):
    def open(self, runtime_context):
        self.processed_events = runtime_context.get_metrics_group().counter("processed")
        self.processing_time_ms = runtime_context.get_metrics_group().histogram("processing_ms")

    def map(self, value):
        import time
        start = time.perf_counter()
        result = self._actual_logic(value)
        elapsed = (time.perf_counter() - start) * 1000
        self.processed_events.inc()
        self.processing_time_ms.update(int(elapsed))
        return result

Метрики экспортируются в Prometheus/InfluxDB через Flink metrics reporter. Дают long-term тренды и alerting.


State access optimization

Stateful Python UDF — самый дорогой случай. Каждый state read/write это RPC обратно в Java (50-200 мкс). На потоке 100K events/sec с 3 state operations per event — 30K RPC/sec. Это съест CPU.

Техники оптимизации:

1. Python state cache. Включается в config, кэширует last value per key в Python harness. Hits — без RPC, misses — fetch + cache:

python.state.cache-size: 10000  # max number of cached keys

Cache hit rate можно мониторить через metric pythonStateCache_hitRate. Хорошее значение — >70%. Если меньше — workload имеет low key locality (события одного key встречаются редко), cache не помогает.

2. Buffering в local memory. Накапливать state updates в Python-side variable, flush раз в N events:

class BufferedAggregator(KeyedProcessFunction):
    def open(self, runtime_context):
        self.aggregate = runtime_context.get_state(
            ValueStateDescriptor("agg", Types.DOUBLE())
        )
        self.local_buffer = 0.0
        self.flush_count = 0

    def process_element(self, value, ctx):
        self.local_buffer += value.amount
        self.flush_count += 1

        if self.flush_count {'>'}= 100:
            current = self.aggregate.value() or 0.0
            self.aggregate.update(current + self.local_buffer)
            self.local_buffer = 0.0
            self.flush_count = 0

Проблема: local_buffer не survives task restart. Подходит только для approximate aggregations (где недосчёт 100 events приемлем).

3. ValueState с complex value. Вместо нескольких ValueState — один с complex value (Row, Tuple). Один RPC доставляет всё нужное:

# ПЛОХО: 4 RPC per event
self.total = runtime_context.get_state(ValueStateDescriptor("total", Types.LONG()))
self.count = runtime_context.get_state(ValueStateDescriptor("count", Types.LONG()))
self.max = runtime_context.get_state(ValueStateDescriptor("max", Types.LONG()))
self.last_ts = runtime_context.get_state(ValueStateDescriptor("last_ts", Types.LONG()))

# ХОРОШО: 1 RPC per event
self.agg = runtime_context.get_state(
    ValueStateDescriptor("agg", Types.ROW([
        Types.LONG(), Types.LONG(), Types.LONG(), Types.LONG()
    ]))
)

Решение PyFlink vs Java строится на нескольких осях:

Decision matrix: PyFlink vs Java
PyFlink WINPyFlink win zone: ML inference (pandas/numpy/sklearn), prototype scripts, command line tools для batch processing, Data scientist-friendly работа. Throughput до 100K-500K events/sec на slot для simple operations.
Mixed (SQL + UDF)Mixed zone: medium throughput (100K-1M events/sec), стандартный SQL processing с лишь несколькими Python UDF, нужно сохранить Python для конкретных UDF (например custom parsing legacy data).
Java WINJava win zone: high throughput (1M+ events/sec), stateful hot path с миллионами active keys, ultra-low latency требования (p99 под 10ms), сложные windowing patterns.
ML inferenceML inference в pipeline: загрузить модель, предсказать. Естественная задача для Python — все ML libraries здесь. PyFlink + Pandas UDF близок к Java performance для batch ML.
Legacy parsingCustom parsing legacy форматов: scientific data formats, sigil-encoded text. Python ecosystem rich для exotic форматов. UDF делает parsing, остальное в SQL.
Hot statefulStateful hot path с RPC overhead — антипаттерн для PyFlink. Каждый state access = 50-200 мкс. На high QPS это убивает throughput. Переписывать в Java — единственный fix.
PrototypingPrototyping: iterative разработка, тестирование идей. Python скорость разработки в 2-3x быстрее Java. Прототип в PyFlink, prod в Java — нормальная workflow.
Pure SQLПростые SQL pipeline-ы без UDF: PyFlink и Java одинаковы (всё работает в Java через Calcite). Выбор — по команде, не по performance.
Ultra low latencyUltra-low latency (sub-10ms p99): Beam overhead 5-30 мкс на event — это уже значимая доля. Java оператор работает в 1-2 мкс. PyFlink не покрывает этот use case.

Часто прототип строится на PyFlink, дальше критичные части переписываются в Java. Это не “all or nothing” — Flink поддерживает gradual migration.

Step 1: Identify hot operators. Через профилирование найдите операторы, которые тратят больше всего CPU или дают backpressure.

Step 2: Извлеките UDF в Java JAR. Напишите Java implementation того же UDF, упакуйте в JAR, добавьте в classpath PyFlink job.

# PyFlink job всё ещё в Python
t_env.create_temporary_function("fraud_detect",
    JavaUserDefinedScalarFunctions("com.example.FraudDetectFunction"))

result = t_env.sql_query("""
    SELECT user_id, fraud_detect(features) AS score FROM events
""")

Этот UDF работает 100% в Java. Performance восстанавливается, остальной pipeline остаётся в Python.

Step 3: Если нужно — переписывайте всё в Java. Когда стало ясно, что PyFlink не покрывает требования, переписывайте job полностью. Сохраните DDL/SQL — они одинаковы между PyFlink и Java. Imperative parts (DataStream API) — переписываются 1:1.


Real production benchmarks

Конкретные числа из реального production-кейса (fintech transaction processing):

VersionThroughput per slotCPU per slotMemory per slotP99 latency
Java DataStream API200K events/s100% (1 core)1.5 GB25 ms
PyFlink scalar UDF25K events/s100%2.5 GB180 ms
PyFlink Pandas UDF85K events/s80%2.2 GB60 ms
PyFlink + Java UDF180K events/s100%1.8 GB30 ms

Выводы:

  • PyFlink scalar UDF — 8x медленнее Java.
  • Pandas UDF — 2.4x медленнее (acceptable для большинства use case).
  • Java UDF в PyFlink job — почти равно Java (5-10% разница).

Для этой команды правильным выбором стало “PyFlink с Java UDF для hot path”. Они сохранили Python developer experience для большинства кода, но критичные операторы (state-heavy aggregations) написаны в Java.


Попробуй сам

  1. Benchmark scalar vs Pandas UDF. Возьмите свой UDF, напишите версию scalar и Pandas. Запустите на 100K test events. Сравните throughput.

  2. py-spy профилирование. Запустите PyFlink job, найдите Python harness PID, прогоните py-spy на 30 секунд. Посмотрите flame graph — что съедает CPU?

  3. State cache impact. Запустите stateful job с python.state.cache-size=0 (disabled). Замерьте throughput. Поднимите до 10000. Сравните — это покажет реальный impact state caching на ваш workload.

Проверка знанийKnowledge check
У вас есть PyFlink job, который надо ускорить с 25K events/sec до 200K events/sec (8x). Сейчас pipeline: Kafka source -> SQL filter -> Python scalar UDF (parse custom format) -> Python KeyedProcessFunction с ValueState (aggregate per user) -> Kafka sink. Какой план оптимизации в порядке priority и ожидаемый speedup на каждом шаге?
ОтветAnswer
План в порядке impact: (1) Конверт scalar UDF в Pandas UDF — ожидаемый speedup 5-20x для parse operations (если они numeric/vectorizable). После этого pipeline стал бы 60-200K events/sec, возможно цель уже достигнута. Cost: переписать UDF, минимальный риск. (2) Если parse UDF действительно нельзя векторизовать (legacy custom format) — написать его в Java как JAR UDF, использовать через JavaUserDefinedScalarFunctions. Speedup 10-30x на этом UDF. (3) Stateful KeyedProcessFunction в Python — почти наверняка bottleneck. Опции по приоритету: (a) Включить python.state.cache-size=10000 — обычно даёт 2-3x speedup за счёт reducing RPC. (b) Buffer state updates в local memory (см. урок), flush раз в N events — если approximate aggregation OK. (c) Переписать только этот оператор в Java — это даст 5-10x speedup на нём. (4) Конфиги: python.fn-execution.bundle.size=5000, python.fn-execution.bundle.time=5000 — снижают per-event overhead на 20-40%. (5) Java + PyFlink mix — это типичная final architecture для high-performance PyFlink job-ов. SQL + Java UDF backbone, Python только для ML/preprocessing. Reality check: если стартуете с 25K и нужно 200K — это agressive jump. Готовьтесь, что Pandas UDF и cache дадут только 3-5x, а оставшийся 1.5-2x потребует Java переписывания части кода. Если этого нет в roadmap — пересмотрите throughput requirement или scale out parallelism (что добавляет infrastructure cost).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Approximate overhead numbers PyFlink vs Java per event для stateless и stateful UDF?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4