Learning Platform
Глоссарий
Troubleshooting

Решение проблем Apache DataFusion

Частые ошибки и проблемы при работе с Apache DataFusion — симптомы, причины и пошаговые решения.

Область

Категория

Показано 24 из 24 ошибок

Симптомы

  • RecordBatch не создаётся — ошибка несовпадения типов колонок
  • При построении RecordBatch из массивов Arrow возвращается ArrowError
  • Тип данных в ArrayRef не соответствует ожидаемому в Schema

Причина

Схема (Schema), переданная в RecordBatch::try_new(), объявляет один тип для поля, а реальный ArrayRef содержит другой. Часто возникает при ручном создании батчей или в TableProvider, когда schema() возвращает одну схему, а scan() — батчи с другой.

Решение

  1. Сравните Schema из schema() с фактическими типами ArrayRef в батче
  2. Используйте arrow::compute::cast() для приведения типов, если данные совместимы
  3. Убедитесь, что TableProvider::schema() и TableProvider::scan() возвращают одинаковую схему

Связанные уроки:

Симптомы

  • Запрос прерывается с ошибкой о превышении лимита памяти
  • MemoryPool возвращает ResourcesExhausted при аллокации
  • Крупные JOIN или агрегации падают, хотя данные помещаются в RAM

Причина

MemoryPool в DataFusion ограничивает общий объём памяти для запроса или сессии. При обработке больших объёмов данных (JOIN, сортировка, агрегация) промежуточные буферы превышают лимит пула.

Решение

  1. Увеличьте лимит: ctx.runtime_env().memory_pool.set_limit(bytes)
  2. Используйте FairSpillPool вместо GreedyMemoryPool для более равномерного распределения
  3. Уменьшите batch_size в RuntimeConfig для снижения пикового потребления
  4. Добавьте repartition перед JOIN для лучшего распределения данных

Связанные уроки:

Симптомы

  • SQL-запрос с CAST() или неявным приведением типов падает
  • Ошибка при попытке прочитать строковую колонку как числовую
  • Parquet/CSV файл содержит невалидные значения для целевого типа

Причина

Arrow выполняет строгое приведение типов. Если строковое значение не может быть распарсено в целевой тип (например, 'abc' → Int32), cast завершается ошибкой вместо возврата NULL.

Решение

  1. Используйте TRY_CAST(col AS INT) вместо CAST() — возвращает NULL при ошибке конвертации
  2. Предварительно очистите данные: SELECT * FROM t WHERE col ~ '^[0-9]+$'
  3. При чтении CSV задайте правильный schema в CsvReadOptions::new().schema(&schema)

Связанные уроки:

Симптомы

  • SQL-запрос к зарегистрированной таблице возвращает 'table not found'
  • После перезапуска программы таблицы пропадают из каталога
  • ctx.sql("SELECT * FROM my_table") падает при первом вызове

Причина

Таблица не зарегистрирована в текущем SessionContext. В DataFusion регистрация таблиц существует только в памяти — при каждом новом SessionContext нужно регистрировать заново. Также возможна ошибка в имени каталога/схемы.

Решение

  1. Зарегистрируйте таблицу до выполнения запроса: ctx.register_parquet("my_table", "path.parquet", ...).await?
  2. Проверьте доступные таблицы: ctx.sql("SHOW TABLES").await?
  3. Укажите полное имя: SELECT * FROM datafusion.public.my_table
  4. Для персистентного каталога реализуйте свой CatalogProvider

Связанные уроки:

Симптомы

  • JOIN-запрос падает с ошибкой 'Ambiguous reference'
  • Колонка присутствует в нескольких таблицах, и DataFusion не может выбрать
  • Ошибка при SELECT без полного квалификатора таблицы

Причина

Несколько таблиц в JOIN содержат колонку с одинаковым именем. DataFusion требует явного указания таблицы-источника для неоднозначных ссылок, как и стандартный SQL.

Решение

  1. Используйте полную квалификацию: SELECT a.id, b.id FROM table_a a JOIN table_b b ON a.id = b.id
  2. Задайте алиасы таблицам: FROM orders AS o JOIN products AS p
  3. В DataFrame API: left.join(right, JoinType::Inner, &["id"], &["id"], None)?

Симптомы

  • Оконная функция с RANGE BETWEEN возвращает 'not implemented'
  • Работает с ROWS BETWEEN, но падает с RANGE BETWEEN
  • Сложные оконные выражения не поддерживаются

Причина

DataFusion поддерживает RANGE BETWEEN только для ограниченного набора типов (numeric, temporal). Для строковых и других типов рамка RANGE не реализована. Это известное ограничение, а не баг.

Решение

  1. Замените RANGE BETWEEN на ROWS BETWEEN — в большинстве случаев результат эквивалентен
  2. Для temporal-колонок убедитесь, что тип — Timestamp, а не Utf8
  3. Проверьте текущий список поддерживаемых функций: SELECT * FROM information_schema.df_settings

Связанные уроки:

Симптомы

  • EXPLAIN ANALYZE показывает 0 строк на одном из шагов плана
  • Запрос возвращает пустой результат, хотя данные есть
  • FilterExec отсекает все строки — предикат не совпадает с данными

Причина

Фильтр не совпадает с данными из-за несовпадения типов (например, строковый '1' != числовой 1), регистра (case-sensitive сравнение), или предикат применяется к пустому батчу после предыдущей операции.

Решение

  1. Запустите EXPLAIN ANALYZE и найдите шаг с output_rows=0
  2. Проверьте типы колонок: DESCRIBE my_table
  3. Убедитесь, что литерал имеет правильный тип: WHERE id = CAST('1' AS INT) вместо WHERE id = '1'
  4. Используйте EXPLAIN (без ANALYZE) для просмотра логического плана до оптимизации

Связанные уроки:

Симптомы

  • Scalar UDF не компилируется с ошибкой trait bound
  • Попытка использовать `as_primitive_array::<Utf8>()` вызывает ошибку типов
  • Rust не находит реализацию ArrowPrimitiveType для строковых типов

Причина

В Arrow типы Utf8/LargeUtf8/Binary — не примитивные. Они хранятся как offset-буфер + данные, а не как массив значений фиксированного размера. Для строковых массивов нужны специализированные downcast-методы.

Решение

  1. Используйте as_string::(array) вместо as_primitive_array::(array)
  2. Для бинарных данных: as_binary::(array) или as_large_binary(array)
  3. Изучите набор downcast-функций в arrow::array — каждый тип данных имеет свой метод

Связанные уроки:

Симптомы

  • UDF регистрируется успешно, но падает при вызове в запросе
  • Ошибка 'returned a different schema' при выполнении SQL с пользовательской функцией
  • Тип возврата UDF не совпадает с объявленным в return_type()

Причина

Функция return_type() в ScalarUDF объявляет один тип, а фактический ArrayRef, возвращаемый из invoke(), содержит другой. DataFusion проверяет соответствие после выполнения функции.

Решение

  1. Сравните тип в return_type() с типом ArrayRef в invoke()
  2. Если тип зависит от входных данных, используйте return_type_from_exprs() вместо статического return_type
  3. Используйте arrow::compute::cast() в конце invoke() для приведения к ожидаемому типу

Связанные уроки:

Симптомы

  • Кастомный TableProvider падает с паникой при первом запросе
  • SELECT * FROM custom_table вызывает panic в рантайме
  • Схема, возвращаемая scan(), не совпадает со schema()

Причина

DataFusion валидирует, что RecordBatch из ExecutionPlan (возвращённого scan()) соответствует схеме, объявленной в schema(). Если TableProvider создаёт схему динамически или с ошибкой, возникает несовпадение.

Решение

  1. Убедитесь, что schema() и scan() используют один и тот же объект Arc
  2. Сохраните схему в поле структуры при создании провайдера: self.schema.clone()
  3. Если schema зависит от проекции — проверьте, что scan(projection) корректно фильтрует поля

Связанные уроки:

Симптомы

  • Пользовательское правило оптимизации вызывает панику
  • Запросы с определёнными паттернами (JOIN, подзапрос) крашат оптимизатор
  • Panic с трейсом, указывающим на кастомный OptimizerRule

Причина

Кастомный OptimizerRule неправильно обрабатывает структуру логического плана. Типичные причины: обращение к children по индексу без проверки длины, отсутствие обработки всех вариантов LogicalPlan enum, или мутация плана без пересчёта схемы.

Решение

  1. Оборачивайте тело правила в catch_unwind на этапе отладки для получения backtrace
  2. Проверяйте plan.inputs().len() перед обращением по индексу
  3. Используйте plan.map_children() или plan.rewrite() вместо ручного обхода дерева
  4. Покройте правило тестами с EXPLAIN для различных паттернов запросов

Симптомы

  • Проект не компилируется после добавления зависимости datafusion
  • cargo build выдаёт ошибку 'use of undeclared crate or module'
  • Ошибка появляется при импорте datafusion::prelude::*

Причина

Версия datafusion в Cargo.toml не совпадает с версиями смежных крейтов (datafusion-common, datafusion-expr, arrow). DataFusion требует строгого совпадения версий всех своих подкрейтов.

Решение

  1. Используйте одну версию для всех datafusion-* крейтов в Cargo.toml
  2. Проверьте совместимость: cargo tree -i datafusion — все версии должны совпадать
  3. Для DataFusion 40+ используйте arrow = { version = "53" } (версия Arrow привязана к конкретной версии DataFusion)

Связанные уроки:

Симптомы

  • JOIN-запрос между большими таблицами выполняется медленно
  • EXPLAIN показывает неоптимальный порядок JOIN (большая таблица слева)
  • Планировщик не использует hash join, когда это было бы эффективнее

Причина

Без статистики (row count, column cardinality) оптимизатор не может выбрать порядок JOIN и тип JOIN. DataFusion использует эвристики, которые часто неоптимальны для больших данных.

Решение

  1. Зарегистрируйте таблицу с включённой статистикой: CsvReadOptions::new().has_header(true) + ANALYZE TABLE
  2. Для Parquet файлов статистика считывается автоматически из row group metadata
  3. Вручную укажите hint: SELECT /*+ HASH_JOIN(a, b) */ ... (если поддерживается)
  4. Задайте порядок вручную: маленькую таблицу ставьте в правую сторону JOIN

Симптомы

  • Hash Join падает с out-of-memory на больших таблицах
  • Запрос с несколькими JOIN потребляет всю доступную память
  • Декартово произведение из-за отсутствия или неправильного условия JOIN

Причина

Hash Join строит хеш-таблицу целиком в памяти для правой стороны JOIN. Если правая таблица огромна или условие JOIN слишком широкое (cross join), происходит взрыв памяти.

Решение

  1. Проверьте условие JOIN — убедитесь, что есть ON clause и он достаточно селективен
  2. Поместите меньшую таблицу справа: big_table JOIN small_table
  3. Включите spill-to-disk: настройте FairSpillPool с путём для временных файлов
  4. Разбейте запрос: материализуйте промежуточные результаты через CREATE TABLE AS

Симптомы

  • Запрос использует только одно ядро CPU при наличии многих
  • EXPLAIN ANALYZE показывает partitions=1 для большого файла
  • Время выполнения не улучшается при увеличении target_partitions

Причина

Один большой файл (CSV или JSON) читается как одна партиция, так как DataFusion не может разбить текстовый файл на параллельные чанки. Parquet файлы с одним row group имеют ту же проблему.

Решение

  1. Разбейте данные на несколько файлов и зарегистрируйте директорию: ctx.register_parquet("t", "data/", ...)
  2. Конвертируйте CSV в Parquet с несколькими row groups
  3. Настройте target_partitions в SessionConfig: config.with_target_partitions(num_cpus::get())
  4. Добавьте repartition после чтения: .repartition(Partitioning::RoundRobinBatch(8))

Симптомы

  • import datafusion вызывает ModuleNotFoundError
  • pip install datafusion завершился успешно, но модуль не импортируется
  • В virtualenv модуль не найден

Причина

Пакет datafusion-python требует совместимой версии Python (3.8+) и может не собираться на некоторых платформах (ARM Linux). Также возможен конфликт между системным Python и virtualenv.

Решение

  1. Установите в активный virtualenv: python -m venv .venv && source .venv/bin/activate && pip install datafusion
  2. Проверьте установку: pip show datafusion — убедитесь, что Location соответствует sys.path
  3. На macOS ARM: pip install datafusion --no-binary :all: для сборки из исходников
  4. Убедитесь, что используете Python 3.8+: python --version

Связанные уроки:

Симптомы

  • Ошибка при передаче PyArrow RecordBatch в DataFusion Python
  • ArrowInvalid при вызове ctx.register_record_batches()
  • После обновления PyArrow старый код перестал работать

Причина

datafusion-python привязан к конкретной версии PyArrow через Rust-биндинги (pyo3-arrow). Если установленная версия PyArrow не совпадает с ожидаемой, данные передаются с неправильным маппингом типов.

Решение

  1. Проверьте совместимость: pip show datafusion pyarrow — версии должны быть совместимы
  2. Установите рекомендуемую версию PyArrow: pip install 'pyarrow>=14.0,<16.0' (для datafusion 37+)
  3. Пересоздайте virtualenv с чистыми зависимостями: pip install datafusion подтянет совместимый pyarrow

Связанные уроки:

Симптомы

  • Передача pandas DataFrame в ctx.register_record_batches() вызывает TypeError
  • Попытка использовать pandas объекты напрямую в DataFusion API
  • Ошибка при смешивании pandas и Arrow API

Причина

DataFusion Python работает с PyArrow, а не с pandas напрямую. Функции регистрации ожидают RecordBatch или RecordBatchReader. Pandas DataFrame нужно конвертировать через PyArrow.

Решение

  1. Конвертируйте DataFrame в Arrow: table = pa.Table.from_pandas(df) затем ctx.register_record_batches("t", [table.to_batches()])
  2. Или используйте ctx.from_pandas(df) если такой метод доступен в вашей версии
  3. Для обратной конвертации: result.to_pandas() на результате запроса

Связанные уроки:

Симптомы

  • Ballista клиент не может подключиться к scheduler
  • BallistaContext::remote() возвращает Connection refused
  • Scheduler процесс запущен, но не принимает соединения

Причина

Scheduler не запущен, не успел инициализироваться, или слушает на другом адресе/порту. В Docker-окружении возможна проблема с сетью между контейнерами.

Решение

  1. Проверьте статус scheduler: curl http://localhost:50050/api/state
  2. Убедитесь, что scheduler слушает: ss -tlnp | grep 50050
  3. В Docker: используйте имя сервиса вместо localhost: BallistaContext::remote("scheduler", 50050, ...)
  4. Дождитесь инициализации: scheduler может запускаться 5-10 секунд

Связанные уроки:

Симптомы

  • Запрос с кастомным UDF выполняется локально, но падает в Ballista
  • Ошибка сериализации при отправке плана на executor
  • UDF зарегистрирована в клиенте, но не найдена на worker

Причина

В распределённом режиме физический план сериализуется и отправляется на executor-ы. Кастомные UDF должны быть зарегистрированы на каждом executor при старте, иначе десериализация плана не найдёт функцию.

Решение

  1. Зарегистрируйте UDF в FunctionRegistry на каждом executor при старте
  2. Используйте --udf-lib флаг для загрузки shared library с UDF
  3. Альтернатива: замените UDF на стандартные SQL-выражения для распределённых запросов

Связанные уроки:

Симптомы

  • Распределённый запрос зависает на фазе shuffle
  • Некоторые executor-ы не могут обменяться данными
  • Таймауты при чтении промежуточных результатов

Причина

Executor-ы не могут связаться друг с другом напрямую (peer-to-peer). Частые причины: firewall блокирует порты между нодами, неправильная конфигурация внешнего адреса executor (advertise address), или сеть Docker overlay не настроена.

Решение

  1. Убедитесь, что порты executor-ов доступны между нодами (по умолчанию 50051)
  2. Задайте внешний адрес: --external-host при запуске executor
  3. В Docker Compose: используйте единую overlay-сеть для всех сервисов
  4. Увеличьте таймауты: --shuffle-reader-timeout 60s

Связанные уроки:

Симптомы

  • Docker build зависает и крашится при компиляции datafusion
  • Процесс cargo build убивается с signal 9 (SIGKILL) внутри контейнера
  • Docker Desktop показывает 100% использование памяти

Причина

Компиляция DataFusion и Arrow из исходников требует 4-8 GB RAM. Docker Desktop по умолчанию ограничивает контейнеры 2 GB, что недостаточно для параллельной компиляции Rust.

Решение

  1. Увеличьте лимит памяти Docker Desktop до 8 GB (Settings → Resources → Memory)
  2. Ограничьте параллельность: ENV CARGO_BUILD_JOBS=2 в Dockerfile
  3. Используйте multi-stage build: собирайте в builder-образе, копируйте только бинарник в финальный
  4. Используйте готовые бинарные образы: FROM datafusion/datafusion-cli:latest

Связанные уроки:

Симптомы

  • Запись результатов запроса в файл внутри контейнера падает с Permission denied
  • Volume mount не имеет прав на запись
  • Контейнер запускается от root, но volume принадлежит другому пользователю

Причина

Docker volume mount наследует права хост-директории. Если процесс внутри контейнера работает от другого UID, чем владелец директории на хосте, запись невозможна.

Решение

  1. Задайте UID при запуске: docker run -u $(id -u):$(id -g) ...
  2. Создайте директорию заранее с правами: mkdir -p ./output && chmod 777 ./output
  3. В Dockerfile: RUN mkdir /data/output && chown 1000:1000 /data/output
  4. Используйте named volumes вместо bind mounts: docker volume create datafusion_output

Связанные уроки:

Симптомы

  • Чтение Parquet файла падает с ошибкой schema mismatch
  • Файл читался раньше, но после добавления колонки — ошибка
  • Несколько Parquet файлов в директории имеют разные схемы

Причина

DataFusion при чтении директории Parquet файлов ожидает единую схему. Если файлы были созданы с разными версиями схемы (schema evolution), или первый файл задаёт схему, которой нет в остальных, возникает ошибка.

Решение

  1. Используйте schema_merge при регистрации: ParquetReadOptions::default().schema_merge(true)
  2. Укажите явную схему при чтении: ctx.register_parquet_with_schema("t", path, schema, opts)
  3. Перепишите файлы с единой схемой через DataFusion: CREATE TABLE ... AS SELECT * FROM old_table