Learning Platform
Глоссарий Troubleshooting
Урок 11.03 · 14 мин
Продвинутый
Arrow FlightgRPCFlightServerFlightClientDoGetDoPutGetFlightInfo

Arrow Flight: высокоскоростной обмен данными

Проблема: JDBC/ODBC не подходят для больших данных

JDBC и ODBC были разработаны в 1990-х для передачи небольших результатов SQL-запросов. Их архитектура имеет фундаментальные ограничения для аналитических нагрузок:

JDBC/ODBC pipeline

~30 сек для 1 GB, CPU-bound serialization

СерверColumnar Storage
1. Row-by-row serialization
2. TCP transfer (with row framing)
text/binary encoding
КлиентParse rows → rebuild columns

Ключевые проблемы JDBC/ODBC:

  • Row-oriented serialization: данные из колоночного хранилища конвертируются в строки, а клиент собирает их обратно в колонки — двойная бесполезная работа
  • Text encoding: числа конвертируются в строки и обратно (JDBC)
  • Однопоточность: один TCP stream, невозможно параллелизировать
  • Нет streaming: весь результат буферизируется перед отправкой

Arrow Flight: columnar streaming по gRPC

Arrow Flight — это RPC-фреймворк для передачи Arrow-данных, построенный поверх gRPC. Вместо строчной сериализации Flight стримит RecordBatch’и в Arrow columnar format напрямую.

Arrow Flight pipeline

~0.3 сек для 1 GB, zero-serialization overhead

СерверColumnar Storage
RecordBatch #1
RecordBatch #2
RecordBatch #3
…parallel streams…
КлиентArrow RecordBatchready to process (no deser.)

Преимущества Flight:

  • Columnar format end-to-end: нет конвертации row ↔ columnar
  • Zero-deserialization: RecordBatch на клиенте уже в Arrow format
  • Parallel streams: несколько gRPC streams одновременно
  • Streaming: клиент начинает обработку первого batch’а сразу

Архитектура Flight: FlightServer и FlightClient

Flight использует клиент-серверную модель с 4 ключевыми RPC-методами:

Flight: Client ↔ Server RPC

4 ключевых RPC-метода Arrow Flight

FlightClient
GetFlightInfo(desc)«Какие данные доступны?»
← FlightInfoschema, endpoints, total_records
DoGet(ticket)«Отдай данные»
◄═ RecordBatch streamcolumnar data
DoPut(stream)«Прими данные»
═► RecordBatch streamcolumnar data
ListFlights(criteria)«Что есть?»
← FlightInfo listlist of datasets
FlightServer

Ключевые RPC-методы

МетодОписаниеАналог в SQL
GetFlightInfoМетаданные о наборе данных (schema, размер, endpoints)DESCRIBE TABLE
DoGetПолучить данные как stream RecordBatch’ейSELECT * FROM
DoPutОтправить данные на серверINSERT INTO
ListFlightsСписок доступных наборов данныхSHOW TABLES
DoActionВыполнить произвольное действие (compact, vacuum)Stored procedure
DoExchangeДвунаправленный обмен данными

Пример: простой FlightServer на Python

import pyarrow as pa
import pyarrow.flight as flight

class SimpleFlightServer(flight.FlightServerBase):
    """Минимальный Flight-сервер с одним набором данных."""

    def __init__(self, location):
        super().__init__(location)
        # Пример данных -- таблица сотрудников
        self.data = pa.table({
            'id': [1, 2, 3, 4, 5, 6],
            'name': ['Анна', 'Борис', 'Вера', 'Григорий', 'Дарья', 'Евгений'],
            'dept': ['IT', 'HR', 'IT', 'Finance', 'IT', 'HR'],
            'salary': [95000, 78000, 112000, 89000, 103000, 81000]
        })

    def get_flight_info(self, context, descriptor):
        """Метаданные: schema + количество записей."""
        schema = self.data.schema
        endpoints = [flight.FlightEndpoint(
            b"employees", [self.location]
        )]
        return flight.FlightInfo(
            schema, descriptor, endpoints,
            total_records=self.data.num_rows,
            total_bytes=self.data.nbytes
        )

    def do_get(self, context, ticket):
        """Отдаём данные как stream RecordBatch'ей."""
        # В реальности: фильтрация, партиционирование
        return flight.RecordBatchStream(self.data)

    def list_flights(self, context, criteria):
        """Список доступных datasets."""
        descriptor = flight.FlightDescriptor.for_path("employees")
        info = self.get_flight_info(context, descriptor)
        yield info

# Запуск сервера
server = SimpleFlightServer("grpc://0.0.0.0:8815")
server.serve()

Пример: FlightClient

import pyarrow.flight as flight

# Подключение к серверу
client = flight.connect("grpc://localhost:8815")

# Список доступных данных
for fl in client.list_flights():
    print(f"Path: {fl.descriptor.path}")
    print(f"Schema: {fl.schema}")
    print(f"Records: {fl.total_records}")

# Получение метаданных
descriptor = flight.FlightDescriptor.for_path("employees")
info = client.get_flight_info(descriptor)

# Чтение данных (streaming!)
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all()  # Arrow Table, ready to use

# Или обработка по batch'ам (streaming pipeline):
reader = client.do_get(info.endpoints[0].ticket)
for batch in reader:
    # batch -- RecordBatch, обрабатываем без ожидания всех данных
    process_batch(batch.data)
TIP

Flight vs REST API для передачи данных

REST API обычно использует JSON — текстовый формат, который парсится медленно и занимает в 3-5x больше места чем бинарные данные. Flight передаёт данные в Arrow binary format через gRPC (HTTP/2), что даёт: (1) бинарный формат без text encoding, (2) HTTP/2 multiplexing для параллельных streams, (3) native streaming без pagination, (4) schema enforcement на уровне протокола.

Flight SQL: SQL-запросы через Flight

Arrow Flight SQL — расширение Flight для отправки SQL-запросов. Клиент отправляет SQL-строку, сервер возвращает результат как Arrow RecordBatch stream:

import pyarrow.flight as flight

# Flight SQL клиент
client = flight.connect("grpc://analytics-server:8815")

# Отправка SQL-запроса
descriptor = flight.FlightDescriptor.for_command(
    b"SELECT dept, AVG(salary) FROM employees GROUP BY dept"
)
info = client.get_flight_info(descriptor)

# Результат -- Arrow RecordBatch stream
reader = client.do_get(info.endpoints[0].ticket)
result = reader.read_all()
# result уже Arrow Table -- нет десериализации!

Flight SQL постепенно заменяет JDBC/ODBC для аналитических инструментов: DBeaver, Apache Superset, и другие BI-инструменты уже поддерживают Flight SQL-драйверы.

Производительность: Flight vs JDBC/ODBC

МетрикаJDBC (PostgreSQL)ODBC (MySQL)Arrow Flight
100 MB transfer3.2 сек4.1 сек0.15 сек
1 GB transfer31 сек38 сек0.9 сек
10 GB transfer~5 мин~6 мин~8 сек
SerializationRow + text encodeRow + binaryZero (Arrow native)
Parallelism1 stream1 streamN parallel streams
First byte latencyAfter full queryAfter full queryAfter first batch

Для 10 GB: Flight быстрее JDBC в ~40x. Разница растёт с размером данных, потому что Flight не тратит CPU на сериализацию.

WARNING

Anti-pattern: JDBC для bulk data transfer

# ПЛОХО: JDBC для больших выгрузок
jdbc_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host/db") \
    .option("dbtable", "large_table") \
    .load()
# 10 GB → ~5 мин, row-by-row serialization

# ЛУЧШЕ: если сервер поддерживает Flight
flight_df = spark.read \
    .format("arrow") \
    .option("uri", "grpc://host:8815") \
    .option("descriptor", "large_table") \
    .load()
# 10 GB → ~8 сек, Arrow native streaming

JDBC/ODBC подходят для OLTP (маленькие запросы, много пользователей). Для bulk аналитических выгрузок Arrow Flight даёт на порядки лучшую производительность.

Проверка знанийKnowledge check
Какие 4 ключевых RPC-метода предоставляет Arrow Flight и для чего каждый из них используется?
ОтветAnswer
(1) GetFlightInfo -- получение метаданных о наборе данных: schema, количество записей, endpoints для чтения. (2) DoGet -- чтение данных как stream Arrow RecordBatch'ей; аналог SELECT. (3) DoPut -- запись данных на сервер через stream RecordBatch'ей; аналог INSERT. (4) ListFlights -- получение списка доступных наборов данных; аналог SHOW TABLES. Дополнительно: DoAction для произвольных операций и DoExchange для двунаправленного обмена.
Проверка знанийKnowledge check
Почему Arrow Flight в 10-100x быстрее JDBC для передачи больших объёмов данных?
ОтветAnswer
Три причины: (1) Zero serialization -- JDBC конвертирует данные из колоночного хранилища в строки с text encoding, Flight передаёт Arrow RecordBatch'и напрямую (columnar end-to-end). (2) Parallel streams -- gRPC поддерживает множество параллельных HTTP/2 streams, JDBC ограничен одним TCP соединением. (3) Streaming -- Flight начинает передачу первого batch'а сразу, JDBC буферизирует весь результат. Для 10 GB данных: JDBC ~5 мин, Flight ~8 сек.
TIP

Для углублённого изучения Arrow Flight Protocol (gRPC layer, middleware, security, production deployment) см. курс Storage Formats Deep-Dive.

Что дальше?

В следующем уроке мы рассмотрим, как Spark Connect использует Arrow Flight для клиент-серверной архитектуры Spark. Это позволяет запускать PySpark-код на лёгком клиенте без JVM, а Spark-кластер работает как удалённый сервер.

Проверьте понимание

Результат: 0 из 0
Аналитический
Вопрос 1 из 4. Почему JDBC/ODBC не подходят для передачи больших аналитических результатов (1+ ГБ)?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 6