Кастомные каталоги: Catalog Plugin API
До Spark 3.0 понятие “каталог” в Spark было монолитным: один Hive Metastore (или SessionCatalog), одна точка правды о том, какие таблицы существуют. Это работало для простых архитектур, но не масштабировалось до реальности большого предприятия: несколько data lakes на разных облаках, Iceberg на S3, Delta Lake на ADLS, таблицы в Snowflake, функции в Unity Catalog.
Catalog Plugin API — это ответ на эту проблему. Он позволяет одновременно использовать несколько каталогов, каждый из которых реализует свою логику — от простого in-memory каталога для тестов до полноценного Iceberg REST Catalog с таблицами на нескольких облаках.
Конфигурация: spark.sql.catalog.*
Каталог регистрируется через конфигурацию:
# Регистрируем каталог с именем "iceberg"
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=rest
spark.sql.catalog.iceberg.uri=https://catalog.example.com
spark.sql.catalog.iceberg.oauth2-server-uri=https://auth.example.com/token
spark.sql.catalog.iceberg.credential=client-id:client-secret
# Регистрируем второй каталог "delta"
spark.sql.catalog.delta=io.delta.sql.DeltaCatalog
spark.sql.catalog.delta.type=hive
spark.sql.catalog.delta.uri=thrift://hive-metastore:9083
# Spark-сессионный каталог (заменяет стандартный SessionCatalog)
spark.sql.catalog.spark_catalog=io.delta.sql.DeltaCatalog
После этой конфигурации:
iceberg.production.events— таблицаeventsв namespaceproductionкаталогаicebergdelta.warehouse.orders— таблицаordersв namespacewarehouseкаталогаdeltadefault.users— таблица в стандартном каталоге (spark_catalog)
Все опции с префиксом spark.sql.catalog.NAME. передаются методу initialize() каталога с убранным префиксом. Это означает, что каталог получает {"type": "rest", "uri": "...", ...} — полную конфигурацию.
CatalogPlugin: базовый интерфейс
Все каталоги реализуют маркерный интерфейс CatalogPlugin:
// org.apache.spark.sql.connector.catalog.CatalogPlugin
public interface CatalogPlugin {
// Вызывается один раз при загрузке каталога
// name: имя каталога ("iceberg", "delta", "production")
// options: конфигурация с убранным префиксом
void initialize(String name, CaseInsensitiveStringMap options);
// Имя каталога
String name();
// Описание для explain() и логов
default String[] defaultNamespace() {
return new String[0]; // По умолчанию -- корневой namespace
}
}
CatalogPlugin — только маркер. Чтобы каталог умел что-то делать, он реализует дополнительные интерфейсы.
TableCatalog: CRUD для таблиц
TableCatalog — основной интерфейс для работы с таблицами:
// org.apache.spark.sql.connector.catalog.TableCatalog
public interface TableCatalog extends CatalogPlugin {
// Получить таблицу по имени
Table loadTable(Identifier ident) throws NoSuchTableException;
// Получить таблицу с версией (Iceberg/Delta time travel)
default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
throw new UnsupportedOperationException();
}
default Table loadTable(Identifier ident, String version) throws NoSuchTableException {
throw new UnsupportedOperationException();
}
// Создать таблицу
Table createTable(
Identifier ident,
Column[] columns, // Spark 4.0: Column вместо StructType
Transform[] partitions,
Map<String, String> properties
) throws TableAlreadyExistsException, NoSuchNamespaceException;
// Изменить таблицу (ALTER TABLE)
Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException;
// Удалить таблицу
boolean dropTable(Identifier ident);
// Переименовать таблицу
void renameTable(Identifier from, Identifier to)
throws NoSuchTableException, TableAlreadyExistsException;
// Проверить существование таблицы
default boolean tableExists(Identifier ident) {
try { loadTable(ident); return true; }
catch (NoSuchTableException e) { return false; }
}
// Список таблиц в namespace
Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
}
TableChange — sealed hierarchy для ALTER TABLE:
// Добавить колонку
TableChange.addColumn(new String[]{"new_col"}, DataTypes.StringType, true, "comment")
// Переименовать колонку
TableChange.renameColumn(new String[]{"old_name"}, "new_name")
// Удалить колонку
TableChange.deleteColumn(new String[]{"col_to_delete"}, false)
// Изменить тип колонки (если поддерживается)
TableChange.updateColumnType(new String[]{"amount"}, DataTypes.DoubleType)
// Обновить свойства таблицы
TableChange.setProperty("write.target-file-size-bytes", "134217728")
TableChange.removeProperty("obsolete.property")
SupportsNamespaces: управление пространствами имён
// org.apache.spark.sql.connector.catalog.SupportsNamespaces
public interface SupportsNamespaces extends CatalogPlugin {
// Список namespace-ов в namespace (рекурсивно или на одном уровне)
String[][] listNamespaces() throws NoSuchNamespaceException;
String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException;
// Метаданные namespace
Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException;
// Создать namespace
void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException;
// Изменить namespace (комментарий, свойства)
void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException;
// Удалить namespace
boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException, NonEmptyNamespaceException;
// Проверить существование namespace
default boolean namespaceExists(String[] namespace) {
try { loadNamespaceMetadata(namespace); return true; }
catch (NoSuchNamespaceException e) { return false; }
}
}
FunctionCatalog: каталог функций
// org.apache.spark.sql.connector.catalog.FunctionCatalog
public interface FunctionCatalog extends CatalogPlugin {
// Список функций в namespace
Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException;
// Загрузить функцию
UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;
// Проверить существование функции
default boolean functionExists(Identifier ident) {
try { loadFunction(ident); return true; }
catch (NoSuchFunctionException e) { return false; }
}
}
UnboundFunction — это фабрика, которая создаёт конкретную BoundFunction при связывании с аргументами. BoundFunction возвращает результирующий тип и либо генерирует код (ScalarFunction), либо производит агрегацию (AggregateFunction).
Реализация кастомного каталога: in-memory каталог для тестов
Напишем полный каталог для unit-тестов и демонстрации — in-memory хранилище таблиц:
package com.example.catalog
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util
import java.util.concurrent.ConcurrentHashMap
class InMemoryCatalog extends TableCatalog with SupportsNamespaces {
private var catalogName: String = _
private val tables = new ConcurrentHashMap[Identifier, InMemoryTable]()
private val namespaces = new ConcurrentHashMap[String, util.Map[String, String]]()
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
catalogName = name
// Создаём namespace "default" автоматически
namespaces.put("default", new util.HashMap[String, String]())
}
override def name(): String = catalogName
// --- TableCatalog ---
override def listTables(namespace: Array[String]): Array[Identifier] = {
val nsKey = namespace.mkString(".")
tables.keys().asScala
.filter(_.namespace().mkString(".") == nsKey)
.toArray
}
override def loadTable(ident: Identifier): Table = {
Option(tables.get(ident)).getOrElse(
throw new NoSuchTableException(ident.toString)
)
}
override def createTable(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]
): Table = {
if (tables.containsKey(ident)) {
throw new TableAlreadyExistsException(ident.toString)
}
val schema = CatalogV2Util.v2ColumnsToStructType(columns)
val table = new InMemoryTable(ident.name(), schema, partitions, properties)
tables.put(ident, table)
table
}
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
val existing = loadTable(ident)
// Применяем изменения к схеме (упрощённо)
val newTable = applyTableChanges(existing, changes)
tables.put(ident, newTable)
newTable
}
override def dropTable(ident: Identifier): Boolean =
tables.remove(ident) != null
override def renameTable(from: Identifier, to: Identifier): Unit = {
val table = loadTable(from)
tables.remove(from)
tables.put(to, table)
}
// --- SupportsNamespaces ---
override def listNamespaces(): Array[Array[String]] =
namespaces.keys().asScala.map(_.split("\\.")).toArray
override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
val prefix = namespace.mkString(".")
namespaces.keys().asScala
.filter(_.startsWith(prefix + "."))
.map(_.split("\\."))
.toArray
}
override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
val key = namespace.mkString(".")
Option(namespaces.get(key)).getOrElse(
throw new NoSuchNamespaceException(namespace.mkString("."))
)
}
override def createNamespace(
namespace: Array[String],
metadata: util.Map[String, String]
): Unit = {
val key = namespace.mkString(".")
if (namespaces.containsKey(key)) {
throw new NamespaceAlreadyExistsException(key)
}
namespaces.put(key, new util.HashMap[String, String](metadata))
}
override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
val key = namespace.mkString(".")
val meta = loadNamespaceMetadata(namespace)
changes.foreach {
case NamespaceChange.setProperty(key2, value) => meta.put(key2, value)
case NamespaceChange.removeProperty(key2) => meta.remove(key2)
}
}
override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
val key = namespace.mkString(".")
if (cascade) {
// Удаляем все таблицы в namespace
tables.keys().asScala
.filter(_.namespace().mkString(".") == key)
.foreach(tables.remove)
}
namespaces.remove(key) != null
}
}
Multi-catalog routing: как Spark выбирает каталог
Когда Spark встречает идентификатор вида catalog.namespace.table, он извлекает первый компонент (catalog) и ищет соответствующий каталог в реестре. Если каталог найден — следующие компоненты трактуются как namespace + имя таблицы.
Правила routing:
- Если идентификатор начинается с зарегистрированного имени каталога — используется этот каталог.
- Иначе — используется
spark_catalog(дефолтный SessionCatalog). USE CATALOG icebergменяет текущий каталог по умолчанию для сессии.SET CATALOG iceberg— алиас дляUSE CATALOG.
Как Iceberg реализует Catalog Plugin API
Apache Iceberg имеет несколько реализаций CatalogPlugin в зависимости от backend:
// Конфигурация для Iceberg REST catalog
spark.conf.set("spark.sql.catalog.icerest", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.icerest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
spark.conf.set("spark.sql.catalog.icerest.uri", "https://my-catalog.example.com")
spark.conf.set("spark.sql.catalog.icerest.warehouse", "s3://my-bucket/warehouse")
// SparkCatalog -- обёртка над нативным Iceberg Catalog
// SparkCatalog.loadTable() делает следующее:
// 1. Вызывает IcebergCatalog.loadTable(TableIdentifier) -- HTTP GET к REST Catalog
// 2. Получает TableMetadata с текущим snapshot, schema, partition spec, sort orders
// 3. Оборачивает в SparkTable (реализует Table + SupportsRead + SupportsWrite + ...)
// 4. При необходимости -- time travel: loadTable(ident, snapshotId или asOf timestamp)
// Создание Iceberg таблицы через Catalog API
spark.sql("""
CREATE TABLE icerest.production.events (
id BIGINT NOT NULL,
event_type STRING,
amount DOUBLE,
ts TIMESTAMP
) USING iceberg
PARTITIONED BY (days(ts))
TBLPROPERTIES (
'write.target-file-size-bytes' = '134217728',
'write.parquet.compression-codec' = 'zstd',
'read.split.target-size' = '134217728'
)
""")
// Iceberg создаёт: REST API вызов catalog/v1/namespaces/production/tables
// с TableMetadata.json, который будет версия 1 этой таблицы
// Time travel -- через TableCatalog.loadTable(ident, version/timestamp)
val df = spark.read.option("as-of-timestamp", "2024-01-15 12:00:00")
.table("icerest.production.events")
// Iceberg загрузит snapshot ближайший к этому времени
Iceberg также реализует StagingTableCatalog — расширение, позволяющее “stage” таблицу перед финальным commit:
// org.apache.spark.sql.connector.catalog.StagingTableCatalog
public interface StagingTableCatalog extends TableCatalog {
// Начать создание таблицы "staged" -- без реального создания в метасторе
StagedTable stageCreate(Identifier ident, Column[] columns,
Transform[] partitions, Map<String, String> properties);
// Staged замена существующей таблицы (атомарный swap)
StagedTable stageReplace(Identifier ident, Column[] columns,
Transform[] partitions, Map<String, String> properties);
// Staged создание или замена (CREATE OR REPLACE TABLE)
StagedTable stageCreateOrReplace(Identifier ident, Column[] columns,
Transform[] partitions, Map<String, String> properties);
}
StagedTable.commitStagedChanges() вызывается Spark после того, как все данные записаны. Для Iceberg это момент создания первого snapshot — таблица становится видимой в каталоге атомарно, вместе с данными. Без StagingTableCatalog была бы проблема: таблица создаётся в метасторе сначала, потом данные пишутся — если процесс упадёт между двумя этапами, в каталоге будет пустая таблица.
Unity Catalog: enterprise multi-catalog
Unity Catalog (Databricks open source) — это реализация нескольких интерфейсов:
spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog
spark.sql.catalog.unity.uri=https://unitycatalog.example.com
spark.sql.catalog.unity.token=dapi...
UCSingleCatalog реализует TableCatalog + SupportsNamespaces + FunctionCatalog:
- loadTable() делает REST вызов к Unity Catalog API:
GET /api/2.1/unity-catalog/tables/{full_name}. Ответ содержит storage location (S3/ADLS path), format (delta/iceberg/parquet), schema. - Возвращаемый
Table— это либоDeltaTable(если format=DELTA), либоIcebergTable(если format=ICEBERG), либоExternalParquetTable. - Governance: Unity Catalog проверяет права доступа при
loadTable(). Если у пользователя нет SELECT privilege —loadTable()выбрасываетSecurityExceptionи данные никогда не покидают хранилище. loadFunction()возвращает Python UDF или SQL UDF, зарегистрированные в Unity Catalog — они становятся доступны через SQL как обычные функции.
Реализация DelegatingCatalogExtension
Часто нужно не написать каталог с нуля, а расширить существующий — добавить audit logging, прозрачное шифрование, автоматические TBLPROPERTIES, row-level security. Для этого есть DelegatingCatalogExtension:
package com.example.catalog
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util
// Добавляем audit logging ко всем операциям с таблицами
class AuditingCatalog extends DelegatingCatalogExtension {
private var auditLogger: AuditLogger = _
override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
super.initialize(name, options) // DelegatingCatalogExtension загружает делегата
// Делегат загружается через опцию "delegate" или стандартное имя
auditLogger = new AuditLogger(options.getOrDefault("audit.endpoint", ""))
}
override def loadTable(ident: Identifier): Table = {
val startTime = System.currentTimeMillis()
try {
val table = super.loadTable(ident)
auditLogger.log(AuditEvent.READ, ident, success = true,
durationMs = System.currentTimeMillis() - startTime)
table
} catch {
case e: NoSuchTableException =>
auditLogger.log(AuditEvent.READ, ident, success = false,
durationMs = System.currentTimeMillis() - startTime, error = Some(e.getMessage))
throw e
}
}
override def createTable(
ident: Identifier,
columns: Array[Column],
partitions: Array[Transform],
properties: util.Map[String, String]
): Table = {
// Добавляем обязательные TBLPROPERTIES к каждой новой таблице
val enrichedProperties = new util.HashMap[String, String](properties)
enrichedProperties.put("created.by", getCurrentUser())
enrichedProperties.put("created.at", System.currentTimeMillis().toString)
enrichedProperties.put("classification", properties.getOrDefault("classification", "internal"))
val table = super.createTable(ident, columns, partitions, enrichedProperties)
auditLogger.log(AuditEvent.CREATE, ident, success = true)
table
}
override def dropTable(ident: Identifier): Boolean = {
// Проверяем разрешение на удаление (например, через внешний policy engine)
if (!hasDropPermission(ident)) {
throw new SecurityException(s"Not authorized to drop table $ident")
}
val result = super.dropTable(ident)
if (result) auditLogger.log(AuditEvent.DROP, ident, success = true)
result
}
private def getCurrentUser(): String =
spark.sparkContext.sparkUser
}
Конфигурация: spark.sql.catalog.production=com.example.catalog.AuditingCatalog + указание на делегата (стандартный SessionCatalog или Iceberg).
Identifier: адресация объектов в каталоге
Identifier — это пара (namespace: Array[String], name: String):
// Создание Identifier
val ident = Identifier.of(Array("production", "europe"), "events")
// catalog: "icerest" (определяется контекстом)
// namespace: ["production", "europe"]
// name: "events"
// В SQL: icerest.production.europe.events
// Идентификатор в стандартном каталоге
val defaultIdent = Identifier.of(Array("default"), "users")
// В SQL: users (или default.users)
Глубина namespace-ов зависит от реализации каталога. Iceberg поддерживает многоуровневые namespace-ы (database.schema.table). Delta Lake обычно использует одноуровневые (database.table). Unity Catalog использует трёхуровневые (catalog.schema.table) — но с точки зрения Spark Plugin API каталог — это первый уровень, поэтому Unity передаёт двухуровневый namespace.
Попробуй сам
Исследуем multi-catalog через Iceberg (если доступен) или через встроенный InMemoryCatalog:
from pyspark.sql import SparkSession
# Настроим два каталога одновременно
spark = SparkSession.builder \
.appName("multi-catalog-demo") \
.master("local[*]") \
.config("spark.jars.packages",
"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0") \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.local_iceberg",
"org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.local_iceberg.type", "hadoop") \
.config("spark.sql.catalog.local_iceberg.warehouse", "/tmp/iceberg-demo") \
.getOrCreate()
# --- Создаём namespace и таблицы в Iceberg каталоге ---
spark.sql("CREATE NAMESPACE IF NOT EXISTS local_iceberg.analytics")
spark.sql("CREATE NAMESPACE IF NOT EXISTS local_iceberg.raw")
spark.sql("""
CREATE TABLE IF NOT EXISTS local_iceberg.raw.events (
id BIGINT,
ts TIMESTAMP,
event_type STRING,
payload STRING
) USING iceberg
PARTITIONED BY (days(ts))
""")
spark.sql("""
CREATE TABLE IF NOT EXISTS local_iceberg.analytics.daily_summary (
date DATE,
event_type STRING,
count BIGINT
) USING iceberg
""")
# Вставляем данные
spark.sql("""
INSERT INTO local_iceberg.raw.events VALUES
(1, TIMESTAMP '2024-01-15 10:00:00', 'purchase', '{"amount": 99.9}'),
(2, TIMESTAMP '2024-01-15 11:00:00', 'view', '{"product_id": 42}'),
(3, TIMESTAMP '2024-01-16 09:00:00', 'purchase', '{"amount": 149.9}')
""")
# Cross-catalog запрос: читаем из Iceberg, пишем в SessionCatalog
spark.sql("""
INSERT INTO local_iceberg.analytics.daily_summary
SELECT DATE(ts), event_type, COUNT(*) as count
FROM local_iceberg.raw.events
GROUP BY DATE(ts), event_type
""")
# --- Исследуем Iceberg time travel через catalog API ---
spark.sql("INSERT INTO local_iceberg.raw.events VALUES (4, TIMESTAMP '2024-01-17 12:00:00', 'refund', '{}')")
# Текущее состояние
print("=== Текущее состояние ===")
spark.table("local_iceberg.raw.events").show()
# Исторические snapshot-ы
print("=== Snapshot history ===")
spark.sql("SELECT * FROM local_iceberg.raw.events.history").show()
# Time travel -- snapshot до последней вставки
snapshots = spark.sql("SELECT snapshot_id FROM local_iceberg.raw.events.snapshots ORDER BY committed_at").collect()
if len(snapshots) >= 2:
first_snapshot = snapshots[0]["snapshot_id"]
print(f"=== Time travel к snapshot {first_snapshot} ===")
spark.read \
.option("snapshot-id", first_snapshot) \
.table("local_iceberg.raw.events") \
.show()
# --- Список namespace-ов через catalog API ---
print("=== Namespaces в local_iceberg ===")
spark.sql("SHOW NAMESPACES IN local_iceberg").show()
print("=== Таблицы в local_iceberg.raw ===")
spark.sql("SHOW TABLES IN local_iceberg.raw").show()
# --- Переключаем текущий каталог ---
spark.sql("USE CATALOG local_iceberg")
spark.sql("USE raw")
# Теперь SELECT * FROM events работает без qualified name
spark.sql("SELECT * FROM events").show()
# --- Смешанные запросы из двух каталогов ---
# Данные из Iceberg + встроенный SessionCatalog
spark.sql("USE CATALOG spark_catalog")
spark.sql("CREATE TABLE IF NOT EXISTS enrichment (event_type STRING, label STRING)")
spark.sql("INSERT INTO enrichment VALUES ('purchase', 'Revenue'), ('view', 'Engagement'), ('refund', 'Return')")
# JOIN данных из разных каталогов
spark.sql("""
SELECT e.ts, e.event_type, en.label
FROM local_iceberg.raw.events e
JOIN spark_catalog.default.enrichment en ON e.event_type = en.event_type
ORDER BY e.ts
""").show()
При разработке кастомного каталога добавьте реализацию supportsNamespaceProperty(String key) — это позволит Spark подсказывать UI какие свойства namespace поддерживаются (комментарии, owner, location). Без этого DESCRIBE NAMESPACE EXTENDED не покажет кастомные свойства. В Spark 4.0 этот метод по умолчанию возвращает false для всех ключей.
Транзакционная семантика через StagingTableCatalog
Паттерн для надёжной записи данных в кастомный каталог:
// Spark автоматически использует StagingTableCatalog если каталог его реализует
// При CREATE TABLE AS SELECT или INSERT OVERWRITE
// Вот что происходит под капотом при CREATE TABLE AS SELECT
// в каталоге, реализующем StagingTableCatalog:
// Шаг 1: catalog.stageCreate(ident, columns, partitions, properties)
// -> Возвращает StagedTable -- таблица ещё не видна в каталоге
// Шаг 2: Spark записывает данные через StagedTable.newWriteBuilder()
// -> DataWriter.write() на каждом executor
// -> DataWriter.commit() возвращает WriterCommitMessage
// Шаг 3: Если все executor'ы успешно: stagedTable.commitStagedChanges()
// -> Таблица становится видимой в каталоге с данными атомарно
// Шаг 4: Если любой executor упал: stagedTable.abortStagedChanges()
// -> Staged данные удаляются, таблица не появляется в каталоге
// Это именно то, что делает Iceberg:
// commitStagedChanges() = создать первый Iceberg snapshot с manifest-файлами
// Пока snapshot не создан, данные физически есть на S3, но таблица "не существует"
Apache Iceberg: устройство таблиц и транзакции