Частые ошибки при работе с Parquet, ORC, Avro, Delta Lake, Iceberg и Arrow — симптомы, причины и пошаговые решения.
Parquet хранит timestamp с микросекундной точностью (по умолчанию), а pandas использует наносекунды (datetime64[ns]). PyArrow 13+ по умолчанию запрещает потерю точности при downcast.
coerce_timestamps='us' при записи: pq.write_table(table, 'file.parquet', coerce_timestamps='us')df['ts'] = df['ts'].dt.floor('us')pq.write_table(table, 'file.parquet', coerce_timestamps='ns', allow_truncated_timestamps=False)Файл повреждён или не является Parquet. Parquet-файл должен начинаться и заканчиваться magic bytes PAR1 (4 байта). Частые причины: неполная запись (writer упал до flush footer), файл переименован из другого формата, или это _SUCCESS/_metadata файл.
head -c 4 file.parquet | xxd — должно быть PAR1pq.read_table('dir/', filters=...) вместо чтения конкретного файлаparquet-tools meta file.parquet для диагностикиwriter.close() или context manager with pq.ParquetWriter(...)Имя колонки отличается из-за case sensitivity (Parquet хранит имена as-is), автоматического переименования или использования разных схем записи. В Spark schema merging может объединить файлы с разными схемами некорректно.
pq.read_schema('file.parquet') или DESCRIBE SELECT * FROM 'file.parquet' в DuckDBspark.conf.set('spark.sql.caseSensitive', 'false').option('mergeSchema', 'true') или .schema(explicit_schema)pq.read_table('file.parquet', columns=['col1','col2']) с точными именамиDictionary Encoding накапливает все уникальные значения в памяти до записи Data Page. Если кардинальность колонки высокая (URL, UUID), словарь занимает гигабайты. Parquet переключается на PLAIN при переполнении страницы, но к этому моменту словарь уже в памяти.
.option('parquet.enable.dictionary', 'false') или pq.write_table(table, 'out.parquet', use_dictionary=['low_card_col'])spark.conf.set('spark.sql.parquet.columnarWriterBatchSize', '1024') для уменьшения пакетов записиReader schema несовместима с writer schema. Типичные причины: поле добавлено без default-значения (нарушение backward compatibility), nullable поле читается как required, или тип данных изменён несовместимо (string → int).
curl -X POST -d '{"schema": ...}' http://registry:8081/compatibility/subjects/topic-value/versions/latest["null", "string"] с "default": null{"name": "new_field", "type": "string", "default": ""}Avro Object Container File повреждён — sync marker потерян или данные в блоке усечены. Частая причина: writer не вызвал flush()/close(), S3 multipart upload завершился частично, или файл был перезаписан конкурентным процессом.
avro-tools tojson --head 10 file.avrowith DataFileWriter(...) as writer:) для гарантии flushacks=all и retries>0Delta Lake по умолчанию не разрешает изменение схемы при записи. Новые колонки, изменённые типы или удалённые поля блокируются. Автоматическое mergeSchema включается явно.
.option('mergeSchema', 'true').save() или spark.conf.set('spark.databricks.delta.schema.autoMerge.enabled', 'true')write_deltalake(dt, df, mode='append', schema_mode='merge').option('overwriteSchema', 'true').mode('overwrite').save()dt.schema() vs df.schemaDelta Lake использует optimistic concurrency control — каждый коммит проверяет, что файлы не были изменены с момента начала транзакции. Конфликт возникает, когда два writer-а изменяют пересекающиеся partition-ы или файлы одновременно.
for attempt in range(3): try: write(...) except ConcurrentAppendException: passdelta.isolationLevel=WriteSerializable (по умолчанию) vs Serializable для вашего use caseDelta Lake защищает от случайного удаления файлов, которые могут быть нужны для time travel или активных запросов. По умолчанию VACUUM удаляет только файлы старше 168 часов (7 дней). Retention < 7 дней требует явного override.
spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false') ПЕРЕД вызовом VACUUMVACUUM table_name RETAIN 168 HOURSdt.vacuum(retention_hours=168, enforce_retention_duration=False)Конфигурация Spark-каталога Iceberg неполная или использует устаревший формат. Каждый тип каталога (REST, Hive, Glue, JDBC) требует свой набор параметров. REST Catalog требует uri, type, и правильный catalog-impl.
spark.conf.set('spark.sql.catalog.my_catalog', 'org.apache.iceberg.spark.SparkCatalog') + spark.conf.set('spark.sql.catalog.my_catalog.type', 'rest') + spark.conf.set('spark.sql.catalog.my_catalog.uri', 'http://...')type=hive + uri=thrift://... — не нужен catalog-impltype=glue — подхватывает AWS credentials автоматически--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0Iceberg разрешает только безопасные преобразования типов: int→long, float→double, расширение decimal precision. Изменение nullable → required запрещено, т.к. существующие файлы могут содержать NULL. Iceberg не перезаписывает данные при schema evolution.
Iceberg использует optimistic concurrency — коммит проверяет, что snapshot не изменился с момента начала операции. Конфликт возникает при конкурентных записях в пересекающиеся partition spec-ы. Retry-стратегия зависит от типа каталога.
write.commit.retry.num-retries=4 и write.commit.retry.min-wait-ms=100Parquet footer содержит схему и метаданные Row Groups и находится в конце файла. Если файл усечён (неполная запись, обрыв сети при загрузке на S3) или это пустой файл (0 строк, но footer есть), чтение падает. Spark иногда создаёт пустые файлы при записи пустых партиций.
[f for f in files if os.path.getsize(f) > 0]SELECT * FROM parquet_scan('dir/*.parquet', union_by_name=true) — пропускает невалидные файлы.option('pathGlobFilter', '*.parquet').option('recursiveFileLookup', 'true')aws s3api list-multipart-uploadsORC footer повреждён или версия протокола несовместима. ORC magic bytes — ORC в начале файла. Причины: writer не закрыл файл, несовместимый compression codec (LZO требует отдельную библиотеку), или файл создан версией ORC, не поддерживаемой reader-ом.
orc-tools meta file.orc — покажет stripe-ы, типы, compressionhadoop-lzo в classpathorc-tools data file.orc | headorc-tools convert file.orc -o output.orc для пересоздания footerСлишком маленький row_group_size создаёт множество Row Groups в одном файле. Каждый Row Group хранит свою статистику — при тысячах мелких групп metadata footer раздувается и замедляет планирование. Типичная причина: запись по одной строке без буферизации.
pq.write_table(table, 'out.parquet', row_group_size=1_000_000)spark.conf.set('spark.sql.parquet.rowGroupSize', '134217728') — 128 MB по умолчаниюORC привязывает вложенные колонки по позиции в schema (column ID), а не по имени. Если порядок полей в struct изменился между записью и чтением, данные читаются неправильно. В отличие от Iceberg (привязка по Column ID) и Avro (привязка по имени).
SET hive.orc.schema.resolution=name вместо стандартного position-based resolutionpyiceberg подключён к другому каталогу, чем Spark. Частая ситуация: Spark использует Hive Metastore, а pyiceberg — REST Catalog. Каждый каталог имеет свой namespace таблиц. Также возможно несовпадение warehouse path.
catalog.list_namespaces() и catalog.list_tables('db')uri = Spark spark.sql.catalog..uri thrift://host:9083, а не JDBC URLArrow аллоцирует память через собственный MemoryPool. При чтении целого файла в память (read_table) все Row Groups загружаются одновременно. Конкатенация батчей (concat_tables) создаёт копию. String колонки с длинными значениями особенно затратны.
pq.ParquetFile('file.parquet').iter_batches(batch_size=100_000)pq.read_table('f.parquet', columns=['col1','col2'])table.to_pandas(self_destruct=True) для освобождения Arrow memory при конвертацииpa.total_allocated_bytes() для отслеживания текущего потребленияDelta Lake использует версионирование протокола (reader/writer version). Новые фичи (deletion vectors, column mapping, v2 checkpoints) требуют более новый reader. Databricks часто создаёт таблицы с protocol v3/v7, а open-source клиенты поддерживают v1-2/v1-5.
pip install deltalake>=0.17ALTER TABLE t SET TBLPROPERTIES('delta.minReaderVersion'='1', 'delta.minWriterVersion'='2') — только если не используете v3+ фичиdt.protocol() — посмотрите, какие reader/writer features требуютсяНесовпадение proto-схемы: поле было переназначено на другой wire type (например, int32 → string на том же field number), или данные повреждены. Protobuf привязывает поля по номеру — изменение типа при сохранении номера нарушает десериализацию.
reserved: reserved 5, 6; reserved "old_field";protoc --decode_raw < data.binprotoc --decode_raw показывает raw field numbers и typesКаждый файл = overhead: metadata read, footer parsing, file open syscall, S3 GET request. При тысячах файлов overhead доминирует над полезным I/O. Streaming-записи и высокоселективные INSERT создают множество мелких файлов.
OPTIMIZE table_name объединяет мелкие файлы в целевой размер (~1 GB)spark.sql('CALL system.rewrite_data_files(table => "db.t", strategy => "binpack")')df.repartition(n).write.parquet(...) или .coalesce(n) для контроля числа файловПри Partition Evolution Iceberg создаёт новый partition spec, но старые файлы сохраняют прежний. Проблема возникает, если writer использует устаревший metadata — стартовал до ALTER TABLE, но коммитит после. Также: конфликт при concurrent ALTER TABLE + INSERT.
table.spec() или SELECT * FROM my_table.partitionswrite.commit.retry.num-retries=4Hudi использует file group locking для конкурентной записи. Два writer-а не могут обновлять один file group одновременно. В MoR-таблицах compaction и запись могут конфликтовать, если работают с пересекающимися file groups.
hoodie.write.concurrency.mode=optimistic_concurrency_control + hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProviderhoodie.cleaner.policy.failed.writes=LAZY для автоматической очистки failed writesНовая схема нарушает заданный уровень совместимости subject-а (BACKWARD, FORWARD, FULL). Типичные нарушения: удалено поле без default, добавлено required поле, изменён тип несовместимо.
GET /config/{subject} — BACKWARD по умолчаниюPOST /compatibility/subjects/{subject}/versions/latestСтарые версии Spark (< 3.0) и Hive записывали timestamp как INT96 — нестандартный тип Parquet, занимающий 12 байт. Современные инструменты не поддерживают INT96 по умолчанию. Стандартный тип — INT64 timestamp с микросекундной точностью.
pq.read_table('file.parquet', coerce_int96_timestamp_unit='ms') — конвертирует при чтенииspark.conf.set('spark.sql.parquet.outputTimestampType', 'TIMESTAMP_MICROS')SET arrow_lossless_conversion = true может помочь с некоторыми legacy файлами