Частые ошибки и проблемы при работе с Apache DataFusion — симптомы, причины и пошаговые решения.
Схема (Schema), переданная в RecordBatch::try_new(), объявляет один тип для поля, а реальный ArrayRef содержит другой. Часто возникает при ручном создании батчей или в TableProvider, когда schema() возвращает одну схему, а scan() — батчи с другой.
schema() с фактическими типами ArrayRef в батчеarrow::compute::cast() для приведения типов, если данные совместимыTableProvider::schema() и TableProvider::scan() возвращают одинаковую схемуMemoryPool в DataFusion ограничивает общий объём памяти для запроса или сессии. При обработке больших объёмов данных (JOIN, сортировка, агрегация) промежуточные буферы превышают лимит пула.
ctx.runtime_env().memory_pool.set_limit(bytes)FairSpillPool вместо GreedyMemoryPool для более равномерного распределенияbatch_size в RuntimeConfig для снижения пикового потребленияArrow выполняет строгое приведение типов. Если строковое значение не может быть распарсено в целевой тип (например, 'abc' → Int32), cast завершается ошибкой вместо возврата NULL.
TRY_CAST(col AS INT) вместо CAST() — возвращает NULL при ошибке конвертацииSELECT * FROM t WHERE col ~ '^[0-9]+$'CsvReadOptions::new().schema(&schema)Таблица не зарегистрирована в текущем SessionContext. В DataFusion регистрация таблиц существует только в памяти — при каждом новом SessionContext нужно регистрировать заново. Также возможна ошибка в имени каталога/схемы.
ctx.register_parquet("my_table", "path.parquet", ...).await?ctx.sql("SHOW TABLES").await?SELECT * FROM datafusion.public.my_tableНесколько таблиц в JOIN содержат колонку с одинаковым именем. DataFusion требует явного указания таблицы-источника для неоднозначных ссылок, как и стандартный SQL.
SELECT a.id, b.id FROM table_a a JOIN table_b b ON a.id = b.idFROM orders AS o JOIN products AS pleft.join(right, JoinType::Inner, &["id"], &["id"], None)?DataFusion поддерживает RANGE BETWEEN только для ограниченного набора типов (numeric, temporal). Для строковых и других типов рамка RANGE не реализована. Это известное ограничение, а не баг.
RANGE BETWEEN на ROWS BETWEEN — в большинстве случаев результат эквивалентенSELECT * FROM information_schema.df_settingsФильтр не совпадает с данными из-за несовпадения типов (например, строковый '1' != числовой 1), регистра (case-sensitive сравнение), или предикат применяется к пустому батчу после предыдущей операции.
EXPLAIN ANALYZE и найдите шаг с output_rows=0DESCRIBE my_tableWHERE id = CAST('1' AS INT) вместо WHERE id = '1'EXPLAIN (без ANALYZE) для просмотра логического плана до оптимизацииВ Arrow типы Utf8/LargeUtf8/Binary — не примитивные. Они хранятся как offset-буфер + данные, а не как массив значений фиксированного размера. Для строковых массивов нужны специализированные downcast-методы.
as_string::(array) вместо as_primitive_array::(array) as_binary::(array) или as_large_binary(array)arrow::array — каждый тип данных имеет свой методФункция return_type() в ScalarUDF объявляет один тип, а фактический ArrayRef, возвращаемый из invoke(), содержит другой. DataFusion проверяет соответствие после выполнения функции.
return_type() с типом ArrayRef в invoke()return_type_from_exprs() вместо статического return_typearrow::compute::cast() в конце invoke() для приведения к ожидаемому типуDataFusion валидирует, что RecordBatch из ExecutionPlan (возвращённого scan()) соответствует схеме, объявленной в schema(). Если TableProvider создаёт схему динамически или с ошибкой, возникает несовпадение.
schema() и scan() используют один и тот же объект Arcself.schema.clone()scan(projection) корректно фильтрует поляКастомный OptimizerRule неправильно обрабатывает структуру логического плана. Типичные причины: обращение к children по индексу без проверки длины, отсутствие обработки всех вариантов LogicalPlan enum, или мутация плана без пересчёта схемы.
catch_unwind на этапе отладки для получения backtraceplan.map_children() или plan.rewrite() вместо ручного обхода дереваВерсия datafusion в Cargo.toml не совпадает с версиями смежных крейтов (datafusion-common, datafusion-expr, arrow). DataFusion требует строгого совпадения версий всех своих подкрейтов.
cargo tree -i datafusion — все версии должны совпадатьarrow = { version = "53" } (версия Arrow привязана к конкретной версии DataFusion)Без статистики (row count, column cardinality) оптимизатор не может выбрать порядок JOIN и тип JOIN. DataFusion использует эвристики, которые часто неоптимальны для больших данных.
CsvReadOptions::new().has_header(true) + ANALYZE TABLESELECT /*+ HASH_JOIN(a, b) */ ... (если поддерживается)Hash Join строит хеш-таблицу целиком в памяти для правой стороны JOIN. Если правая таблица огромна или условие JOIN слишком широкое (cross join), происходит взрыв памяти.
big_table JOIN small_tableFairSpillPool с путём для временных файловОдин большой файл (CSV или JSON) читается как одна партиция, так как DataFusion не может разбить текстовый файл на параллельные чанки. Parquet файлы с одним row group имеют ту же проблему.
ctx.register_parquet("t", "data/", ...)target_partitions в SessionConfig: config.with_target_partitions(num_cpus::get()).repartition(Partitioning::RoundRobinBatch(8))Пакет datafusion-python требует совместимой версии Python (3.8+) и может не собираться на некоторых платформах (ARM Linux). Также возможен конфликт между системным Python и virtualenv.
python -m venv .venv && source .venv/bin/activate && pip install datafusionpip show datafusion — убедитесь, что Location соответствует sys.pathpip install datafusion --no-binary :all: для сборки из исходниковpython --versiondatafusion-python привязан к конкретной версии PyArrow через Rust-биндинги (pyo3-arrow). Если установленная версия PyArrow не совпадает с ожидаемой, данные передаются с неправильным маппингом типов.
pip show datafusion pyarrow — версии должны быть совместимыpip install 'pyarrow>=14.0,<16.0' (для datafusion 37+)pip install datafusion подтянет совместимый pyarrowDataFusion Python работает с PyArrow, а не с pandas напрямую. Функции регистрации ожидают RecordBatch или RecordBatchReader. Pandas DataFrame нужно конвертировать через PyArrow.
table = pa.Table.from_pandas(df) затем ctx.register_record_batches("t", [table.to_batches()])ctx.from_pandas(df) если такой метод доступен в вашей версииresult.to_pandas() на результате запросаScheduler не запущен, не успел инициализироваться, или слушает на другом адресе/порту. В Docker-окружении возможна проблема с сетью между контейнерами.
curl http://localhost:50050/api/statess -tlnp | grep 50050BallistaContext::remote("scheduler", 50050, ...)В распределённом режиме физический план сериализуется и отправляется на executor-ы. Кастомные UDF должны быть зарегистрированы на каждом executor при старте, иначе десериализация плана не найдёт функцию.
--udf-lib флаг для загрузки shared library с UDFExecutor-ы не могут связаться друг с другом напрямую (peer-to-peer). Частые причины: firewall блокирует порты между нодами, неправильная конфигурация внешнего адреса executor (advertise address), или сеть Docker overlay не настроена.
--external-host при запуске executor--shuffle-reader-timeout 60sКомпиляция DataFusion и Arrow из исходников требует 4-8 GB RAM. Docker Desktop по умолчанию ограничивает контейнеры 2 GB, что недостаточно для параллельной компиляции Rust.
ENV CARGO_BUILD_JOBS=2 в DockerfileFROM datafusion/datafusion-cli:latestDocker volume mount наследует права хост-директории. Если процесс внутри контейнера работает от другого UID, чем владелец директории на хосте, запись невозможна.
docker run -u $(id -u):$(id -g) ...mkdir -p ./output && chmod 777 ./outputRUN mkdir /data/output && chown 1000:1000 /data/outputdocker volume create datafusion_outputDataFusion при чтении директории Parquet файлов ожидает единую схему. Если файлы были созданы с разными версиями схемы (schema evolution), или первый файл задаёт схему, которой нет в остальных, возникает ошибка.
schema_merge при регистрации: ParquetReadOptions::default().schema_merge(true)ctx.register_parquet_with_schema("t", path, schema, opts)CREATE TABLE ... AS SELECT * FROM old_table