Learning Platform
Глоссарий Troubleshooting
Урок 12.05 · 32 мин
Продвинутый
Catalog Plugin APICatalogPluginTableCatalogSupportsNamespacesmulti-catalogIcebergDelta Lake

Кастомные каталоги: Catalog Plugin API

До Spark 3.0 понятие “каталог” в Spark было монолитным: один Hive Metastore (или SessionCatalog), одна точка правды о том, какие таблицы существуют. Это работало для простых архитектур, но не масштабировалось до реальности большого предприятия: несколько data lakes на разных облаках, Iceberg на S3, Delta Lake на ADLS, таблицы в Snowflake, функции в Unity Catalog.

Catalog Plugin API — это ответ на эту проблему. Он позволяет одновременно использовать несколько каталогов, каждый из которых реализует свою логику — от простого in-memory каталога для тестов до полноценного Iceberg REST Catalog с таблицами на нескольких облаках.

Конфигурация: spark.sql.catalog.*

Каталог регистрируется через конфигурацию:

# Регистрируем каталог с именем "iceberg"
spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type=rest
spark.sql.catalog.iceberg.uri=https://catalog.example.com
spark.sql.catalog.iceberg.oauth2-server-uri=https://auth.example.com/token
spark.sql.catalog.iceberg.credential=client-id:client-secret

# Регистрируем второй каталог "delta"
spark.sql.catalog.delta=io.delta.sql.DeltaCatalog
spark.sql.catalog.delta.type=hive
spark.sql.catalog.delta.uri=thrift://hive-metastore:9083

# Spark-сессионный каталог (заменяет стандартный SessionCatalog)
spark.sql.catalog.spark_catalog=io.delta.sql.DeltaCatalog

После этой конфигурации:

  • iceberg.production.events — таблица events в namespace production каталога iceberg
  • delta.warehouse.orders — таблица orders в namespace warehouse каталога delta
  • default.users — таблица в стандартном каталоге (spark_catalog)

Все опции с префиксом spark.sql.catalog.NAME. передаются методу initialize() каталога с убранным префиксом. Это означает, что каталог получает {"type": "rest", "uri": "...", ...} — полную конфигурацию.

CatalogPlugin: базовый интерфейс

Все каталоги реализуют маркерный интерфейс CatalogPlugin:

// org.apache.spark.sql.connector.catalog.CatalogPlugin
public interface CatalogPlugin {

  // Вызывается один раз при загрузке каталога
  // name: имя каталога ("iceberg", "delta", "production")
  // options: конфигурация с убранным префиксом
  void initialize(String name, CaseInsensitiveStringMap options);

  // Имя каталога
  String name();

  // Описание для explain() и логов
  default String[] defaultNamespace() {
    return new String[0];  // По умолчанию -- корневой namespace
  }
}

CatalogPlugin — только маркер. Чтобы каталог умел что-то делать, он реализует дополнительные интерфейсы.

TableCatalog: CRUD для таблиц

TableCatalog — основной интерфейс для работы с таблицами:

// org.apache.spark.sql.connector.catalog.TableCatalog
public interface TableCatalog extends CatalogPlugin {

  // Получить таблицу по имени
  Table loadTable(Identifier ident) throws NoSuchTableException;

  // Получить таблицу с версией (Iceberg/Delta time travel)
  default Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
    throw new UnsupportedOperationException();
  }
  default Table loadTable(Identifier ident, String version) throws NoSuchTableException {
    throw new UnsupportedOperationException();
  }

  // Создать таблицу
  Table createTable(
    Identifier ident,
    Column[] columns,      // Spark 4.0: Column вместо StructType
    Transform[] partitions,
    Map<String, String> properties
  ) throws TableAlreadyExistsException, NoSuchNamespaceException;

  // Изменить таблицу (ALTER TABLE)
  Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException;

  // Удалить таблицу
  boolean dropTable(Identifier ident);

  // Переименовать таблицу
  void renameTable(Identifier from, Identifier to)
    throws NoSuchTableException, TableAlreadyExistsException;

  // Проверить существование таблицы
  default boolean tableExists(Identifier ident) {
    try { loadTable(ident); return true; }
    catch (NoSuchTableException e) { return false; }
  }

  // Список таблиц в namespace
  Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;
}

TableChange — sealed hierarchy для ALTER TABLE:

// Добавить колонку
TableChange.addColumn(new String[]{"new_col"}, DataTypes.StringType, true, "comment")

// Переименовать колонку
TableChange.renameColumn(new String[]{"old_name"}, "new_name")

// Удалить колонку
TableChange.deleteColumn(new String[]{"col_to_delete"}, false)

// Изменить тип колонки (если поддерживается)
TableChange.updateColumnType(new String[]{"amount"}, DataTypes.DoubleType)

// Обновить свойства таблицы
TableChange.setProperty("write.target-file-size-bytes", "134217728")
TableChange.removeProperty("obsolete.property")

SupportsNamespaces: управление пространствами имён

// org.apache.spark.sql.connector.catalog.SupportsNamespaces
public interface SupportsNamespaces extends CatalogPlugin {

  // Список namespace-ов в namespace (рекурсивно или на одном уровне)
  String[][] listNamespaces() throws NoSuchNamespaceException;
  String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException;

  // Метаданные namespace
  Map<String, String> loadNamespaceMetadata(String[] namespace)
    throws NoSuchNamespaceException;

  // Создать namespace
  void createNamespace(String[] namespace, Map<String, String> metadata)
    throws NamespaceAlreadyExistsException;

  // Изменить namespace (комментарий, свойства)
  void alterNamespace(String[] namespace, NamespaceChange... changes)
    throws NoSuchNamespaceException;

  // Удалить namespace
  boolean dropNamespace(String[] namespace, boolean cascade)
    throws NoSuchNamespaceException, NonEmptyNamespaceException;

  // Проверить существование namespace
  default boolean namespaceExists(String[] namespace) {
    try { loadNamespaceMetadata(namespace); return true; }
    catch (NoSuchNamespaceException e) { return false; }
  }
}

FunctionCatalog: каталог функций

// org.apache.spark.sql.connector.catalog.FunctionCatalog
public interface FunctionCatalog extends CatalogPlugin {

  // Список функций в namespace
  Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException;

  // Загрузить функцию
  UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException;

  // Проверить существование функции
  default boolean functionExists(Identifier ident) {
    try { loadFunction(ident); return true; }
    catch (NoSuchFunctionException e) { return false; }
  }
}

UnboundFunction — это фабрика, которая создаёт конкретную BoundFunction при связывании с аргументами. BoundFunction возвращает результирующий тип и либо генерирует код (ScalarFunction), либо производит агрегацию (AggregateFunction).

Реализация кастомного каталога: in-memory каталог для тестов

Напишем полный каталог для unit-тестов и демонстрации — in-memory хранилище таблиц:

package com.example.catalog

import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util
import java.util.concurrent.ConcurrentHashMap

class InMemoryCatalog extends TableCatalog with SupportsNamespaces {

  private var catalogName: String = _
  private val tables = new ConcurrentHashMap[Identifier, InMemoryTable]()
  private val namespaces = new ConcurrentHashMap[String, util.Map[String, String]]()

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name
    // Создаём namespace "default" автоматически
    namespaces.put("default", new util.HashMap[String, String]())
  }

  override def name(): String = catalogName

  // --- TableCatalog ---

  override def listTables(namespace: Array[String]): Array[Identifier] = {
    val nsKey = namespace.mkString(".")
    tables.keys().asScala
      .filter(_.namespace().mkString(".") == nsKey)
      .toArray
  }

  override def loadTable(ident: Identifier): Table = {
    Option(tables.get(ident)).getOrElse(
      throw new NoSuchTableException(ident.toString)
    )
  }

  override def createTable(
    ident: Identifier,
    columns: Array[Column],
    partitions: Array[Transform],
    properties: util.Map[String, String]
  ): Table = {
    if (tables.containsKey(ident)) {
      throw new TableAlreadyExistsException(ident.toString)
    }
    val schema = CatalogV2Util.v2ColumnsToStructType(columns)
    val table = new InMemoryTable(ident.name(), schema, partitions, properties)
    tables.put(ident, table)
    table
  }

  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
    val existing = loadTable(ident)
    // Применяем изменения к схеме (упрощённо)
    val newTable = applyTableChanges(existing, changes)
    tables.put(ident, newTable)
    newTable
  }

  override def dropTable(ident: Identifier): Boolean =
    tables.remove(ident) != null

  override def renameTable(from: Identifier, to: Identifier): Unit = {
    val table = loadTable(from)
    tables.remove(from)
    tables.put(to, table)
  }

  // --- SupportsNamespaces ---

  override def listNamespaces(): Array[Array[String]] =
    namespaces.keys().asScala.map(_.split("\\.")).toArray

  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
    val prefix = namespace.mkString(".")
    namespaces.keys().asScala
      .filter(_.startsWith(prefix + "."))
      .map(_.split("\\."))
      .toArray
  }

  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
    val key = namespace.mkString(".")
    Option(namespaces.get(key)).getOrElse(
      throw new NoSuchNamespaceException(namespace.mkString("."))
    )
  }

  override def createNamespace(
    namespace: Array[String],
    metadata: util.Map[String, String]
  ): Unit = {
    val key = namespace.mkString(".")
    if (namespaces.containsKey(key)) {
      throw new NamespaceAlreadyExistsException(key)
    }
    namespaces.put(key, new util.HashMap[String, String](metadata))
  }

  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
    val key = namespace.mkString(".")
    val meta = loadNamespaceMetadata(namespace)
    changes.foreach {
      case NamespaceChange.setProperty(key2, value) => meta.put(key2, value)
      case NamespaceChange.removeProperty(key2) => meta.remove(key2)
    }
  }

  override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
    val key = namespace.mkString(".")
    if (cascade) {
      // Удаляем все таблицы в namespace
      tables.keys().asScala
        .filter(_.namespace().mkString(".") == key)
        .foreach(tables.remove)
    }
    namespaces.remove(key) != null
  }
}

Multi-catalog routing: как Spark выбирает каталог

Когда Spark встречает идентификатор вида catalog.namespace.table, он извлекает первый компонент (catalog) и ищет соответствующий каталог в реестре. Если каталог найден — следующие компоненты трактуются как namespace + имя таблицы.

Multi-catalog routing в Spark 4.0
SQL: iceberg.production.eventsSQL запрос или DataFrame API: FROM iceberg.production.events. Analyzer извлекает первый компонент 'iceberg' и ищет его в CatalogManager
Analyzer: MultipartIdentifierHelper.parseMultipartIdentifier
CatalogManagerCatalogManager -- реестр каталогов. Создаёт экземпляры CatalogPlugin по имени при первом обращении через Class.forName(). Кэширует созданные экземпляры
catalog='iceberg'
catalog='delta'
catalog='spark_catalog'
IcebergCatalogSparkCatalog (Iceberg) -- реализует TableCatalog + SupportsNamespaces. Загружает таблицы через Iceberg catalog (REST, Glue, HMS, Nessie). Каждый loadTable() -- запрос к REST API каталога
DeltaCatalogDeltaCatalog -- реализует TableCatalog + DeltaCatalogBase. Хранит метаданные в Delta Log файлах (transaction log на filesystem). Может работать с HMS или без
SessionCatalogspark_catalog -- встроенный SessionCatalog. Хранит в Hive Metastore (или Derby для локального режима). Обратная совместимость с таблицами без qualified catalog prefix

Правила routing:

  1. Если идентификатор начинается с зарегистрированного имени каталога — используется этот каталог.
  2. Иначе — используется spark_catalog (дефолтный SessionCatalog).
  3. USE CATALOG iceberg меняет текущий каталог по умолчанию для сессии.
  4. SET CATALOG iceberg — алиас для USE CATALOG.

Как Iceberg реализует Catalog Plugin API

Apache Iceberg имеет несколько реализаций CatalogPlugin в зависимости от backend:

// Конфигурация для Iceberg REST catalog
spark.conf.set("spark.sql.catalog.icerest", "org.apache.iceberg.spark.SparkCatalog")
spark.conf.set("spark.sql.catalog.icerest.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
spark.conf.set("spark.sql.catalog.icerest.uri", "https://my-catalog.example.com")
spark.conf.set("spark.sql.catalog.icerest.warehouse", "s3://my-bucket/warehouse")

// SparkCatalog -- обёртка над нативным Iceberg Catalog
// SparkCatalog.loadTable() делает следующее:
// 1. Вызывает IcebergCatalog.loadTable(TableIdentifier) -- HTTP GET к REST Catalog
// 2. Получает TableMetadata с текущим snapshot, schema, partition spec, sort orders
// 3. Оборачивает в SparkTable (реализует Table + SupportsRead + SupportsWrite + ...)
// 4. При необходимости -- time travel: loadTable(ident, snapshotId или asOf timestamp)

// Создание Iceberg таблицы через Catalog API
spark.sql("""
    CREATE TABLE icerest.production.events (
        id BIGINT NOT NULL,
        event_type STRING,
        amount DOUBLE,
        ts TIMESTAMP
    ) USING iceberg
    PARTITIONED BY (days(ts))
    TBLPROPERTIES (
        'write.target-file-size-bytes' = '134217728',
        'write.parquet.compression-codec' = 'zstd',
        'read.split.target-size' = '134217728'
    )
""")
// Iceberg создаёт: REST API вызов catalog/v1/namespaces/production/tables
// с TableMetadata.json, который будет версия 1 этой таблицы

// Time travel -- через TableCatalog.loadTable(ident, version/timestamp)
val df = spark.read.option("as-of-timestamp", "2024-01-15 12:00:00")
    .table("icerest.production.events")
// Iceberg загрузит snapshot ближайший к этому времени

Iceberg также реализует StagingTableCatalog — расширение, позволяющее “stage” таблицу перед финальным commit:

// org.apache.spark.sql.connector.catalog.StagingTableCatalog
public interface StagingTableCatalog extends TableCatalog {

  // Начать создание таблицы "staged" -- без реального создания в метасторе
  StagedTable stageCreate(Identifier ident, Column[] columns,
    Transform[] partitions, Map<String, String> properties);

  // Staged замена существующей таблицы (атомарный swap)
  StagedTable stageReplace(Identifier ident, Column[] columns,
    Transform[] partitions, Map<String, String> properties);

  // Staged создание или замена (CREATE OR REPLACE TABLE)
  StagedTable stageCreateOrReplace(Identifier ident, Column[] columns,
    Transform[] partitions, Map<String, String> properties);
}

StagedTable.commitStagedChanges() вызывается Spark после того, как все данные записаны. Для Iceberg это момент создания первого snapshot — таблица становится видимой в каталоге атомарно, вместе с данными. Без StagingTableCatalog была бы проблема: таблица создаётся в метасторе сначала, потом данные пишутся — если процесс упадёт между двумя этапами, в каталоге будет пустая таблица.

Unity Catalog: enterprise multi-catalog

Unity Catalog (Databricks open source) — это реализация нескольких интерфейсов:

spark.sql.catalog.unity=io.unitycatalog.spark.UCSingleCatalog
spark.sql.catalog.unity.uri=https://unitycatalog.example.com
spark.sql.catalog.unity.token=dapi...

UCSingleCatalog реализует TableCatalog + SupportsNamespaces + FunctionCatalog:

  • loadTable() делает REST вызов к Unity Catalog API: GET /api/2.1/unity-catalog/tables/{full_name}. Ответ содержит storage location (S3/ADLS path), format (delta/iceberg/parquet), schema.
  • Возвращаемый Table — это либо DeltaTable (если format=DELTA), либо IcebergTable (если format=ICEBERG), либо ExternalParquetTable.
  • Governance: Unity Catalog проверяет права доступа при loadTable(). Если у пользователя нет SELECT privilege — loadTable() выбрасывает SecurityException и данные никогда не покидают хранилище.
  • loadFunction() возвращает Python UDF или SQL UDF, зарегистрированные в Unity Catalog — они становятся доступны через SQL как обычные функции.

Реализация DelegatingCatalogExtension

Часто нужно не написать каталог с нуля, а расширить существующий — добавить audit logging, прозрачное шифрование, автоматические TBLPROPERTIES, row-level security. Для этого есть DelegatingCatalogExtension:

package com.example.catalog

import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util

// Добавляем audit logging ко всем операциям с таблицами
class AuditingCatalog extends DelegatingCatalogExtension {

  private var auditLogger: AuditLogger = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    super.initialize(name, options)  // DelegatingCatalogExtension загружает делегата
    // Делегат загружается через опцию "delegate" или стандартное имя
    auditLogger = new AuditLogger(options.getOrDefault("audit.endpoint", ""))
  }

  override def loadTable(ident: Identifier): Table = {
    val startTime = System.currentTimeMillis()
    try {
      val table = super.loadTable(ident)
      auditLogger.log(AuditEvent.READ, ident, success = true,
        durationMs = System.currentTimeMillis() - startTime)
      table
    } catch {
      case e: NoSuchTableException =>
        auditLogger.log(AuditEvent.READ, ident, success = false,
          durationMs = System.currentTimeMillis() - startTime, error = Some(e.getMessage))
        throw e
    }
  }

  override def createTable(
    ident: Identifier,
    columns: Array[Column],
    partitions: Array[Transform],
    properties: util.Map[String, String]
  ): Table = {
    // Добавляем обязательные TBLPROPERTIES к каждой новой таблице
    val enrichedProperties = new util.HashMap[String, String](properties)
    enrichedProperties.put("created.by", getCurrentUser())
    enrichedProperties.put("created.at", System.currentTimeMillis().toString)
    enrichedProperties.put("classification", properties.getOrDefault("classification", "internal"))

    val table = super.createTable(ident, columns, partitions, enrichedProperties)
    auditLogger.log(AuditEvent.CREATE, ident, success = true)
    table
  }

  override def dropTable(ident: Identifier): Boolean = {
    // Проверяем разрешение на удаление (например, через внешний policy engine)
    if (!hasDropPermission(ident)) {
      throw new SecurityException(s"Not authorized to drop table $ident")
    }
    val result = super.dropTable(ident)
    if (result) auditLogger.log(AuditEvent.DROP, ident, success = true)
    result
  }

  private def getCurrentUser(): String =
    spark.sparkContext.sparkUser
}

Конфигурация: spark.sql.catalog.production=com.example.catalog.AuditingCatalog + указание на делегата (стандартный SessionCatalog или Iceberg).

Identifier: адресация объектов в каталоге

Identifier — это пара (namespace: Array[String], name: String):

// Создание Identifier
val ident = Identifier.of(Array("production", "europe"), "events")
// catalog: "icerest" (определяется контекстом)
// namespace: ["production", "europe"]
// name: "events"

// В SQL: icerest.production.europe.events

// Идентификатор в стандартном каталоге
val defaultIdent = Identifier.of(Array("default"), "users")
// В SQL: users (или default.users)

Глубина namespace-ов зависит от реализации каталога. Iceberg поддерживает многоуровневые namespace-ы (database.schema.table). Delta Lake обычно использует одноуровневые (database.table). Unity Catalog использует трёхуровневые (catalog.schema.table) — но с точки зрения Spark Plugin API каталог — это первый уровень, поэтому Unity передаёт двухуровневый namespace.

Попробуй сам

Исследуем multi-catalog через Iceberg (если доступен) или через встроенный InMemoryCatalog:

from pyspark.sql import SparkSession

# Настроим два каталога одновременно
spark = SparkSession.builder \
    .appName("multi-catalog-demo") \
    .master("local[*]") \
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0") \
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.local_iceberg",
            "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local_iceberg.type", "hadoop") \
    .config("spark.sql.catalog.local_iceberg.warehouse", "/tmp/iceberg-demo") \
    .getOrCreate()

# --- Создаём namespace и таблицы в Iceberg каталоге ---
spark.sql("CREATE NAMESPACE IF NOT EXISTS local_iceberg.analytics")
spark.sql("CREATE NAMESPACE IF NOT EXISTS local_iceberg.raw")

spark.sql("""
    CREATE TABLE IF NOT EXISTS local_iceberg.raw.events (
        id BIGINT,
        ts TIMESTAMP,
        event_type STRING,
        payload STRING
    ) USING iceberg
    PARTITIONED BY (days(ts))
""")

spark.sql("""
    CREATE TABLE IF NOT EXISTS local_iceberg.analytics.daily_summary (
        date DATE,
        event_type STRING,
        count BIGINT
    ) USING iceberg
""")

# Вставляем данные
spark.sql("""
    INSERT INTO local_iceberg.raw.events VALUES
    (1, TIMESTAMP '2024-01-15 10:00:00', 'purchase', '{"amount": 99.9}'),
    (2, TIMESTAMP '2024-01-15 11:00:00', 'view', '{"product_id": 42}'),
    (3, TIMESTAMP '2024-01-16 09:00:00', 'purchase', '{"amount": 149.9}')
""")

# Cross-catalog запрос: читаем из Iceberg, пишем в SessionCatalog
spark.sql("""
    INSERT INTO local_iceberg.analytics.daily_summary
    SELECT DATE(ts), event_type, COUNT(*) as count
    FROM local_iceberg.raw.events
    GROUP BY DATE(ts), event_type
""")

# --- Исследуем Iceberg time travel через catalog API ---
spark.sql("INSERT INTO local_iceberg.raw.events VALUES (4, TIMESTAMP '2024-01-17 12:00:00', 'refund', '{}')")

# Текущее состояние
print("=== Текущее состояние ===")
spark.table("local_iceberg.raw.events").show()

# Исторические snapshot-ы
print("=== Snapshot history ===")
spark.sql("SELECT * FROM local_iceberg.raw.events.history").show()

# Time travel -- snapshot до последней вставки
snapshots = spark.sql("SELECT snapshot_id FROM local_iceberg.raw.events.snapshots ORDER BY committed_at").collect()
if len(snapshots) >= 2:
    first_snapshot = snapshots[0]["snapshot_id"]
    print(f"=== Time travel к snapshot {first_snapshot} ===")
    spark.read \
        .option("snapshot-id", first_snapshot) \
        .table("local_iceberg.raw.events") \
        .show()

# --- Список namespace-ов через catalog API ---
print("=== Namespaces в local_iceberg ===")
spark.sql("SHOW NAMESPACES IN local_iceberg").show()

print("=== Таблицы в local_iceberg.raw ===")
spark.sql("SHOW TABLES IN local_iceberg.raw").show()

# --- Переключаем текущий каталог ---
spark.sql("USE CATALOG local_iceberg")
spark.sql("USE raw")
# Теперь SELECT * FROM events работает без qualified name
spark.sql("SELECT * FROM events").show()

# --- Смешанные запросы из двух каталогов ---
# Данные из Iceberg + встроенный SessionCatalog
spark.sql("USE CATALOG spark_catalog")
spark.sql("CREATE TABLE IF NOT EXISTS enrichment (event_type STRING, label STRING)")
spark.sql("INSERT INTO enrichment VALUES ('purchase', 'Revenue'), ('view', 'Engagement'), ('refund', 'Return')")

# JOIN данных из разных каталогов
spark.sql("""
    SELECT e.ts, e.event_type, en.label
    FROM local_iceberg.raw.events e
    JOIN spark_catalog.default.enrichment en ON e.event_type = en.event_type
    ORDER BY e.ts
""").show()
TIP

При разработке кастомного каталога добавьте реализацию supportsNamespaceProperty(String key) — это позволит Spark подсказывать UI какие свойства namespace поддерживаются (комментарии, owner, location). Без этого DESCRIBE NAMESPACE EXTENDED не покажет кастомные свойства. В Spark 4.0 этот метод по умолчанию возвращает false для всех ключей.

Транзакционная семантика через StagingTableCatalog

Паттерн для надёжной записи данных в кастомный каталог:

// Spark автоматически использует StagingTableCatalog если каталог его реализует
// При CREATE TABLE AS SELECT или INSERT OVERWRITE

// Вот что происходит под капотом при CREATE TABLE AS SELECT
// в каталоге, реализующем StagingTableCatalog:

// Шаг 1: catalog.stageCreate(ident, columns, partitions, properties)
//   -> Возвращает StagedTable -- таблица ещё не видна в каталоге

// Шаг 2: Spark записывает данные через StagedTable.newWriteBuilder()
//   -> DataWriter.write() на каждом executor
//   -> DataWriter.commit() возвращает WriterCommitMessage

// Шаг 3: Если все executor'ы успешно: stagedTable.commitStagedChanges()
//   -> Таблица становится видимой в каталоге с данными атомарно

// Шаг 4: Если любой executor упал: stagedTable.abortStagedChanges()
//   -> Staged данные удаляются, таблица не появляется в каталоге

// Это именно то, что делает Iceberg:
// commitStagedChanges() = создать первый Iceberg snapshot с manifest-файлами
// Пока snapshot не создан, данные физически есть на S3, но таблица "не существует"
Apache Iceberg: устройство таблиц и транзакции
Проверка знанийKnowledge check
Вы реализовали кастомный каталог для внутреннего data lake. Каталог реализует TableCatalog и SupportsNamespaces. В production вы замечаете: запросы вида SELECT * FROM mycat.prod.events работают корректно, но SELECT * FROM events (без qualified имени) иногда смотрит в SessionCatalog вместо вашего каталога. Команда CREATE TABLE events также создаёт таблицу в SessionCatalog. Как исправить, чтобы все неквалифицированные имена по умолчанию резолвились в ваш каталог?
ОтветAnswer
Проблема: не задан текущий каталог по умолчанию. Spark использует 'spark_catalog' (SessionCatalog) как дефолт. Решения: 1) USE CATALOG mycat в начале сессии или spark.sql("USE CATALOG mycat") -- меняет текущий каталог для этой SparkSession. Сохраняется только в рамках сессии. 2) Конфигурация spark.sql.defaultCatalog=mycat -- задаёт дефолтный каталог при создании SparkSession. Работает для всех сессий. 3) Заменить spark_catalog: spark.sql.catalog.spark_catalog=com.example.MyCatalog -- тогда SessionCatalog заменяется вашим каталогом, и все неквалифицированные имена идут к нему. Это САМЫЙ СИЛЬНЫЙ вариант -- работает для Hive-совместимых таблиц. Используется DeltaCatalog именно так: он заменяет spark_catalog и добавляет Delta-функциональность поверх стандартного Hive Metastore. 4) Реализовать defaultNamespace() в CatalogPlugin -- возвращает namespace по умолчанию (например, ['production']), чтобы SELECT * FROM events резолвилось как mycat.production.events. Оптимальный production-паттерн: задать spark.sql.defaultCatalog=mycat и spark.sql.catalog.mycat.defaultNamespace=production в конфигурации кластера. Тогда разработчики могут писать неквалифицированные имена и они автоматически попадут в правильный каталог.

Проверьте понимание

Результат: 0 из 0
Прикладной
Вопрос 1 из 4. Spark встречает в SQL запросе имя таблицы 'analytics.orders'. Зарегистрированы каталоги: 'analytics' (кастомный), 'spark_catalog' (SessionCatalog). Как Spark резолвит этот идентификатор?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5