Arrow Flight: высокоскоростной обмен данными
Проблема: JDBC/ODBC не подходят для больших данных
JDBC и ODBC были разработаны в 1990-х для передачи небольших результатов SQL-запросов. Их архитектура имеет фундаментальные ограничения для аналитических нагрузок:
~30 сек для 1 GB, CPU-bound serialization
Ключевые проблемы JDBC/ODBC:
- Row-oriented serialization: данные из колоночного хранилища конвертируются в строки, а клиент собирает их обратно в колонки — двойная бесполезная работа
- Text encoding: числа конвертируются в строки и обратно (JDBC)
- Однопоточность: один TCP stream, невозможно параллелизировать
- Нет streaming: весь результат буферизируется перед отправкой
Arrow Flight: columnar streaming по gRPC
Arrow Flight — это RPC-фреймворк для передачи Arrow-данных, построенный поверх gRPC. Вместо строчной сериализации Flight стримит RecordBatch’и в Arrow columnar format напрямую.
~0.3 сек для 1 GB, zero-serialization overhead
Преимущества Flight:
- Columnar format end-to-end: нет конвертации row ↔ columnar
- Zero-deserialization: RecordBatch на клиенте уже в Arrow format
- Parallel streams: несколько gRPC streams одновременно
- Streaming: клиент начинает обработку первого batch’а сразу
Архитектура Flight: FlightServer и FlightClient
Flight использует клиент-серверную модель с 4 ключевыми RPC-методами:
4 ключевых RPC-метода Arrow Flight
Ключевые RPC-методы
| Метод | Описание | Аналог в SQL |
|---|---|---|
GetFlightInfo | Метаданные о наборе данных (schema, размер, endpoints) | DESCRIBE TABLE |
DoGet | Получить данные как stream RecordBatch’ей | SELECT * FROM |
DoPut | Отправить данные на сервер | INSERT INTO |
ListFlights | Список доступных наборов данных | SHOW TABLES |
DoAction | Выполнить произвольное действие (compact, vacuum) | Stored procedure |
DoExchange | Двунаправленный обмен данными | — |
Пример: простой FlightServer на Python
import pyarrow as pa
import pyarrow.flight as flight
class SimpleFlightServer(flight.FlightServerBase):
"""Минимальный Flight-сервер с одним набором данных."""
def __init__(self, location):
super().__init__(location)
# Пример данных -- таблица сотрудников
self.data = pa.table({
'id': [1, 2, 3, 4, 5, 6],
'name': ['Анна', 'Борис', 'Вера', 'Григорий', 'Дарья', 'Евгений'],
'dept': ['IT', 'HR', 'IT', 'Finance', 'IT', 'HR'],
'salary': [95000, 78000, 112000, 89000, 103000, 81000]
})
def get_flight_info(self, context, descriptor):
"""Метаданные: schema + количество записей."""
schema = self.data.schema
endpoints = [flight.FlightEndpoint(
b"employees", [self.location]
)]
return flight.FlightInfo(
schema, descriptor, endpoints,
total_records=self.data.num_rows,
total_bytes=self.data.nbytes
)
def do_get(self, context, ticket):
"""Отдаём данные как stream RecordBatch'ей."""
# В реальности: фильтрация, партиционирование
return flight.RecordBatchStream(self.data)
def list_flights(self, context, criteria):
"""Список доступных datasets."""
descriptor = flight.FlightDescriptor.for_path("employees")
info = self.get_flight_info(context, descriptor)
yield info
# Запуск сервера
server = SimpleFlightServer("grpc://0.0.0.0:8815")
server.serve()
Пример: FlightClient
import pyarrow.flight as flight
# Подключение к серверу
client = flight.connect("grpc://localhost:8815")
# Список доступных данных
for fl in client.list_flights():
print(f"Path: {fl.descriptor.path}")
print(f"Schema: {fl.schema}")
print(f"Records: {fl.total_records}")
# Получение метаданных
descriptor = flight.FlightDescriptor.for_path("employees")
info = client.get_flight_info(descriptor)
# Чтение данных (streaming!)
reader = client.do_get(info.endpoints[0].ticket)
table = reader.read_all() # Arrow Table, ready to use
# Или обработка по batch'ам (streaming pipeline):
reader = client.do_get(info.endpoints[0].ticket)
for batch in reader:
# batch -- RecordBatch, обрабатываем без ожидания всех данных
process_batch(batch.data)
Flight vs REST API для передачи данных
REST API обычно использует JSON — текстовый формат, который парсится медленно и занимает в 3-5x больше места чем бинарные данные. Flight передаёт данные в Arrow binary format через gRPC (HTTP/2), что даёт: (1) бинарный формат без text encoding, (2) HTTP/2 multiplexing для параллельных streams, (3) native streaming без pagination, (4) schema enforcement на уровне протокола.
Flight SQL: SQL-запросы через Flight
Arrow Flight SQL — расширение Flight для отправки SQL-запросов. Клиент отправляет SQL-строку, сервер возвращает результат как Arrow RecordBatch stream:
import pyarrow.flight as flight
# Flight SQL клиент
client = flight.connect("grpc://analytics-server:8815")
# Отправка SQL-запроса
descriptor = flight.FlightDescriptor.for_command(
b"SELECT dept, AVG(salary) FROM employees GROUP BY dept"
)
info = client.get_flight_info(descriptor)
# Результат -- Arrow RecordBatch stream
reader = client.do_get(info.endpoints[0].ticket)
result = reader.read_all()
# result уже Arrow Table -- нет десериализации!
Flight SQL постепенно заменяет JDBC/ODBC для аналитических инструментов: DBeaver, Apache Superset, и другие BI-инструменты уже поддерживают Flight SQL-драйверы.
Производительность: Flight vs JDBC/ODBC
| Метрика | JDBC (PostgreSQL) | ODBC (MySQL) | Arrow Flight |
|---|---|---|---|
| 100 MB transfer | 3.2 сек | 4.1 сек | 0.15 сек |
| 1 GB transfer | 31 сек | 38 сек | 0.9 сек |
| 10 GB transfer | ~5 мин | ~6 мин | ~8 сек |
| Serialization | Row + text encode | Row + binary | Zero (Arrow native) |
| Parallelism | 1 stream | 1 stream | N parallel streams |
| First byte latency | After full query | After full query | After first batch |
Для 10 GB: Flight быстрее JDBC в ~40x. Разница растёт с размером данных, потому что Flight не тратит CPU на сериализацию.
Anti-pattern: JDBC для bulk data transfer
# ПЛОХО: JDBC для больших выгрузок
jdbc_df = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host/db") \
.option("dbtable", "large_table") \
.load()
# 10 GB → ~5 мин, row-by-row serialization
# ЛУЧШЕ: если сервер поддерживает Flight
flight_df = spark.read \
.format("arrow") \
.option("uri", "grpc://host:8815") \
.option("descriptor", "large_table") \
.load()
# 10 GB → ~8 сек, Arrow native streamingJDBC/ODBC подходят для OLTP (маленькие запросы, много пользователей). Для bulk аналитических выгрузок Arrow Flight даёт на порядки лучшую производительность.
Для углублённого изучения Arrow Flight Protocol (gRPC layer, middleware, security, production deployment) см. курс Storage Formats Deep-Dive.
Что дальше?
В следующем уроке мы рассмотрим, как Spark Connect использует Arrow Flight для клиент-серверной архитектуры Spark. Это позволяет запускать PySpark-код на лёгком клиенте без JVM, а Spark-кластер работает как удалённый сервер.