MigrationCSV to ParquetHive to IcebergParquet to LanceShadow WritesDual ReadsRolloutCost Estimation
Стратегии миграции форматов
Выбор формата — первый шаг. Второй — миграция существующих данных без даунтайма, потери данных и разрушения downstream pipeline’ов. Миграция формата хранения — одна из самых рискованных операций в data engineering: затрагивает весь стек от ingest до BI.
Этот урок — о паттернах миграции, а не о конкретных инструментах. Инструменты меняются; паттерны — устойчивы.
Три паттерна миграции
Три паттерна миграции форматов
Big BangОдномоментная миграция: остановить pipeline → конвертировать все данные → переключить все consumers → запустить pipeline. Просто в теории, катастрофично на практике: downtime часы-дни, нет rollback, все ошибки — в production.
Shadow WriteПараллельная запись в старый и новый формат. Consumers продолжают читать старый формат. Валидация нового формата на отдельном read path. Переключение — когда новый формат валидирован. Минимальный risk.
Gradual RolloutПостепенная миграция по партициям/таблицам. Сначала — новые данные в новом формате. Исторические — конвертируются партиция за партицией. Consumers адаптируются постепенно. Средний risk, длительный процесс.
WARNING
Big Bang — антипаттерн для production-систем объёмом > 1 TB или с > 5 downstream consumers. Используйте только для dev/staging или маленьких таблиц (< 100 GB, < 3 consumers). Для production — Shadow Write или Gradual Rollout.
Миграция 1: CSV / JSON → Parquet
Самая распространённая миграция — переход от текстовых форматов к колоночным. Типичная ситуация: legacy pipeline записывает CSV в S3, downstream пользуется Athena/Presto.
CSV → Parquet: поэтапная миграция
Исходное: CSV файлы → S3 → Athena/Presto
Исходное состояние: CSV/JSON файлы в S3/GCS. Glue Catalog или Hive Metastore поверх. Athena/Presto/Spark — query engines. Downstream: BI dashboards, ML pipelines, ad-hoc SQL.
Фаза 1: Schema LockЗафиксировать schema: CSV не содержит типов — типы определяются при чтении (string → int). Создать schema definition (Glue Catalog, Avro schema, JSON Schema). Прогнать все данные через schema validation — найти и починить anomalies (mixed types, NULL representations, encoding issues).
Фаза 2: Shadow WriteДобавить параллельную запись в Parquet: тот же pipeline, два output'а. CSV → s3://data/csv/... (как раньше). Parquet → s3://data/parquet/... (новый). Consumers продолжают читать CSV. Parquet — для валидации.
Фаза 3: SwitchoverПереключить consumers на Parquet. Catalog: UPDATE TABLE LOCATION → s3://data/parquet/. Мониторинг: row counts, query latency, error rates. Rollback: переключить обратно на CSV (1 ALTER TABLE).
Конвертация исторических данных
# Пакетная конвертация CSV → Parquet (PyArrow)import pyarrow as paimport pyarrow.csv as pcsvimport pyarrow.parquet as pqfrom pathlib import Path# Фиксированная schema — не полагаемся на inferenceschema = pa.schema([ ("user_id", pa.int64()), ("event_type", pa.dictionary(pa.int32(), pa.string())), ("timestamp", pa.timestamp("ms")), ("amount", pa.decimal128(10, 2)),])convert_options = pcsv.ConvertOptions(column_types=schema)read_options = pcsv.ReadOptions(block_size=256 * 1024 * 1024) # 256MB chunksfor csv_path in Path("s3://data/csv/").glob("**/*.csv"): table = pcsv.read_csv(csv_path, convert_options=convert_options, read_options=read_options) # Parquet с оптимальными настройками pq.write_table( table, csv_path.with_suffix(".parquet").as_posix().replace("/csv/", "/parquet/"), compression="zstd", compression_level=3, row_group_size=1_000_000, use_dictionary=["event_type"], # dictionary для low cardinality )
RisksSchema mismatch: CSV с mixed types → Parquet strict schema = ошибки конвертации. NULL handling: CSV 'NULL', '', 'N/A', 'null' → Parquet null. Encoding: UTF-8 vs Latin-1 vs mixed → потеря данных. Тестировать на полном объёме.
Миграция 2: Hive Tables → Iceberg / Delta Lake
Переход от Hive-managed Parquet таблиц к table format. Типичная ситуация: данные уже в Parquet, но нет ACID, нет time travel, ручной partition management.
Hive → Iceberg: in-place migration
Hive Parquet → Iceberg: metadata-only migration
Ключевое преимущество: Iceberg и Delta Lake хранят данные в Parquet. Миграция Hive → Iceberg может быть metadata-only: существующие Parquet файлы НЕ перезаписываются. Iceberg создаёт metadata (manifests, snapshots) поверх существующих файлов.
Spark SQLCALL spark_catalog.system.migrate('db.table') — Spark Iceberg procedure. Создаёт Iceberg metadata поверх существующих Parquet файлов. Файлы не копируются. Время: секунды-минуты (зависит от количества файлов, не от объёма данных). Обратная совместимость: Hive readers перестают видеть таблицу (нужен Iceberg connector).
Delta LakeCONVERT TO DELTA parquet.`s3://path/` — Delta Lake конвертация. Аналогично: создаёт _delta_log/ рядом с существующими Parquet файлами. Файлы не перезаписываются. Delta transaction log: JSON commits.
ВалидацияПосле миграции: сравнить row counts, schema, sample rows. SELECT COUNT(*) FROM old_hive_table = SELECT COUNT(*) FROM new_iceberg_table. Hash-сумма по sample партициям. Query результаты: 10-20 production запросов на обоих таблицах — результаты идентичны.
In-place vs Full Rewrite
In-Place Migrate vs Full Rewrite
In-Place (metadata-only)
In-place migration: только metadata. Данные остаются как есть. Быстро (секунды), безопасно (файлы не трогаем), обратимо. Но: существующие Parquet файлы не оптимизированы (старый row group size, старая компрессия, не sorted).
ПлюсыМгновенная миграция, нет I/O на данные, нет дополнительного storage, обратимая. Подходит для 90% случаев.
МинусыФайлы не оптимизированы: старый compression (Snappy вместо ZSTD), неоптимальный row group size, нет сортировки. Решение: post-migrate OPTIMIZE/rewrite по партициям.
Full Rewrite
Full rewrite: перезапись всех данных в новый формат с оптимальными настройками. Дорого (часы, double storage), но файлы оптимизированы с первого дня.
ПлюсыДанные оптимизированы: ZSTD compression, правильный row group size, sorted by frequently filtered columns. Максимальная read performance с первого дня.
МинусыДлительная операция (часы-дни для TB-scale). Double storage во время миграции. Compute cost (Spark cluster). Сложнее rollback. Оправдано для critical tables с высоким read QPS.
Gradual Rollout для Hive → Iceberg
Gradual Rollout: партиция за партицией
Gradual: партиция за партицией
Стратегия: новые партиции сразу в Iceberg. Исторические — конвертировать по одной, начиная с самых старых (наименее критичных). Каждую конвертированную партицию валидировать. Если ошибка — rollback одной партиции, не всей таблицы.
Неделя 1In-place migrate: metadata-only конвертация всей таблицы. Новые данные пишутся в Iceberg. Consumers переключены на Iceberg reads. Исторические данные — как были (Parquet от Hive).
Недели 2-4Post-migrate optimize: перезапись исторических партиций по одной. Начать с самых старых (Q1 2023). Каждая партиция: rewrite → validate → next. Параллельно: мониторинг read performance.
Неделя 5+Cleanup: удалить старые неоптимизированные файлы (expire_snapshots). Финальная валидация: все партиции оптимизированы. Мониторинг: 1 неделя на стабильность.
Миграция 3: Parquet → Lance (ML workloads)
Миграция Parquet → Lance — специфический case для ML-pipeline’ов, которые страдают от медленного random access в Parquet.
Parquet → Lance: миграция ML данных
Зачем: random access 100x, vector search, versioning
Зачем: Parquet random access ~1ms/row → Lance ~10μs/row (100x). Dataset versioning: Lance нативный, Parquet — нет. Vector search: Lance встроенный, Parquet — нет. Миграция оправдана для training data, embedding stores, feature stores.
КонвертацияPyArrow → Lance: читаем Parquet через PyArrow, записываем в Lance. lance.write_dataset(table, 'output.lance'). Скорость: ~200-500 MB/s (зависит от CPU). 1 TB Parquet → ~1-2 часа на мощной машине.
Vector IndexПосле конвертации: создать vector index на embedding колонке. ds.create_index('embedding', index_type='IVF_PQ', num_partitions=256). Время: 30-60 минут на 100M vectors. Без индекса — brute-force kNN (медленно на > 1M vectors).
ВалидацияRow count: Parquet row count = Lance row count. Schema: все колонки и типы совпадают. Sample: 1000 случайных строк — побитовое сравнение. Random access: benchmark p50/p99 latency vs Parquet.
Dual Read: Parquet + Lance
Для миграции ML-pipeline: dual read — чтение из обоих форматов с валидацией:
Dual Read: параллельное чтение для валидации
DataLoader Request: 1024 rows
Training pipeline: DataLoader запрашивает mini-batch из 1024 случайных строк. В dual read mode: запрашивает из обоих форматов, сравнивает результаты. Shadow mode: Lance — primary, Parquet — shadow (для валидации, не для training).
Lance (primary)Primary path: Lance random access (~10μs/row). Результат: Arrow RecordBatch с 1024 строками. Это данные, которые идут в модель.
Parquet (shadow)Shadow path: Parquet read тех же row IDs. Результат сравнивается с Lance (побитово). Если mismatch — логировать и алертить. Shadow path не влияет на training latency (async).
РезультатЕсли 0 mismatches за 1 неделю: отключить shadow path, удалить Parquet copy. Если mismatches > 0: investigate, fix конвертацию, повторить. Типичные причины: floating point precision, NULL handling, timezone differences.
Оценка стоимости миграции
Перед началом миграции — оценка стоимости: compute, storage, engineering time, risk.
Storage CostВо время миграции: double storage (old + new). Длительность: 1-4 недели (shadow write period). S3 Standard: $0.023/GB/month. 10 TB double storage × 1 month = ~$230 extra. После миграции: Parquet обычно меньше CSV (5-10x) → storage savings.
Engineering TimeСамая дорогая часть: инженеры планируют, выполняют, валидируют, мониторят. CSV→Parquet: ~1-2 недели (1 engineer). Hive→Iceberg: ~2-4 недели (1 engineer, зависит от количества таблиц). Downstream changes: обновление queries, dashboards, ML pipelines.
Risk CostВероятность проблем × impact. Data loss: backup перед миграцией (стоимость: дополнительный snapshot/copy). Downtime: shadow write pattern = 0 downtime. Rollback: in-place migrate обратима (удалить metadata). Full rewrite: нужен backup.
Во времяExecution: shadow write (параллельная запись), validation (row count, checksums, query results), monitoring (error rates, latency, costs), communication (downstream teams в курсе).
ПослеCleanup: удалить старые данные (после retention period), отключить shadow write, optimize новые таблицы (compression, sort order), document lessons learned.
Миграция между Table Formats
Отдельный случай — миграция между table format’ами: Delta Lake → Iceberg или Hudi → Delta Lake.
Миграция между Table Formats
Delta → IcebergТри подхода: 1) UniForm — Delta генерирует Iceberg metadata автоматически (Delta 3.x). Не полноценная миграция — Iceberg read-only. 2) Iceberg migrate procedure — metadata-only migration (файлы не копируются). 3) Full rewrite — CTAS from Delta into Iceberg.
Hudi → IcebergСложнее: Hudi MOR имеет log files (Avro) + base files (Parquet). Iceberg понимает только Parquet. Нужна compaction (merge log → base) перед миграцией. После compaction: Iceberg migrate procedure.
Iceberg → DeltaCTAS: CREATE TABLE delta_table USING delta AS SELECT * FROM iceberg_table. Или Delta Lake shallow clone (если данные в Parquet — не копировать файлы). Менее распространённый путь: обычно мигрируют В Iceberg, не из.
TIP
UniForm (Delta Lake 3.x) — самый элегантный подход к межформатной совместимости: Delta таблица автоматически генерирует Iceberg metadata. Consumers, использующие Iceberg readers (Trino, Snowflake), читают Delta-данные без миграции. Это не миграция — это совместимость без миграции.
Rollback Strategy
Каждая миграция должна иметь план отката. Без rollback plan — миграция не начинается.
Rollback Strategies по типу миграции
In-Place MigrateRollback: удалить metadata (Iceberg manifests/snapshots или Delta _delta_log/). Parquet файлы — не тронуты. Переключить Catalog обратно на Hive. Время: минуты. Риск: минимальный — данные не изменялись.
Full RewriteRollback: переключить Catalog на backup copy (snapshot/old path). Удалить rewritten файлы. Нужен backup old data (не удалять до подтверждения). Время: минуты (catalog switch), но storage cost до cleanup.
Shadow WriteRollback: отключить shadow write (новый формат), consumers уже на старом формате. Удалить shadow данные. Самый безопасный: consumers никогда не переключались. Rollback = do nothing.
Gradual RolloutRollback: переключить только мигрированные партиции обратно. Немигрированные — уже на старом формате. Частичный rollback: одна партиция, не вся таблица. Сложнее, но granular.
Итоги
Миграция: ключевые принципы
Shadow WriteВсегда предпочитать параллельную запись: 0 downtime, обратимая, валидация до переключения. Дороже (double storage + compute), но безопаснее.
In-PlaceДля Hive→Iceberg/Delta: metadata-only migration. Данные не трогаем. Optimize после миграции (gradual rewrite). Самый быстрый путь.
ValidateВалидация на каждом шаге: row counts, checksums, production queries. Не доверять 'миграция прошла без ошибок' — проверять данные. Minimum: 1 неделя мониторинга после переключения.
RollbackПлан отката до начала миграции. Тестировать rollback на dev/staging. Не удалять old данные до подтверждения успеха (retention period >= 2 недели).
В следующем уроке — конкретные case studies: как реальные компании выбирали и мигрировали форматы.
Доступ закрыт
Требуется вход
Для доступа к материалам курса необходимо войти через Telegram
Проверьте понимание
Результат: 0 из 0
Концептуальный
Закончили урок?
Отметьте его как пройденный, чтобы отслеживать свой прогресс