Learning Platform
Глоссарий Troubleshooting
Урок 19.01 · 26 мин
Продвинутый
PyFlinkBeam PortabilityPython HarnessgRPCProcess Isolation

PyFlink архитектура

Flink написан на Java и Scala, работает в JVM, и весь его планировщик, состояние, шаффл и runtime — это JVM-объекты и инвалидации между JVM-процессами. Python никогда не был “first-class” языком для Flink. И тем не менее, PyFlink существует, развивается с 2017 года, и обрабатывает реальные workload-ы в production. Как это работает?

Этот урок про архитектуру PyFlink: как Python-функция оказывается рядом с Java-оператором, какой ценой это даётся, и какие фундаментальные ограничения вы должны учитывать при выборе PyFlink для production-job-а.

Python UDF в Spark: анатомия overhead

Проблема: Python в JVM-движке

Flink-оператор это Java-класс, который получает event как Java-объект (POJO/Tuple/Row), вызывает user logic, и эмитит результат downstream. Если user logic написан на Python — нужен мост.

Возможны три подхода:

  1. Embedded Python в JVM (через GraalVM или Jython) — Python код выполняется внутри JVM. Этот подход экспериментален в Flink 1.15+, имеет ограничения (нет CPython extensions, нет numpy/pandas C-кода).
  2. External Python process с RPC — Python живёт в отдельном process, Java вызывает его через RPC. Это даёт изоляцию и полный CPython compatibility, но добавляет inter-process latency.
  3. Inline Python в SQL (через UDF registration) — Python функция регистрируется в Calcite, выполняется на каждое значение.

PyFlink использует подход #2 — external Python process с RPC. Конкретная реализация — Apache Beam Portability Framework, который Flink заимствует как proven solution для cross-language execution.


Beam Portability Framework

Beam Portability Framework (BPF) — это набор protobuf-определённых RPC-интерфейсов для запуска пользовательского кода в external language SDK harness. Изначально создан для Apache Beam, чтобы поддержать Python/Go/Java pipeline в один runner. Flink заимствует BPF и использует его как абстракцию между Java task и Python user code.

Архитектура:

PyFlink: путь Python UDF через Beam harness

Ключевые компоненты:

  • Python SDK Harness — это самостоятельный CPython process, запущенный TaskManager-ом через subprocess. Один Python process на slot, обычно. В Flink 2.2 поддерживается batched mode — один process на TaskManager обслуживает несколько slots.
  • gRPC server в Python — слушает локальный Unix domain socket (быстрее TCP), получает batches events для обработки.
  • Data channel — основной канал для events. Batching включён по умолчанию (100-1000 events на batch).
  • Control channel — для state access, timers, side outputs. Каждый stateful UDF делает RPC обратно в Java side для чтения/записи state.

Запуск Python harness

При старте Flink task с Python UDF происходит следующее:

  1. TaskManager создаёт Python virtual environment (или использует pre-built). Зависимости берутся из python-files или archives, указанных в job submission.
  2. Запускается subprocess: python3 -m apache_beam.runners.worker.sdk_worker --logging_endpoint ... --control_endpoint ....
  3. Subprocess открывает gRPC server, регистрируется в Java side.
  4. Java operator получает endpoint, начинает отправлять events.

Logs Python subprocess идут в stderr и попадают в TaskManager logs через специальный wrapper. Если Python harness крашится — TaskManager перезапускает его (до 5 раз) перед failing task.

# Python harness внутри (упрощённо)
class PythonSdkHarness:
    def serve(self, control_endpoint):
        server = grpc.server(thread_pool)
        beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
            self.control_servicer, server
        )
        server.add_insecure_port(control_endpoint)
        server.start()
        # Цикл: получать work items от Java side, обрабатывать, возвращать результаты
        for work_item in self.control_channel:
            self._execute(work_item)

Запуск Python process — это секунды на initialization (loading numpy, pandas — медленно). Поэтому harness переиспользуется для всех task’ов на этом slot в течение жизни job-а.


Cost модель: где теряются микросекунды

Каждый event, проходящий через Python UDF, проходит следующий путь overhead:

  1. Java serialize -> bytes (50-100 нс) — стандартный PojoSerializer или Avro.
  2. Java -> gRPC -> Python (1-10 мкс) — копирование через socket, batched.
  3. Python deserialize bytes -> object (1-5 мкс) — Python десериализация медленнее Java.
  4. user_function() execution (зависит от кода).
  5. Python serialize result -> bytes (1-5 мкс).
  6. Python -> gRPC -> Java (1-10 мкс).
  7. Java deserialize bytes -> object (50-100 нс).

Чистый overhead PyFlink (без учёта user logic) — обычно 5-30 мкс на event. На потоке 100K events/sec это ~3-10% CPU. На потоке 1M events/sec это превращается в 50-100% одного core.

Batching критичен. Если каждое событие отправляется отдельно — overhead будет неприемлемым. Beam Framework по умолчанию буферизует 100-1000 events перед gRPC call, что амортизирует cost.

# flink-conf.yaml настройка batch size
python.fn-execution.bundle.size: 1000        # events per bundle
python.fn-execution.bundle.time: 1000        # max ms per bundle
python.fn-execution.arrow.batch.size: 10000  # для Arrow-based UDF

Trade-off: больший bundle -> меньше overhead, но больше latency (события ждут заполнения батча).


State access через RPC

Stateful Python UDF имеют доступ к Flink state через RuntimeContext. Под капотом это означает: каждый state read/write — это gRPC call из Python обратно в Java.

class MyKeyedProcessFunction(KeyedProcessFunction):
    def open(self, runtime_context):
        # Регистрация ValueState — описание уходит в Java side
        self.count = runtime_context.get_state(
            ValueStateDescriptor("count", Types.LONG())
        )

    def process_element(self, event, ctx):
        # Каждый .value() — это RPC call в Java side!
        current = self.count.value() or 0
        current += 1
        self.count.update(current)  # ещё один RPC call
        yield (event.user_id, current)

Стоимость per-state-access — 50-200 мкс (включая gRPC + state backend read). Если ваш UDF делает 5 state-access на event, это 250-1000 мкс на event, что = 1000-4000 events/sec/slot. Огромное падение.

Оптимизация: state caching на Python side. Flink 2.x кэширует state values в Python harness — повторные .value() для того же key читаются из cache. Cache invalidation при переключении key. Это снижает RPC count для hot-key workloads на 70-90%.

python.state.cache-size: 1000  # количество ключей в Python-side cache
WARNING

State в PyFlink дороже чем в Java/Scala в 10-50 раз из-за RPC overhead. Если ваш UDF stateful и hot — рассмотрите написание этого конкретного оператора на Java, оставив остальной pipeline на Python. PyFlink поддерживает mixing — Python и Java операторы в одном pipeline.


Process model: один Python на slot

По умолчанию каждый TaskManager slot имеет собственный Python harness process. На TM с 8 slots — 8 Python processes. Это даёт изоляцию (краш одного не убивает остальные) но multiply memory footprint.

Каждый Python process тянет за собой:

  • CPython runtime (~50 MB)
  • numpy, pandas (~200 MB если используется)
  • ваши user dependencies (variable)

Итого 250-500 MB на slot. На больших TM (32+ slots) это становится memory-killer.

Flink 2.2 добавил shared mode: один Python process на TaskManager обслуживает все slots через GIL serialization. Меньше memory, но GIL становится bottleneck — Python код фактически serial при shared mode. Подходит только для I/O-bound UDF (вызовы внешних API), не для CPU-bound (numpy compute).

python.fn-execution.process-mode: per-slot  # default
# или
python.fn-execution.process-mode: shared     # экспериментально 2.2

Apache Arrow для vectorized exchange

Для большого throughput PyFlink использует Apache Arrow как wire format между Java и Python. Arrow — это columnar in-memory format, который позволяет zero-copy между процессами через shared memory.

Без Arrow:

Java row -> bytes -> Python row -> numpy column for compute

С Arrow:

Java columnar batch -> Arrow batch (shared memory) -> pandas DataFrame

Конверсия в pandas DataFrame почти бесплатна — Arrow layout совместим с pandas memory layout. Это даёт Pandas UDF — функции, получающие pandas.Series вместо отдельных values. Производительность для vector operations в 5-50x лучше скалярных UDF.

from pyflink.table.udf import udf
from pyflink.common import Types
import pandas as pd

@udf(result_type=Types.DOUBLE(), func_type="pandas")
def add_tax(amount: pd.Series, rate: pd.Series) -{'>'} pd.Series:
    return amount * (1 + rate)  # vectorized — обрабатывает batch целиком

Этот UDF получает pandas Series для batch (1000-10000 значений) вместо вызова per-value. Numpy/pandas код работает на native C speed, без per-value gRPC overhead.


DataStream API vs Table API в Python

PyFlink поддерживает оба API, но через разные внутренние пути:

  • DataStream API в Python: каждый event пересылается через Beam harness как отдельный record. Stateful UDF делают gRPC за каждым state-access.
  • Table API в Python: SQL компилируется в Java-операторы Calcite-ом. Python UDF появляются только там, где явно вызвана python function — остальное чистый Java.

Это значит, что Table API + SQL в Python обычно работает быстрее, чем DataStream API в Python, даже если код выглядит более verbose. Большинство production-PyFlink job-ов используют Table API + Python только для конкретных UDF (ML inference, custom parsers).

# Table API: SQL делается Java-side, Python UDF только в SELECT
from pyflink.table import TableEnvironment

t_env = TableEnvironment.create(environment_settings)

# Python UDF
t_env.create_temporary_function("predict_fraud", fraud_model_udf)

# SQL compile в Java operators, Python UDF только на SELECT projection
result = t_env.sql_query("""
    SELECT user_id, amount, predict_fraud(features) AS fraud_score
    FROM orders
    WHERE amount > 100
""")

Здесь WHERE filter и group by (если бы было) выполняются в Java. Python UDF вызывается только для каждой row passing filter — резко меньше work для Python harness.


PyFlink оправдан, когда:

  • Команда — Python-native. Если data scientist-ы пишут модели на pandas/scikit-learn — PyFlink даёт им прямой путь к streaming без необходимости портирования в Java.
  • ML inference в pipeline. Загрузить модель (TensorFlow, PyTorch) и вызывать её per-event — естественная задача для Python.
  • Custom parsing для legacy форматов, где Python библиотеки лучше развиты (научные форматы, sigil-encoded data).
  • Prototyping. Быстрая итерация на Python даёт скорость разработки.

PyFlink НЕ оправдан, когда:

  • Hot path stateful logic. Stateful UDF в Python в 10-50x медленнее Java.
  • Latency-critical (под 10ms p99). Overhead Beam framework добавляет 5-30 мкс per event.
  • Огромный throughput (1M+ events/sec/operator). Java здесь однозначно лучше.
  • Простые SQL transformations. Просто пишите Flink SQL без Python UDF — это будет fastest path.
TIP

Best practice — гибридный подход. Pipeline backbone в Flink SQL (compiled to Java), Python UDF только для специфических задач (ML inference, custom transforms). Это даёт скорость Java + удобство Python для важных частей.


Job submission для PyFlink требует zip-архив с Python кодом и зависимостями:

# Создать venv с зависимостями
python3 -m venv venv
source venv/bin/activate
pip install pandas numpy scikit-learn

# Zip-архив venv
zip -r venv.zip venv

# Запуск Flink job
flink run \
    --python my_job.py \
    --pyArchives venv.zip \
    --pyExecutable venv.zip/venv/bin/python3

TaskManager-ы получают venv.zip, распаковывают локально, используют для Python harness. Этот процесс может занять минуты на больших archives (для venv с TensorFlow — это до гигабайта).

В Kubernetes-окружениях лучше использовать pre-built Docker images с предустановленным venv:

FROM flink:2.2-python
COPY requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt

Это убирает download latency и стабильнее для production.


Попробуй сам

  1. Hello PyFlink. Запустите простой Python job: ingest из Kafka, filter в Python UDF, write обратно в Kafka. Сравните throughput с эквивалентным Java job-ом — ожидайте 30-50% разницы.

  2. Stateful UDF stress test. Напишите Python KeyedProcessFunction с ValueState. Запустите на 100K events/sec. Наблюдайте CPU и Python state cache hit rate. Поиграйте с python.state.cache-size.

  3. Pandas UDF speedup. Возьмите CPU-bound трансформацию (например JSON parsing + computation). Реализуйте как scalar UDF и как Pandas UDF. Сравните throughput — ожидайте 5-20x улучшение для Pandas версии.

Проверка знанийKnowledge check
У вас PyFlink job с stateful KeyedProcessFunction. На потоке 50K events/sec наблюдаете 100% CPU на Python harness и backpressure от upstream. Профилирование показывает 60% времени в gRPC calls между Python и Java. Какие конкретные шаги по оптимизации, и в каком порядке?
ОтветAnswer
60% времени в gRPC — это RPC overhead, преимущественно state access. Порядок шагов: (1) Включить Python state cache: python.state.cache-size=10000 (зависит от количества active keys). Это убирает RPC для повторных .value() на тот же key — обычно 60-80% RPC исчезают. (2) Минимизировать state access в коде: вместо .value() + .update() на каждое событие, batch updates в local переменной и flush раз в N событий через timer. (3) Увеличить bundle size: python.fn-execution.bundle.size=5000 — больше events per gRPC call, меньше per-event overhead. (4) Проверить — действительно ли вам нужен Python для этой части? Stateful logic в hot path плохо подходит для PyFlink. Лучшее решение — переписать этот конкретный оператор на Java, оставив остальной pipeline в Python. PyFlink поддерживает mix. (5) Если миграция в Java невозможна — рассмотреть Pandas UDF batch processing: накапливать events в pandas DataFrame в state, обрабатывать batch периодически. Это разделит state access и compute. (6) Если ничего не помогло — последний resort это денормализация state: пересмотреть структуру state так, чтобы один read давал всё нужное (вместо нескольких чтений из разных state primitives).

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Как технически работает PyFlink под капотом — как Python UDF интегрируется с Java-based Flink runtime?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4