PyFlink архитектура
Flink написан на Java и Scala, работает в JVM, и весь его планировщик, состояние, шаффл и runtime — это JVM-объекты и инвалидации между JVM-процессами. Python никогда не был “first-class” языком для Flink. И тем не менее, PyFlink существует, развивается с 2017 года, и обрабатывает реальные workload-ы в production. Как это работает?
Этот урок про архитектуру PyFlink: как Python-функция оказывается рядом с Java-оператором, какой ценой это даётся, и какие фундаментальные ограничения вы должны учитывать при выборе PyFlink для production-job-а.
Python UDF в Spark: анатомия overheadПроблема: Python в JVM-движке
Flink-оператор это Java-класс, который получает event как Java-объект (POJO/Tuple/Row), вызывает user logic, и эмитит результат downstream. Если user logic написан на Python — нужен мост.
Возможны три подхода:
- Embedded Python в JVM (через GraalVM или Jython) — Python код выполняется внутри JVM. Этот подход экспериментален в Flink 1.15+, имеет ограничения (нет CPython extensions, нет numpy/pandas C-кода).
- External Python process с RPC — Python живёт в отдельном process, Java вызывает его через RPC. Это даёт изоляцию и полный CPython compatibility, но добавляет inter-process latency.
- Inline Python в SQL (через UDF registration) — Python функция регистрируется в Calcite, выполняется на каждое значение.
PyFlink использует подход #2 — external Python process с RPC. Конкретная реализация — Apache Beam Portability Framework, который Flink заимствует как proven solution для cross-language execution.
Beam Portability Framework
Beam Portability Framework (BPF) — это набор protobuf-определённых RPC-интерфейсов для запуска пользовательского кода в external language SDK harness. Изначально создан для Apache Beam, чтобы поддержать Python/Go/Java pipeline в один runner. Flink заимствует BPF и использует его как абстракцию между Java task и Python user code.
Архитектура:
Ключевые компоненты:
- Python SDK Harness — это самостоятельный CPython process, запущенный TaskManager-ом через subprocess. Один Python process на slot, обычно. В Flink 2.2 поддерживается batched mode — один process на TaskManager обслуживает несколько slots.
- gRPC server в Python — слушает локальный Unix domain socket (быстрее TCP), получает batches events для обработки.
- Data channel — основной канал для events. Batching включён по умолчанию (100-1000 events на batch).
- Control channel — для state access, timers, side outputs. Каждый stateful UDF делает RPC обратно в Java side для чтения/записи state.
Запуск Python harness
При старте Flink task с Python UDF происходит следующее:
- TaskManager создаёт Python virtual environment (или использует pre-built). Зависимости берутся из python-files или archives, указанных в job submission.
- Запускается subprocess:
python3 -m apache_beam.runners.worker.sdk_worker --logging_endpoint ... --control_endpoint .... - Subprocess открывает gRPC server, регистрируется в Java side.
- Java operator получает endpoint, начинает отправлять events.
Logs Python subprocess идут в stderr и попадают в TaskManager logs через специальный wrapper. Если Python harness крашится — TaskManager перезапускает его (до 5 раз) перед failing task.
# Python harness внутри (упрощённо)
class PythonSdkHarness:
def serve(self, control_endpoint):
server = grpc.server(thread_pool)
beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
self.control_servicer, server
)
server.add_insecure_port(control_endpoint)
server.start()
# Цикл: получать work items от Java side, обрабатывать, возвращать результаты
for work_item in self.control_channel:
self._execute(work_item)
Запуск Python process — это секунды на initialization (loading numpy, pandas — медленно). Поэтому harness переиспользуется для всех task’ов на этом slot в течение жизни job-а.
Cost модель: где теряются микросекунды
Каждый event, проходящий через Python UDF, проходит следующий путь overhead:
- Java serialize -> bytes (50-100 нс) — стандартный PojoSerializer или Avro.
- Java -> gRPC -> Python (1-10 мкс) — копирование через socket, batched.
- Python deserialize bytes -> object (1-5 мкс) — Python десериализация медленнее Java.
- user_function() execution (зависит от кода).
- Python serialize result -> bytes (1-5 мкс).
- Python -> gRPC -> Java (1-10 мкс).
- Java deserialize bytes -> object (50-100 нс).
Чистый overhead PyFlink (без учёта user logic) — обычно 5-30 мкс на event. На потоке 100K events/sec это ~3-10% CPU. На потоке 1M events/sec это превращается в 50-100% одного core.
Batching критичен. Если каждое событие отправляется отдельно — overhead будет неприемлемым. Beam Framework по умолчанию буферизует 100-1000 events перед gRPC call, что амортизирует cost.
# flink-conf.yaml настройка batch size
python.fn-execution.bundle.size: 1000 # events per bundle
python.fn-execution.bundle.time: 1000 # max ms per bundle
python.fn-execution.arrow.batch.size: 10000 # для Arrow-based UDF
Trade-off: больший bundle -> меньше overhead, но больше latency (события ждут заполнения батча).
State access через RPC
Stateful Python UDF имеют доступ к Flink state через RuntimeContext. Под капотом это означает: каждый state read/write — это gRPC call из Python обратно в Java.
class MyKeyedProcessFunction(KeyedProcessFunction):
def open(self, runtime_context):
# Регистрация ValueState — описание уходит в Java side
self.count = runtime_context.get_state(
ValueStateDescriptor("count", Types.LONG())
)
def process_element(self, event, ctx):
# Каждый .value() — это RPC call в Java side!
current = self.count.value() or 0
current += 1
self.count.update(current) # ещё один RPC call
yield (event.user_id, current)
Стоимость per-state-access — 50-200 мкс (включая gRPC + state backend read). Если ваш UDF делает 5 state-access на event, это 250-1000 мкс на event, что = 1000-4000 events/sec/slot. Огромное падение.
Оптимизация: state caching на Python side. Flink 2.x кэширует state values в Python harness — повторные .value() для того же key читаются из cache. Cache invalidation при переключении key. Это снижает RPC count для hot-key workloads на 70-90%.
python.state.cache-size: 1000 # количество ключей в Python-side cache
State в PyFlink дороже чем в Java/Scala в 10-50 раз из-за RPC overhead. Если ваш UDF stateful и hot — рассмотрите написание этого конкретного оператора на Java, оставив остальной pipeline на Python. PyFlink поддерживает mixing — Python и Java операторы в одном pipeline.
Process model: один Python на slot
По умолчанию каждый TaskManager slot имеет собственный Python harness process. На TM с 8 slots — 8 Python processes. Это даёт изоляцию (краш одного не убивает остальные) но multiply memory footprint.
Каждый Python process тянет за собой:
- CPython runtime (~50 MB)
- numpy, pandas (~200 MB если используется)
- ваши user dependencies (variable)
Итого 250-500 MB на slot. На больших TM (32+ slots) это становится memory-killer.
Flink 2.2 добавил shared mode: один Python process на TaskManager обслуживает все slots через GIL serialization. Меньше memory, но GIL становится bottleneck — Python код фактически serial при shared mode. Подходит только для I/O-bound UDF (вызовы внешних API), не для CPU-bound (numpy compute).
python.fn-execution.process-mode: per-slot # default
# или
python.fn-execution.process-mode: shared # экспериментально 2.2
Apache Arrow для vectorized exchange
Для большого throughput PyFlink использует Apache Arrow как wire format между Java и Python. Arrow — это columnar in-memory format, который позволяет zero-copy между процессами через shared memory.
Без Arrow:
Java row -> bytes -> Python row -> numpy column for compute
С Arrow:
Java columnar batch -> Arrow batch (shared memory) -> pandas DataFrame
Конверсия в pandas DataFrame почти бесплатна — Arrow layout совместим с pandas memory layout. Это даёт Pandas UDF — функции, получающие pandas.Series вместо отдельных values. Производительность для vector operations в 5-50x лучше скалярных UDF.
from pyflink.table.udf import udf
from pyflink.common import Types
import pandas as pd
@udf(result_type=Types.DOUBLE(), func_type="pandas")
def add_tax(amount: pd.Series, rate: pd.Series) -{'>'} pd.Series:
return amount * (1 + rate) # vectorized — обрабатывает batch целиком
Этот UDF получает pandas Series для batch (1000-10000 значений) вместо вызова per-value. Numpy/pandas код работает на native C speed, без per-value gRPC overhead.
DataStream API vs Table API в Python
PyFlink поддерживает оба API, но через разные внутренние пути:
- DataStream API в Python: каждый event пересылается через Beam harness как отдельный record. Stateful UDF делают gRPC за каждым state-access.
- Table API в Python: SQL компилируется в Java-операторы Calcite-ом. Python UDF появляются только там, где явно вызвана python function — остальное чистый Java.
Это значит, что Table API + SQL в Python обычно работает быстрее, чем DataStream API в Python, даже если код выглядит более verbose. Большинство production-PyFlink job-ов используют Table API + Python только для конкретных UDF (ML inference, custom parsers).
# Table API: SQL делается Java-side, Python UDF только в SELECT
from pyflink.table import TableEnvironment
t_env = TableEnvironment.create(environment_settings)
# Python UDF
t_env.create_temporary_function("predict_fraud", fraud_model_udf)
# SQL compile в Java operators, Python UDF только на SELECT projection
result = t_env.sql_query("""
SELECT user_id, amount, predict_fraud(features) AS fraud_score
FROM orders
WHERE amount > 100
""")
Здесь WHERE filter и group by (если бы было) выполняются в Java. Python UDF вызывается только для каждой row passing filter — резко меньше work для Python harness.
Когда выбирать PyFlink
PyFlink оправдан, когда:
- Команда — Python-native. Если data scientist-ы пишут модели на pandas/scikit-learn — PyFlink даёт им прямой путь к streaming без необходимости портирования в Java.
- ML inference в pipeline. Загрузить модель (TensorFlow, PyTorch) и вызывать её per-event — естественная задача для Python.
- Custom parsing для legacy форматов, где Python библиотеки лучше развиты (научные форматы, sigil-encoded data).
- Prototyping. Быстрая итерация на Python даёт скорость разработки.
PyFlink НЕ оправдан, когда:
- Hot path stateful logic. Stateful UDF в Python в 10-50x медленнее Java.
- Latency-critical (под 10ms p99). Overhead Beam framework добавляет 5-30 мкс per event.
- Огромный throughput (1M+ events/sec/operator). Java здесь однозначно лучше.
- Простые SQL transformations. Просто пишите Flink SQL без Python UDF — это будет fastest path.
Best practice — гибридный подход. Pipeline backbone в Flink SQL (compiled to Java), Python UDF только для специфических задач (ML inference, custom transforms). Это даёт скорость Java + удобство Python для важных частей.
Деплоймент PyFlink
Job submission для PyFlink требует zip-архив с Python кодом и зависимостями:
# Создать venv с зависимостями
python3 -m venv venv
source venv/bin/activate
pip install pandas numpy scikit-learn
# Zip-архив venv
zip -r venv.zip venv
# Запуск Flink job
flink run \
--python my_job.py \
--pyArchives venv.zip \
--pyExecutable venv.zip/venv/bin/python3
TaskManager-ы получают venv.zip, распаковывают локально, используют для Python harness. Этот процесс может занять минуты на больших archives (для venv с TensorFlow — это до гигабайта).
В Kubernetes-окружениях лучше использовать pre-built Docker images с предустановленным venv:
FROM flink:2.2-python
COPY requirements.txt /tmp/
RUN pip install -r /tmp/requirements.txt
Это убирает download latency и стабильнее для production.
Попробуй сам
-
Hello PyFlink. Запустите простой Python job: ingest из Kafka, filter в Python UDF, write обратно в Kafka. Сравните throughput с эквивалентным Java job-ом — ожидайте 30-50% разницы.
-
Stateful UDF stress test. Напишите Python KeyedProcessFunction с ValueState. Запустите на 100K events/sec. Наблюдайте CPU и Python state cache hit rate. Поиграйте с
python.state.cache-size. -
Pandas UDF speedup. Возьмите CPU-bound трансформацию (например JSON parsing + computation). Реализуйте как scalar UDF и как Pandas UDF. Сравните throughput — ожидайте 5-20x улучшение для Pandas версии.