Learning Platform
Глоссарий Troubleshooting
Урок 19.02 · 26 мин
Продвинутый
PyFlink DataStreamProcessFunctionRuntimeContextMapFunctionType Mapping

DataStream API в Python

После архитектурного urок-а — конкретный API. PyFlink DataStream API повторяет Java DataStream API почти 1:1 по семантике, но имеет несколько importantных отличий в синтаксисе, типизации и performance characteristics.

Этот урок про практическую работу с PyFlink DataStream API: какие классы для каких задач, как корректно типизировать, какие паттерны работают в production, а какие выглядят правильно, но дают катастрофичную performance.

DataStream transformations: map, filter, flatMap

StreamExecutionEnvironment в Python

Точка входа любого PyFlink job — StreamExecutionEnvironment:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Types

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Source
source = env.from_collection(
    collection=[(1, "alice"), (2, "bob"), (3, "carol")],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()])
)

# Transformation
result = source \
    .map(lambda x: (x[0], x[1].upper()), output_type=Types.TUPLE([Types.INT(), Types.STRING()])) \
    .filter(lambda x: x[0] {'>'} 1)

# Sink
result.print()

env.execute("my_job")

Отличия от Java:

  • output_type=Types.TUPLE(...) обязателен почти всегда (Python не имеет type erasure problems как Java, но system типов разный).
  • Лямбды это Python callable, любой объект с __call__.
  • env.execute() блокирующий — возвращает результат только после остановки job-а.

Type system: pyflink.common.Types

Flink-у нужна явная информация о типах для сериализации между Python и Java. В Python нет статической типизации, поэтому используются объекты Types:

from pyflink.common.typeinfo import Types

# Basic types
Types.STRING()
Types.INT()       # 32-bit
Types.LONG()      # 64-bit
Types.DOUBLE()
Types.BOOLEAN()
Types.BYTE()
Types.SHORT()
Types.FLOAT()

# Sql time types
Types.SQL_TIMESTAMP()  # java.sql.Timestamp
Types.INSTANT()        # java.time.Instant
Types.LOCAL_DATE()

# Composite types
Types.TUPLE([Types.STRING(), Types.LONG()])
Types.ROW([Types.STRING(), Types.LONG()])
Types.ROW_NAMED(["user_id", "amount"], [Types.STRING(), Types.LONG()])

# Collections
Types.LIST(Types.STRING())
Types.MAP(Types.STRING(), Types.LONG())
Types.PRIMITIVE_ARRAY(Types.INT())
Types.BASIC_ARRAY(Types.STRING())  # Object[]

Row vs Tuple: оба сериализуются как positional record. Row отличается тем, что имеет named accessors (row['user_id']), Tuple — только positional (tuple[0]). Row предпочтительнее для readability в большом коде.

from pyflink.common import Row

def parse_event(raw: str) -{'>'} Row:
    parts = raw.split(",")
    return Row(user_id=parts[0], amount=int(parts[1]), ts=int(parts[2]))

stream.map(
    parse_event,
    output_type=Types.ROW_NAMED(
        ["user_id", "amount", "ts"],
        [Types.STRING(), Types.LONG(), Types.LONG()]
    )
)

Без output_type Flink не знает, как сериализовать результат, и упадёт с TypeError.


Function classes vs lambdas

В Java DataStream API вы выбираете между Function (lightweight stateless) и RichFunction (имеет RuntimeContext, open, close). В Python та же дихотомия, но через class hierarchy.

from pyflink.datastream.functions import (
    MapFunction,
    FilterFunction,
    FlatMapFunction,
    ProcessFunction,
    KeyedProcessFunction,
)

class ParseEvent(MapFunction):
    def map(self, raw_string):
        parts = raw_string.split(",")
        return Row(user_id=parts[0], amount=int(parts[1]))

stream.map(ParseEvent(), output_type=Types.ROW_NAMED([...], [...]))

vs lambda:

stream.map(
    lambda raw: Row(user_id=raw.split(",")[0], amount=int(raw.split(",")[1])),
    output_type=Types.ROW_NAMED([...], [...])
)

Когда class предпочтительнее:

  • Нужен setup (open()) — загрузка модели, инициализация connections.
  • Нужен RuntimeContext для metrics, state, side outputs.
  • Code reuse — функция используется в нескольких местах pipeline.

Когда lambda OK:

  • Stateless transformation в одну строку.
  • Quick prototyping.

RuntimeContext в Python

RuntimeContext даёт доступ к task-level информации и инструментам: metrics, state, configuration. В Python:

from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.common import Types
from pyflink.datastream.state import ValueStateDescriptor

class FraudDetector(KeyedProcessFunction):
    def open(self, runtime_context):
        # Регистрация state
        self.balance = runtime_context.get_state(
            ValueStateDescriptor("balance", Types.DOUBLE())
        )
        self.alert_count = runtime_context.get_state(
            ValueStateDescriptor("alert_count", Types.LONG())
        )
        # Metrics
        self.alerts_counter = runtime_context.get_metrics_group().counter("alerts_fired")
        # Task info
        self.task_index = runtime_context.get_index_of_this_subtask()
        self.task_parallelism = runtime_context.get_number_of_parallel_subtasks()

    def process_element(self, value, ctx):
        current = self.balance.value() or 0.0
        current += value.amount
        self.balance.update(current)

        if current {'>'} 10000:
            count = (self.alert_count.value() or 0) + 1
            self.alert_count.update(count)
            self.alerts_counter.inc()
            yield Row(
                user_id=ctx.get_current_key(),
                level="HIGH",
                count=count,
                timestamp=ctx.timestamp()
            )

    def close(self):
        # cleanup if needed
        pass

Важно:

  • process_element — это generator (yield вместо return). Несколько yield = несколько output events.
  • ctx дает доступ к timestamp, current key, side outputs, timer service.
  • State access внутри process_element дорогой (RPC) — кэшируйте на стороне Python если возможно.

Timers в Python

Timer service работает почти так же, как в Java:

class SessionWindow(KeyedProcessFunction):
    def open(self, runtime_context):
        self.last_event = runtime_context.get_state(
            ValueStateDescriptor("last_event", Types.LONG())
        )
        self.session_count = runtime_context.get_state(
            ValueStateDescriptor("session_count", Types.LONG())
        )

    def process_element(self, value, ctx):
        # Регистрация timer через event time
        timer_ts = value.ts + 30_000  # 30 sec session timeout
        ctx.timer_service().register_event_time_timer(timer_ts)

        self.last_event.update(value.ts)
        self.session_count.update((self.session_count.value() or 0) + 1)

    def on_timer(self, timestamp, ctx):
        # Сессия закончилась — emit aggregate
        yield Row(
            user_id=ctx.get_current_key(),
            session_end=timestamp,
            event_count=self.session_count.value() or 0
        )
        self.session_count.clear()
        self.last_event.clear()

Особенности:

  • on_timer тоже generator.
  • Event time timers ждут watermarks; processing time timers — wall clock.
  • Каждый зарегистрированный timer = небольшая запись в timer state — миллионы pending timers вылазят боком (RocksDB inflation).

Side outputs

Для emit multiple stream types из одной функции — side outputs:

from pyflink.datastream import OutputTag
from pyflink.common import Types

# Объявление tag-а
late_events_tag = OutputTag("late", Types.STRING())
suspicious_tag = OutputTag("suspicious", Types.ROW_NAMED(
    ["user_id", "reason"],
    [Types.STRING(), Types.STRING()]
))

class Router(ProcessFunction):
    def process_element(self, value, ctx):
        if value.ts {'<'} ctx.timer_service().current_watermark() - 60_000:
            # Late event — отдельный stream
            ctx.output(late_events_tag, f"{value.user_id}:{value.ts}")
        elif value.amount {'>'} 100000:
            # Suspicious — отдельный stream
            ctx.output(suspicious_tag, Row(user_id=value.user_id, reason="big_amount"))
        else:
            # Main stream
            yield value

routed = stream.process(Router(), output_type=Types.ROW_NAMED([...], [...]))
late_stream = routed.get_side_output(late_events_tag)
suspicious_stream = routed.get_side_output(suspicious_tag)

Side outputs не имеют overhead vs main output — это просто разные downstream connections.


PyFlink DataStream API: типичный pipeline shape
KafkaSourceKafkaSource: ingest raw events from Kafka. Сериализация через KafkaRecordDeserializer — конвертация bytes в Python objects через JsonRowDeserialization или custom Avro/Protobuf.
.map(parse)MapFunction для parsing: stateless, lambda работает. Python harness здесь делает основную CPU работу — parse JSON, validate, transform. Output type обязателен.
.key_by(user_id)keyBy для группировки по user_id: shuffle через сеть, события того же user попадают в одну subtask. Здесь Python не работает — shuffle это Java-side операция.
KeyedProcessFunctionKeyedProcessFunction: где живёт stateful logic. State access делается через RPC обратно в Java — дорого. Cache в Python harness амортизирует cost. Здесь — потенциальный bottleneck.
ValueStateValueState/ListState/MapState: state живёт в Java side (RocksDB или HashMap). Python harness получает данные через gRPC, кэширует, обновляет с lazy write. Cache size настраивается через python.state.cache-size.
TimerTimer service: registered timers хранятся в Flink, on_timer вызывается из Python harness при срабатывании. Cost регистрации — RPC + state write.
Side outputsSide outputs для разделения streams: late, suspicious, normal. Каждый side output — отдельный downstream connection. Не дороже main output.
KafkaSinkSink: запись в Kafka, Iceberg, или другой connector. Sink runs в Java side, Python сериализует output в bytes которые далее уходят в connector.

Connector setup в Python

Sources и sinks конфигурируются через builder pattern, такой же как в Java:

from pyflink.datastream.connectors.kafka import (
    KafkaSource,
    KafkaOffsetsInitializer,
    KafkaSink,
    KafkaRecordSerializationSchema
)
from pyflink.common.serialization import SimpleStringSchema

# Source
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("orders") \
    .set_group_id("flink-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

stream = env.from_source(kafka_source, WatermarkStrategy.no_watermarks(), "kafka-source")

# Sink
kafka_sink = KafkaSink.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("processed")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .build()

stream.sink_to(kafka_sink)

Доступны connector-ы:

  • KafkaSource/KafkaSink (Kafka)
  • JdbcSink (PostgreSQL, MySQL)
  • FileSource/FileSink (S3, local)
  • IcebergSink (Iceberg lake)
  • ElasticsearchSink

Не все Java connector-ы имеют Python API — для exotic source-ов придётся писать custom через SourceFunction.


Async I/O в Python

Async I/O (вызов внешнего сервиса с unblocking) поддерживается в PyFlink через AsyncFunction:

from pyflink.datastream.async_function import AsyncFunction
import aiohttp

class EnrichWithGeo(AsyncFunction):
    def open(self, runtime_context):
        self.session = aiohttp.ClientSession()

    async def async_invoke(self, value):
        async with self.session.get(f"https://geo-api/{value.ip}") as resp:
            geo = await resp.json()
        return [Row(ip=value.ip, country=geo['country'])]

    def close(self):
        # cleanup в Python harness
        pass

stream.transform(
    "async_enrich",
    EnrichWithGeo(),
    output_type=Types.ROW_NAMED([...], [...]),
    timeout=5000,    # ms
    capacity=100,    # in-flight requests
)

Под капотом async function запускается в asyncio event loop в Python harness. До 100 concurrent requests в одном process — это полезно для I/O bound enrichment (API calls, DB lookups). CPU bound async UDF не дают benefit (GIL).


Custom serializer для Python types

Для нестандартных типов нужен custom serializer на Java side. PyFlink не позволяет создать чисто-Python custom serializer — каждое значение должно быть конвертируемо в один из поддерживаемых Types.

Workaround: сериализуйте сложный Python объект в bytes/string внутри MapFunction:

import pickle

def serialize_model_state(state):
    return pickle.dumps(state)

def deserialize_model_state(bytes):
    return pickle.loads(bytes)

stream.map(
    lambda x: (x.user_id, serialize_model_state(x.model)),
    output_type=Types.TUPLE([Types.STRING(), Types.PRIMITIVE_ARRAY(Types.BYTE())])
)

Это не максимально эффективно (pickle медленный, bytes тяжелые), но работает для случаев, когда нужно прокинуть unsupported тип через Flink stream. Для production — лучше переделать в проблемный тип в поддерживаемый Row/Tuple.


Connecting streams (CoFunction)

Two-stream operations через connect:

from pyflink.datastream.functions import CoProcessFunction

class Joiner(CoProcessFunction):
    def open(self, runtime_context):
        self.left_state = runtime_context.get_state(
            ValueStateDescriptor("left", Types.ROW(...))
        )
        self.right_state = runtime_context.get_state(
            ValueStateDescriptor("right", Types.ROW(...))
        )

    def process_element1(self, value, ctx):
        # event из левого stream
        right = self.right_state.value()
        if right is not None:
            yield merge(value, right)
        else:
            self.left_state.update(value)

    def process_element2(self, value, ctx):
        # event из правого stream
        left = self.left_state.value()
        if left is not None:
            yield merge(left, value)
        else:
            self.right_state.update(value)

stream1.connect(stream2) \
    .key_by(lambda x: x.user_id, lambda y: y.user_id) \
    .process(Joiner(), output_type=Types.ROW_NAMED([...], [...]))

Real production patterns

Pattern 1: Lightweight Python wrapper + Java state. Pipeline backbone в Java для производительности, Python только для финального enrichment (ML inference, custom output formatting). PyFlink поддерживает mixing — вы можете передавать stream между Java и Python operator-ами.

Pattern 2: Stateless Python для preprocessing. Парсинг raw bytes, нормализация, валидация — в stateless Python функциях. Дальше keyBy и stateful logic на Java/Scala. Python harness не несёт state cost.

Pattern 3: Pandas UDF для compute-heavy. Если нужно делать numpy/pandas computation на batch — Pandas UDF в 10-50x быстрее scalar Python. Кейс: aggregation over window, ML feature engineering.

Antipattern: stateful hot path в Python. Это самый частый failure mode — команда пишет KeyedProcessFunction в Python, тестирует на 1K events/sec, всё работает, в production на 100K events/sec — backpressure и job стопается.

WARNING

Любой stateful Python operator в hot path — потенциальный bottleneck. Перед production load test обязательно: запустить chaos test с peak throughput, измерить state RPC count через metrics, оценить cache hit rate. Если cache hit меньше 70% — кейс плох для Python.


Попробуй сам

  1. End-to-end Python job. Напишите job: KafkaSource -> parse JSON -> keyBy user_id -> KeyedProcessFunction с ValueState -> KafkaSink. Тест на 10K events/sec. Замерьте per-event latency и CPU.

  2. Side outputs. Расширьте job выше: добавьте late events stream (события старше 60 sec watermark) и suspicious stream (amount > $10K). Каждый side output пусть пишется в отдельный Kafka topic.

  3. Async enrichment. Реализуйте AsyncFunction, которая делает HTTP call к mock geo-API. Тест на разной capacity (10, 100, 500 concurrent). Найдите sweet spot для вашего API latency.

Проверка знанийKnowledge check
У вас PyFlink job с KeyedProcessFunction на 50 миллионах активных ключей. На каждое событие функция читает ValueState, обновляет, и регистрирует event-time timer на +1 минуту. После 6 часов работы наблюдаете растущий backpressure, RocksDB grows до 200GB, и checkpoint duration увеличился с 30 секунд до 4 минут. В чём проблема и как её решить?
ОтветAnswer
Проблема — timer state explosion. Каждое событие регистрирует новый timer, и timer storage в RocksDB растёт. На 50M ключей с регулярными updates у вас миллионы pending timers, что добавляет к RocksDB и убивает checkpoint speed (Flink должен записать все pending timers в checkpoint). Решения по приоритету: (1) De-duplicate timers: вместо register_event_time_timer на каждое событие, проверять не зарегистрирован ли уже timer для этого ключа. Использовать дополнительный ValueState "next_timer_ts" — если value != null и not yet fired, skip registration. Это снижает timer count на 80-95% для hot keys. (2) Coalesce timers: округлять timer timestamp до 1-минутных бакетов — много событий получают тот же timer вместо разных микросекундных. Меньше timer entries в RocksDB. (3) Использовать processing-time timers вместо event-time, если business logic это позволяет — они дешевле в storage и не зависят от watermark advancement. (4) Если background cleanup state не нужен таймером — использовать TTL state (state.set_ttl) вместо timers. TTL чистит данные lazy, без отдельной timer записи. (5) Уменьшить количество active keys через TTL state (state.set_ttl) или периодическим explicit cleanup — старые сессии удалять, RocksDB не растёт бесконечно. Для checkpoint speed: включить incremental checkpoints, чтобы только delta попадала в checkpoint storage.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Почему почти всегда нужно указывать output_type при использовании lambda в PyFlink DataStream API?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 4