Learning Platform
Глоссарий Troubleshooting
Урок 18.04 · 35 мин
Продвинутый
MigrationCSV to ParquetHive to IcebergParquet to LanceShadow WritesDual ReadsRolloutCost Estimation

Стратегии миграции форматов

Выбор формата — первый шаг. Второй — миграция существующих данных без даунтайма, потери данных и разрушения downstream pipeline’ов. Миграция формата хранения — одна из самых рискованных операций в data engineering: затрагивает весь стек от ingest до BI.

Этот урок — о паттернах миграции, а не о конкретных инструментах. Инструменты меняются; паттерны — устойчивы.

Три паттерна миграции

Три паттерна миграции форматов
Big BangОдномоментная миграция: остановить pipeline → конвертировать все данные → переключить все consumers → запустить pipeline. Просто в теории, катастрофично на практике: downtime часы-дни, нет rollback, все ошибки — в production.
Shadow WriteПараллельная запись в старый и новый формат. Consumers продолжают читать старый формат. Валидация нового формата на отдельном read path. Переключение — когда новый формат валидирован. Минимальный risk.
Gradual RolloutПостепенная миграция по партициям/таблицам. Сначала — новые данные в новом формате. Исторические — конвертируются партиция за партицией. Consumers адаптируются постепенно. Средний risk, длительный процесс.
WARNING

Big Bang — антипаттерн для production-систем объёмом > 1 TB или с > 5 downstream consumers. Используйте только для dev/staging или маленьких таблиц (< 100 GB, < 3 consumers). Для production — Shadow Write или Gradual Rollout.

Миграция 1: CSV / JSON → Parquet

Самая распространённая миграция — переход от текстовых форматов к колоночным. Типичная ситуация: legacy pipeline записывает CSV в S3, downstream пользуется Athena/Presto.

CSV → Parquet: поэтапная миграция

Исходное: CSV файлы → S3 → Athena/Presto

Исходное состояние: CSV/JSON файлы в S3/GCS. Glue Catalog или Hive Metastore поверх. Athena/Presto/Spark — query engines. Downstream: BI dashboards, ML pipelines, ad-hoc SQL.
Фаза 1: Schema LockЗафиксировать schema: CSV не содержит типов — типы определяются при чтении (string → int). Создать schema definition (Glue Catalog, Avro schema, JSON Schema). Прогнать все данные через schema validation — найти и починить anomalies (mixed types, NULL representations, encoding issues).
Фаза 2: Shadow WriteДобавить параллельную запись в Parquet: тот же pipeline, два output'а. CSV → s3://data/csv/... (как раньше). Parquet → s3://data/parquet/... (новый). Consumers продолжают читать CSV. Parquet — для валидации.
Фаза 3: SwitchoverПереключить consumers на Parquet. Catalog: UPDATE TABLE LOCATION → s3://data/parquet/. Мониторинг: row counts, query latency, error rates. Rollback: переключить обратно на CSV (1 ALTER TABLE).

Конвертация исторических данных

# Пакетная конвертация CSV → Parquet (PyArrow)
import pyarrow as pa
import pyarrow.csv as pcsv
import pyarrow.parquet as pq
from pathlib import Path

# Фиксированная schema — не полагаемся на inference
schema = pa.schema([
 ("user_id", pa.int64()),
 ("event_type", pa.dictionary(pa.int32(), pa.string())),
 ("timestamp", pa.timestamp("ms")),
 ("amount", pa.decimal128(10, 2)),
])

convert_options = pcsv.ConvertOptions(column_types=schema)
read_options = pcsv.ReadOptions(block_size=256 * 1024 * 1024) # 256MB chunks

for csv_path in Path("s3://data/csv/").glob("**/*.csv"):
 table = pcsv.read_csv(csv_path, convert_options=convert_options, read_options=read_options)

 # Parquet с оптимальными настройками
 pq.write_table(
 table,
 csv_path.with_suffix(".parquet").as_posix().replace("/csv/", "/parquet/"),
 compression="zstd",
 compression_level=3,
 row_group_size=1_000_000,
 use_dictionary=["event_type"], # dictionary для low cardinality
 )

Оценка выигрыша

CSV → Parquet: типичный выигрыш
StorageCSV: 1 TB. Parquet (ZSTD-3): ~100-200 GB. Компрессия 5-10x. Экономия на S3 Standard: ~$18/TB/month → ~$2-4/TB/month. Для 100 TB: $1600/month savings.
Query SpeedFull scan: 5-20x ускорение (column projection + compression = меньше I/O). Filtered query: 10-100x ускорение (predicate pushdown пропускает 90%+ данных). Athena/Presto cost: пропорционален scanned bytes → 5-20x дешевле.
RisksSchema mismatch: CSV с mixed types → Parquet strict schema = ошибки конвертации. NULL handling: CSV 'NULL', '', 'N/A', 'null' → Parquet null. Encoding: UTF-8 vs Latin-1 vs mixed → потеря данных. Тестировать на полном объёме.

Миграция 2: Hive Tables → Iceberg / Delta Lake

Переход от Hive-managed Parquet таблиц к table format. Типичная ситуация: данные уже в Parquet, но нет ACID, нет time travel, ручной partition management.

Hive → Iceberg: in-place migration

Hive Parquet → Iceberg: metadata-only migration

Ключевое преимущество: Iceberg и Delta Lake хранят данные в Parquet. Миграция Hive → Iceberg может быть metadata-only: существующие Parquet файлы НЕ перезаписываются. Iceberg создаёт metadata (manifests, snapshots) поверх существующих файлов.
Spark SQLCALL spark_catalog.system.migrate('db.table') — Spark Iceberg procedure. Создаёт Iceberg metadata поверх существующих Parquet файлов. Файлы не копируются. Время: секунды-минуты (зависит от количества файлов, не от объёма данных). Обратная совместимость: Hive readers перестают видеть таблицу (нужен Iceberg connector).
Delta LakeCONVERT TO DELTA parquet.`s3://path/` — Delta Lake конвертация. Аналогично: создаёт _delta_log/ рядом с существующими Parquet файлами. Файлы не перезаписываются. Delta transaction log: JSON commits.
ВалидацияПосле миграции: сравнить row counts, schema, sample rows. SELECT COUNT(*) FROM old_hive_table = SELECT COUNT(*) FROM new_iceberg_table. Hash-сумма по sample партициям. Query результаты: 10-20 production запросов на обоих таблицах — результаты идентичны.

In-place vs Full Rewrite

In-Place Migrate vs Full Rewrite

In-Place (metadata-only)

In-place migration: только metadata. Данные остаются как есть. Быстро (секунды), безопасно (файлы не трогаем), обратимо. Но: существующие Parquet файлы не оптимизированы (старый row group size, старая компрессия, не sorted).
ПлюсыМгновенная миграция, нет I/O на данные, нет дополнительного storage, обратимая. Подходит для 90% случаев.
МинусыФайлы не оптимизированы: старый compression (Snappy вместо ZSTD), неоптимальный row group size, нет сортировки. Решение: post-migrate OPTIMIZE/rewrite по партициям.

Full Rewrite

Full rewrite: перезапись всех данных в новый формат с оптимальными настройками. Дорого (часы, double storage), но файлы оптимизированы с первого дня.
ПлюсыДанные оптимизированы: ZSTD compression, правильный row group size, sorted by frequently filtered columns. Максимальная read performance с первого дня.
МинусыДлительная операция (часы-дни для TB-scale). Double storage во время миграции. Compute cost (Spark cluster). Сложнее rollback. Оправдано для critical tables с высоким read QPS.

Gradual Rollout для Hive → Iceberg

Gradual Rollout: партиция за партицией

Gradual: партиция за партицией

Стратегия: новые партиции сразу в Iceberg. Исторические — конвертировать по одной, начиная с самых старых (наименее критичных). Каждую конвертированную партицию валидировать. Если ошибка — rollback одной партиции, не всей таблицы.
Неделя 1In-place migrate: metadata-only конвертация всей таблицы. Новые данные пишутся в Iceberg. Consumers переключены на Iceberg reads. Исторические данные — как были (Parquet от Hive).
Недели 2-4Post-migrate optimize: перезапись исторических партиций по одной. Начать с самых старых (Q1 2023). Каждая партиция: rewrite → validate → next. Параллельно: мониторинг read performance.
Неделя 5+Cleanup: удалить старые неоптимизированные файлы (expire_snapshots). Финальная валидация: все партиции оптимизированы. Мониторинг: 1 неделя на стабильность.

Миграция 3: Parquet → Lance (ML workloads)

Миграция Parquet → Lance — специфический case для ML-pipeline’ов, которые страдают от медленного random access в Parquet.

Parquet → Lance: миграция ML данных

Зачем: random access 100x, vector search, versioning

Зачем: Parquet random access ~1ms/row → Lance ~10μs/row (100x). Dataset versioning: Lance нативный, Parquet — нет. Vector search: Lance встроенный, Parquet — нет. Миграция оправдана для training data, embedding stores, feature stores.
КонвертацияPyArrow → Lance: читаем Parquet через PyArrow, записываем в Lance. lance.write_dataset(table, 'output.lance'). Скорость: ~200-500 MB/s (зависит от CPU). 1 TB Parquet → ~1-2 часа на мощной машине.
Vector IndexПосле конвертации: создать vector index на embedding колонке. ds.create_index('embedding', index_type='IVF_PQ', num_partitions=256). Время: 30-60 минут на 100M vectors. Без индекса — brute-force kNN (медленно на > 1M vectors).
ВалидацияRow count: Parquet row count = Lance row count. Schema: все колонки и типы совпадают. Sample: 1000 случайных строк — побитовое сравнение. Random access: benchmark p50/p99 latency vs Parquet.

Dual Read: Parquet + Lance

Для миграции ML-pipeline: dual read — чтение из обоих форматов с валидацией:

Dual Read: параллельное чтение для валидации

DataLoader Request: 1024 rows

Training pipeline: DataLoader запрашивает mini-batch из 1024 случайных строк. В dual read mode: запрашивает из обоих форматов, сравнивает результаты. Shadow mode: Lance — primary, Parquet — shadow (для валидации, не для training).
Lance (primary)Primary path: Lance random access (~10μs/row). Результат: Arrow RecordBatch с 1024 строками. Это данные, которые идут в модель.
Parquet (shadow)Shadow path: Parquet read тех же row IDs. Результат сравнивается с Lance (побитово). Если mismatch — логировать и алертить. Shadow path не влияет на training latency (async).
РезультатЕсли 0 mismatches за 1 неделю: отключить shadow path, удалить Parquet copy. Если mismatches > 0: investigate, fix конвертацию, повторить. Типичные причины: floating point precision, NULL handling, timezone differences.

Оценка стоимости миграции

Перед началом миграции — оценка стоимости: compute, storage, engineering time, risk.

Cost Estimation Framework
Compute CostФормула: data_volume_GB × conversion_rate_GB_per_hour × cluster_cost_per_hour. Примеры: CSV→Parquet: ~500 GB/hr на c5.4xlarge ($0.68/hr). 10 TB → 20 часов → ~$14. Hive→Iceberg in-place: ~0 (metadata only). Hive→Iceberg rewrite: ~200 GB/hr → 50 часов → ~$34.
Storage CostВо время миграции: double storage (old + new). Длительность: 1-4 недели (shadow write period). S3 Standard: $0.023/GB/month. 10 TB double storage × 1 month = ~$230 extra. После миграции: Parquet обычно меньше CSV (5-10x) → storage savings.
Engineering TimeСамая дорогая часть: инженеры планируют, выполняют, валидируют, мониторят. CSV→Parquet: ~1-2 недели (1 engineer). Hive→Iceberg: ~2-4 недели (1 engineer, зависит от количества таблиц). Downstream changes: обновление queries, dashboards, ML pipelines.
Risk CostВероятность проблем × impact. Data loss: backup перед миграцией (стоимость: дополнительный snapshot/copy). Downtime: shadow write pattern = 0 downtime. Rollback: in-place migrate обратима (удалить metadata). Full rewrite: нужен backup.

Чеклист миграции

Чеклист: перед, во время, после миграции
ПередПодготовка: inventory (какие таблицы, объёмы, consumers), schema validation (зафиксировать типы), backup (snapshot/copy критичных таблиц), test migration (dev/staging), runbook (шаги, rollback plan, alerts).
Во времяExecution: shadow write (параллельная запись), validation (row count, checksums, query results), monitoring (error rates, latency, costs), communication (downstream teams в курсе).
ПослеCleanup: удалить старые данные (после retention period), отключить shadow write, optimize новые таблицы (compression, sort order), document lessons learned.

Миграция между Table Formats

Отдельный случай — миграция между table format’ами: Delta Lake → Iceberg или Hudi → Delta Lake.

Миграция между Table Formats
Delta → IcebergТри подхода: 1) UniForm — Delta генерирует Iceberg metadata автоматически (Delta 3.x). Не полноценная миграция — Iceberg read-only. 2) Iceberg migrate procedure — metadata-only migration (файлы не копируются). 3) Full rewrite — CTAS from Delta into Iceberg.
Hudi → IcebergСложнее: Hudi MOR имеет log files (Avro) + base files (Parquet). Iceberg понимает только Parquet. Нужна compaction (merge log → base) перед миграцией. После compaction: Iceberg migrate procedure.
Iceberg → DeltaCTAS: CREATE TABLE delta_table USING delta AS SELECT * FROM iceberg_table. Или Delta Lake shallow clone (если данные в Parquet — не копировать файлы). Менее распространённый путь: обычно мигрируют В Iceberg, не из.
TIP

UniForm (Delta Lake 3.x) — самый элегантный подход к межформатной совместимости: Delta таблица автоматически генерирует Iceberg metadata. Consumers, использующие Iceberg readers (Trino, Snowflake), читают Delta-данные без миграции. Это не миграция — это совместимость без миграции.

Rollback Strategy

Каждая миграция должна иметь план отката. Без rollback plan — миграция не начинается.

Rollback Strategies по типу миграции
In-Place MigrateRollback: удалить metadata (Iceberg manifests/snapshots или Delta _delta_log/). Parquet файлы — не тронуты. Переключить Catalog обратно на Hive. Время: минуты. Риск: минимальный — данные не изменялись.
Full RewriteRollback: переключить Catalog на backup copy (snapshot/old path). Удалить rewritten файлы. Нужен backup old data (не удалять до подтверждения). Время: минуты (catalog switch), но storage cost до cleanup.
Shadow WriteRollback: отключить shadow write (новый формат), consumers уже на старом формате. Удалить shadow данные. Самый безопасный: consumers никогда не переключались. Rollback = do nothing.
Gradual RolloutRollback: переключить только мигрированные партиции обратно. Немигрированные — уже на старом формате. Частичный rollback: одна партиция, не вся таблица. Сложнее, но granular.

Итоги

Миграция: ключевые принципы
Shadow WriteВсегда предпочитать параллельную запись: 0 downtime, обратимая, валидация до переключения. Дороже (double storage + compute), но безопаснее.
In-PlaceДля Hive→Iceberg/Delta: metadata-only migration. Данные не трогаем. Optimize после миграции (gradual rewrite). Самый быстрый путь.
ValidateВалидация на каждом шаге: row counts, checksums, production queries. Не доверять 'миграция прошла без ошибок' — проверять данные. Minimum: 1 неделя мониторинга после переключения.
RollbackПлан отката до начала миграции. Тестировать rollback на dev/staging. Не удалять old данные до подтверждения успеха (retention period >= 2 недели).

В следующем уроке — конкретные case studies: как реальные компании выбирали и мигрировали форматы.

Проверьте понимание

Результат: 0 из 0
Концептуальный
Вопрос 1 из 4. Почему Big Bang миграция (остановить → конвертировать → запустить) — антипаттерн для production-систем объёмом > 1 TB?

Закончили урок?

Отметьте его как пройденный, чтобы отслеживать свой прогресс

Войдите чтобы оценить урок

Прогресс модуля
0 из 5