DataStream API в Python
После архитектурного urок-а — конкретный API. PyFlink DataStream API повторяет Java DataStream API почти 1:1 по семантике, но имеет несколько importantных отличий в синтаксисе, типизации и performance characteristics.
Этот урок про практическую работу с PyFlink DataStream API: какие классы для каких задач, как корректно типизировать, какие паттерны работают в production, а какие выглядят правильно, но дают катастрофичную performance.
DataStream transformations: map, filter, flatMapStreamExecutionEnvironment в Python
Точка входа любого PyFlink job — StreamExecutionEnvironment:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# Source
source = env.from_collection(
collection=[(1, "alice"), (2, "bob"), (3, "carol")],
type_info=Types.TUPLE([Types.INT(), Types.STRING()])
)
# Transformation
result = source \
.map(lambda x: (x[0], x[1].upper()), output_type=Types.TUPLE([Types.INT(), Types.STRING()])) \
.filter(lambda x: x[0] {'>'} 1)
# Sink
result.print()
env.execute("my_job")
Отличия от Java:
output_type=Types.TUPLE(...)обязателен почти всегда (Python не имеет type erasure problems как Java, но system типов разный).- Лямбды это Python callable, любой объект с
__call__. env.execute()блокирующий — возвращает результат только после остановки job-а.
Type system: pyflink.common.Types
Flink-у нужна явная информация о типах для сериализации между Python и Java. В Python нет статической типизации, поэтому используются объекты Types:
from pyflink.common.typeinfo import Types
# Basic types
Types.STRING()
Types.INT() # 32-bit
Types.LONG() # 64-bit
Types.DOUBLE()
Types.BOOLEAN()
Types.BYTE()
Types.SHORT()
Types.FLOAT()
# Sql time types
Types.SQL_TIMESTAMP() # java.sql.Timestamp
Types.INSTANT() # java.time.Instant
Types.LOCAL_DATE()
# Composite types
Types.TUPLE([Types.STRING(), Types.LONG()])
Types.ROW([Types.STRING(), Types.LONG()])
Types.ROW_NAMED(["user_id", "amount"], [Types.STRING(), Types.LONG()])
# Collections
Types.LIST(Types.STRING())
Types.MAP(Types.STRING(), Types.LONG())
Types.PRIMITIVE_ARRAY(Types.INT())
Types.BASIC_ARRAY(Types.STRING()) # Object[]
Row vs Tuple: оба сериализуются как positional record. Row отличается тем, что имеет named accessors (row['user_id']), Tuple — только positional (tuple[0]). Row предпочтительнее для readability в большом коде.
from pyflink.common import Row
def parse_event(raw: str) -{'>'} Row:
parts = raw.split(",")
return Row(user_id=parts[0], amount=int(parts[1]), ts=int(parts[2]))
stream.map(
parse_event,
output_type=Types.ROW_NAMED(
["user_id", "amount", "ts"],
[Types.STRING(), Types.LONG(), Types.LONG()]
)
)
Без output_type Flink не знает, как сериализовать результат, и упадёт с TypeError.
Function classes vs lambdas
В Java DataStream API вы выбираете между Function (lightweight stateless) и RichFunction (имеет RuntimeContext, open, close). В Python та же дихотомия, но через class hierarchy.
from pyflink.datastream.functions import (
MapFunction,
FilterFunction,
FlatMapFunction,
ProcessFunction,
KeyedProcessFunction,
)
class ParseEvent(MapFunction):
def map(self, raw_string):
parts = raw_string.split(",")
return Row(user_id=parts[0], amount=int(parts[1]))
stream.map(ParseEvent(), output_type=Types.ROW_NAMED([...], [...]))
vs lambda:
stream.map(
lambda raw: Row(user_id=raw.split(",")[0], amount=int(raw.split(",")[1])),
output_type=Types.ROW_NAMED([...], [...])
)
Когда class предпочтительнее:
- Нужен setup (
open()) — загрузка модели, инициализация connections. - Нужен RuntimeContext для metrics, state, side outputs.
- Code reuse — функция используется в нескольких местах pipeline.
Когда lambda OK:
- Stateless transformation в одну строку.
- Quick prototyping.
RuntimeContext в Python
RuntimeContext даёт доступ к task-level информации и инструментам: metrics, state, configuration. В Python:
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.common import Types
from pyflink.datastream.state import ValueStateDescriptor
class FraudDetector(KeyedProcessFunction):
def open(self, runtime_context):
# Регистрация state
self.balance = runtime_context.get_state(
ValueStateDescriptor("balance", Types.DOUBLE())
)
self.alert_count = runtime_context.get_state(
ValueStateDescriptor("alert_count", Types.LONG())
)
# Metrics
self.alerts_counter = runtime_context.get_metrics_group().counter("alerts_fired")
# Task info
self.task_index = runtime_context.get_index_of_this_subtask()
self.task_parallelism = runtime_context.get_number_of_parallel_subtasks()
def process_element(self, value, ctx):
current = self.balance.value() or 0.0
current += value.amount
self.balance.update(current)
if current {'>'} 10000:
count = (self.alert_count.value() or 0) + 1
self.alert_count.update(count)
self.alerts_counter.inc()
yield Row(
user_id=ctx.get_current_key(),
level="HIGH",
count=count,
timestamp=ctx.timestamp()
)
def close(self):
# cleanup if needed
pass
Важно:
process_element— это generator (yieldвместоreturn). Несколько yield = несколько output events.ctxдает доступ к timestamp, current key, side outputs, timer service.- State access внутри
process_elementдорогой (RPC) — кэшируйте на стороне Python если возможно.
Timers в Python
Timer service работает почти так же, как в Java:
class SessionWindow(KeyedProcessFunction):
def open(self, runtime_context):
self.last_event = runtime_context.get_state(
ValueStateDescriptor("last_event", Types.LONG())
)
self.session_count = runtime_context.get_state(
ValueStateDescriptor("session_count", Types.LONG())
)
def process_element(self, value, ctx):
# Регистрация timer через event time
timer_ts = value.ts + 30_000 # 30 sec session timeout
ctx.timer_service().register_event_time_timer(timer_ts)
self.last_event.update(value.ts)
self.session_count.update((self.session_count.value() or 0) + 1)
def on_timer(self, timestamp, ctx):
# Сессия закончилась — emit aggregate
yield Row(
user_id=ctx.get_current_key(),
session_end=timestamp,
event_count=self.session_count.value() or 0
)
self.session_count.clear()
self.last_event.clear()
Особенности:
on_timerтоже generator.- Event time timers ждут watermarks; processing time timers — wall clock.
- Каждый зарегистрированный timer = небольшая запись в timer state — миллионы pending timers вылазят боком (RocksDB inflation).
Side outputs
Для emit multiple stream types из одной функции — side outputs:
from pyflink.datastream import OutputTag
from pyflink.common import Types
# Объявление tag-а
late_events_tag = OutputTag("late", Types.STRING())
suspicious_tag = OutputTag("suspicious", Types.ROW_NAMED(
["user_id", "reason"],
[Types.STRING(), Types.STRING()]
))
class Router(ProcessFunction):
def process_element(self, value, ctx):
if value.ts {'<'} ctx.timer_service().current_watermark() - 60_000:
# Late event — отдельный stream
ctx.output(late_events_tag, f"{value.user_id}:{value.ts}")
elif value.amount {'>'} 100000:
# Suspicious — отдельный stream
ctx.output(suspicious_tag, Row(user_id=value.user_id, reason="big_amount"))
else:
# Main stream
yield value
routed = stream.process(Router(), output_type=Types.ROW_NAMED([...], [...]))
late_stream = routed.get_side_output(late_events_tag)
suspicious_stream = routed.get_side_output(suspicious_tag)
Side outputs не имеют overhead vs main output — это просто разные downstream connections.
Connector setup в Python
Sources и sinks конфигурируются через builder pattern, такой же как в Java:
from pyflink.datastream.connectors.kafka import (
KafkaSource,
KafkaOffsetsInitializer,
KafkaSink,
KafkaRecordSerializationSchema
)
from pyflink.common.serialization import SimpleStringSchema
# Source
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("broker:9092") \
.set_topics("orders") \
.set_group_id("flink-group") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
stream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "kafka-source")
# Sink
kafka_sink = KafkaSink.builder() \
.set_bootstrap_servers("broker:9092") \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("processed")
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.build()
stream.sink_to(kafka_sink)
Доступны connector-ы:
- KafkaSource/KafkaSink (Kafka)
- JdbcSink (PostgreSQL, MySQL)
- FileSource/FileSink (S3, local)
- IcebergSink (Iceberg lake)
- ElasticsearchSink
Не все Java connector-ы имеют Python API — для exotic source-ов придётся писать custom через SourceFunction.
Async I/O в Python
Async I/O (вызов внешнего сервиса с unblocking) поддерживается в PyFlink через AsyncFunction:
from pyflink.datastream.async_function import AsyncFunction
import aiohttp
class EnrichWithGeo(AsyncFunction):
def open(self, runtime_context):
self.session = aiohttp.ClientSession()
async def async_invoke(self, value):
async with self.session.get(f"https://geo-api/{value.ip}") as resp:
geo = await resp.json()
return [Row(ip=value.ip, country=geo['country'])]
def close(self):
# cleanup в Python harness
pass
stream.transform(
"async_enrich",
EnrichWithGeo(),
output_type=Types.ROW_NAMED([...], [...]),
timeout=5000, # ms
capacity=100, # in-flight requests
)
Под капотом async function запускается в asyncio event loop в Python harness. До 100 concurrent requests в одном process — это полезно для I/O bound enrichment (API calls, DB lookups). CPU bound async UDF не дают benefit (GIL).
Custom serializer для Python types
Для нестандартных типов нужен custom serializer на Java side. PyFlink не позволяет создать чисто-Python custom serializer — каждое значение должно быть конвертируемо в один из поддерживаемых Types.
Workaround: сериализуйте сложный Python объект в bytes/string внутри MapFunction:
import pickle
def serialize_model_state(state):
return pickle.dumps(state)
def deserialize_model_state(bytes):
return pickle.loads(bytes)
stream.map(
lambda x: (x.user_id, serialize_model_state(x.model)),
output_type=Types.TUPLE([Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE())])
)
Это не максимально эффективно (pickle медленный, bytes тяжелые), но работает для случаев, когда нужно прокинуть unsupported тип через Flink stream. Для production — лучше переделать в проблемный тип в поддерживаемый Row/Tuple.
Connecting streams (CoFunction)
Two-stream operations через connect:
from pyflink.datastream.functions import CoProcessFunction
class Joiner(CoProcessFunction):
def open(self, runtime_context):
self.left_state = runtime_context.get_state(
ValueStateDescriptor("left", Types.ROW(...))
)
self.right_state = runtime_context.get_state(
ValueStateDescriptor("right", Types.ROW(...))
)
def process_element1(self, value, ctx):
# event из левого stream
right = self.right_state.value()
if right is not None:
yield merge(value, right)
else:
self.left_state.update(value)
def process_element2(self, value, ctx):
# event из правого stream
left = self.left_state.value()
if left is not None:
yield merge(left, value)
else:
self.right_state.update(value)
stream1.connect(stream2) \
.key_by(lambda x: x.user_id, lambda y: y.user_id) \
.process(Joiner(), output_type=Types.ROW_NAMED([...], [...]))
Real production patterns
Pattern 1: Lightweight Python wrapper + Java state. Pipeline backbone в Java для производительности, Python только для финального enrichment (ML inference, custom output formatting). PyFlink поддерживает mixing — вы можете передавать stream между Java и Python operator-ами.
Pattern 2: Stateless Python для preprocessing. Парсинг raw bytes, нормализация, валидация — в stateless Python функциях. Дальше keyBy и stateful logic на Java/Scala. Python harness не несёт state cost.
Pattern 3: Pandas UDF для compute-heavy. Если нужно делать numpy/pandas computation на batch — Pandas UDF в 10-50x быстрее scalar Python. Кейс: aggregation over window, ML feature engineering.
Antipattern: stateful hot path в Python. Это самый частый failure mode — команда пишет KeyedProcessFunction в Python, тестирует на 1K events/sec, всё работает, в production на 100K events/sec — backpressure и job стопается.
Любой stateful Python operator в hot path — потенциальный bottleneck. Перед production load test обязательно: запустить chaos test с peak throughput, измерить state RPC count через metrics, оценить cache hit rate. Если cache hit меньше 70% — кейс плох для Python.
Попробуй сам
-
End-to-end Python job. Напишите job: KafkaSource -> parse JSON -> keyBy user_id -> KeyedProcessFunction с ValueState -> KafkaSink. Тест на 10K events/sec. Замерьте per-event latency и CPU.
-
Side outputs. Расширьте job выше: добавьте late events stream (события старше 60 sec watermark) и suspicious stream (amount > $10K). Каждый side output пусть пишется в отдельный Kafka topic.
-
Async enrichment. Реализуйте AsyncFunction, которая делает HTTP call к mock geo-API. Тест на разной capacity (10, 100, 500 concurrent). Найдите sweet spot для вашего API latency.