
Integrate DataFusion as execution engine for compute-heavy operations #3554

Description

@qzyu999

Feature Request / Improvement

Problem

PyIceberg cannot perform several operations at production scale due to unbounded memory requirements in the PyArrow execution path:

  • Tables with equality deletes are unreadable (reads fail with a hard ValueError)
  • Copy-on-write (CoW) deletes OOM on large Parquet files (~1GB)
  • CoW overwrites OOM (same pattern as delete)
  • Upsert uses an O(n) row-by-row comparison
  • Compaction is not implemented; it requires an external sort (infeasible in memory for large tables)
  • Orphan file deletion OOMs (a LEFT ANTI JOIN over millions of paths)
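To make the first item concrete: reading a table with equality deletes amounts to an anti-join between data rows and delete keys, gated by sequence numbers. The pure-Python sketch below illustrates only those semantics, which PyIceberg would own regardless of engine. `EqualityDeleteFile` and `apply_equality_deletes` are hypothetical names, the dict-per-row representation is for illustration only, and the spec's partition-matching rule is omitted. Note that it materializes every delete key in a Python set, which is exactly the unbounded-memory pattern a spilling hash join would replace.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Sequence, Set, Tuple


@dataclass(frozen=True)
class EqualityDeleteFile:
    """Hypothetical in-memory stand-in for one equality delete file."""

    data_sequence_number: int
    equality_columns: Tuple[str, ...]
    rows: Sequence[dict]


def apply_equality_deletes(
    data_rows: Iterable[dict],
    data_sequence_number: int,
    deletes: Sequence[EqualityDeleteFile],
) -> Iterator[dict]:
    """Yield the data rows that survive every applicable equality delete.

    Per the Iceberg spec, an equality delete applies to a data file only when
    the data file's data sequence number is strictly lower than the delete's.
    """
    # Build side: one key set per distinct list of equality columns.
    key_sets: Dict[Tuple[str, ...], Set[tuple]] = {}
    for delete in deletes:
        if delete.data_sequence_number <= data_sequence_number:
            continue  # delete is not newer than the data file: skip it
        keys = key_sets.setdefault(delete.equality_columns, set())
        for row in delete.rows:
            keys.add(tuple(row[col] for col in delete.equality_columns))

    # Probe side: a row is dropped if its projection matches any delete key.
    for row in data_rows:
        deleted = any(
            tuple(row[col] for col in cols) in keys for cols, keys in key_sets.items()
        )
        if not deleted:
            yield row


data = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}, {"id": 3, "v": "c"}]
deletes = [
    EqualityDeleteFile(5, ("id",), [{"id": 2}]),  # newer than the data: applies
    EqualityDeleteFile(3, ("id",), [{"id": 3}]),  # older than the data: ignored
]
survivors = list(apply_equality_deletes(data, 4, deletes))
```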

The list goes on (documented below) with further operations that don't scale in PyIceberg's typical single-node environment. Together, these gaps block PyIceberg from reaching feature parity with Java Iceberg for V2/V3 tables.

Proposed Solution

Integrate Apache DataFusion (already available via pip install 'pyiceberg[pyiceberg-core]') as an optional execution engine behind an automatic engine-resolution layer. When DataFusion is installed, compute-heavy operations use its spill-to-disk execution, keeping memory bounded. When it is not installed, the existing PyArrow path remains unchanged (it works for small data; large data still OOMs, as it does today).

No existing behavior changes, and there is no forced dependency. The UX is DuckDB-style: a developer only needs to configure a memory budget if they choose to.
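A minimal sketch of the engine-resolution layer, assuming availability is detected with `importlib.util.find_spec` rather than an eager `import datafusion`, so the check stays cheap. `Engine`, `resolve_engine`, and its `preferred` override are hypothetical names, not existing PyIceberg APIs.

```python
import importlib.util
from enum import Enum
from typing import Optional


class Engine(str, Enum):
    """Hypothetical engine identifiers for pyiceberg/execution/engine.py."""

    DATAFUSION = "datafusion"
    PYARROW = "pyarrow"


def _datafusion_available() -> bool:
    # find_spec locates the package without importing it.
    return importlib.util.find_spec("datafusion") is not None


def resolve_engine(preferred: Optional[str] = None) -> Engine:
    """Return DataFusion when it is installed, otherwise fall back to PyArrow.

    `preferred` is a hypothetical override (e.g. to force the PyArrow path in tests).
    """
    if preferred is not None:
        engine = Engine(preferred)  # raises ValueError for unknown engine names
        if engine is Engine.DATAFUSION and not _datafusion_available():
            raise ImportError(
                "datafusion is not installed; run: pip install 'pyiceberg[datafusion]'"
            )
        return engine
    return Engine.DATAFUSION if _datafusion_available() else Engine.PYARROW
```

Callers in table operations would branch once on the returned value, so no code outside pyiceberg/execution/ needs to know whether DataFusion is present.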

Update: After valuable feedback from @kevinjqliu, we have revised the architectural boundaries. We avoid a dependency on iceberg-rust: PyIceberg owns the semantics and uses DataFusion only as the execution engine for compute-heavy operations, much as it uses PyArrow today.

Design Doc

Support for PyIceberg DataFusion Integration

Relation to Existing DataFusion Integration

PyIceberg already uses DataFusion as part of a standard multi-engine interop pattern (alongside e.g. to_duckdb() and to_ray()); these are all read-only query operations. This proposal targets an orthogonal role, and the two should not be conflated. In the new role, DataFusion is an internal implementation detail: if a user calls table.compact(), they never see DataFusion at work under the hood, just as with PyArrow today.

Why DataFusion and Why Not a Pluggable Pattern

This is indeed an opinionated architectural choice. DataFusion combines the following attributes, which no alternative offers together:

  • Apache-licensed (ASF ecosystem compatibility)
  • Arrow-native (zero-copy interop with PyArrow — no serialization)
  • Embeddable as a library (not a server/database)
  • Spill-to-disk for sort, join, and aggregate (the core requirement)
  • Python bindings exist and are maintained

By comparison, DuckDB has GPL-licensed extensions and copies data at the Arrow boundary; Polars has no spill-to-disk; Spark requires a JVM; Velox has no Python bindings; and PyArrow cannot spill to disk.
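Spill-to-disk sorting is the core requirement listed above. To illustrate what it means (not how DataFusion implements it), here is a classic external merge sort in plain Python: the input is cut into sorted runs of at most `max_in_memory` items, each run is written to a temporary file, and the runs are then streamed back through a k-way merge. Peak memory is roughly one run plus one pending value per spilled run, independent of input size. The function names are hypothetical, and memory is counted in items rather than bytes for simplicity.

```python
import heapq
import os
import pickle
import tempfile
from typing import Iterable, Iterator, List


def _spill_run(run: List[int], spill_dir: str) -> str:
    """Write one sorted run to a temporary file and return its path."""
    fd, path = tempfile.mkstemp(dir=spill_dir, suffix=".run")
    with os.fdopen(fd, "wb") as f:
        for value in run:
            pickle.dump(value, f)
    return path


def _read_run(path: str) -> Iterator[int]:
    """Stream a spilled run back one value at a time."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def external_sort(values: Iterable[int], max_in_memory: int) -> Iterator[int]:
    """Sort a stream of any length while sorting at most `max_in_memory` items at once."""
    with tempfile.TemporaryDirectory() as spill_dir:
        run_paths: List[str] = []
        buffer: List[int] = []
        for value in values:
            buffer.append(value)
            if len(buffer) >= max_in_memory:
                buffer.sort()
                run_paths.append(_spill_run(buffer, spill_dir))  # spill to disk
                buffer = []
        buffer.sort()  # the final (possibly partial) run stays in memory
        # k-way merge: holds one pending value per spilled run at a time.
        yield from heapq.merge(*(_read_run(p) for p in run_paths), buffer)
```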

Why Not Make the Compute Engine Pluggable?

Building a pluggable compute-engine abstraction layer right now would introduce massive complexity with zero immediate benefit:

  1. No viable alternatives exist: With only one candidate satisfying the requirements, there is no real choice to make.
  2. Enormous abstraction surface: The surface area for memory configuration, SQL dialects, data registration, and object store integration differs drastically between engines. Standardizing this upfront is a massive undertaking.
  3. Hidden implementation detail: The engine is an internal utility hidden behind operations like table.compact(). Users do not interact with it directly.
  4. Existing escape hatches: Users who want to query data using alternative engines can already do so via the existing to_x() export methods.

If a viable alternative emerges in the future, pyiceberg/execution/compute.py is a clean substitution point. Building pluggable infrastructure for theoretical alternatives now would be premature abstraction (YAGNI).

User Experience

DuckDB-style: sensible default memory budget, optional override via existing config mechanisms. No new initialization step required.

from pyiceberg.catalog import load_catalog

catalog = load_catalog("prod", uri="...")
table = catalog.load_table("db.events")

# Everything just works — default 512MB budget, spills to disk if needed
table.compact()                          # new method
table.delete("status = 'expired'")       # existing method, no longer OOMs on large files
df = table.scan().to_arrow()             # existing method, now resolves equality deletes

Memory budget is configurable through PyIceberg's existing config hierarchy:

# .pyiceberg.yaml
execution:
  memory-limit: 1GB

# Or via environment variable
export PYICEBERG_EXECUTION__MEMORY_LIMIT=2GB

Most users never need to configure this; the default handles typical workloads. When datafusion is not installed, the existing PyArrow path is used unchanged. To install the optional engine:

pip install 'pyiceberg[datafusion]'
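A sketch of how the memory budget might be resolved from the two configuration sources above. All of the following are assumptions: sizes use binary units (1GB = 2**30 bytes), the environment variable takes precedence over .pyiceberg.yaml, and the default is the 512MB quoted in the usage example. `parse_memory_limit` and `resolve_memory_limit` are hypothetical names; in PyIceberg the environment-variable merging would normally go through its existing config hierarchy rather than a hand-rolled lookup.

```python
import os
import re
from typing import Mapping, Optional

DEFAULT_MEMORY_LIMIT = "512MB"  # default quoted in the proposal
_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
_PATTERN = re.compile(r"^\s*(\d+(?:\.\d+)?)\s*([KMGT]?B)\s*$", re.IGNORECASE)


def parse_memory_limit(value: str) -> int:
    """Convert '512MB', '1GB', or '1.5 GB' into a byte count (binary units assumed)."""
    match = _PATTERN.match(value)
    if match is None:
        raise ValueError(f"Invalid memory limit: {value!r}")
    number, unit = match.groups()
    return int(float(number) * _UNITS[unit.upper()])


def resolve_memory_limit(
    config: Mapping[str, dict], environ: Optional[Mapping[str, str]] = None
) -> int:
    """Env var beats .pyiceberg.yaml, which beats the built-in default (assumed precedence)."""
    environ = os.environ if environ is None else environ
    raw = environ.get("PYICEBERG_EXECUTION__MEMORY_LIMIT")
    if raw is None:
        raw = config.get("execution", {}).get("memory-limit", DEFAULT_MEMORY_LIMIT)
    return parse_memory_limit(str(raw))
```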

Implementation

We propose a new module, pyiceberg/execution/, that resolves the execution engine to either DataFusion or PyArrow, makes the memory limit configurable, maps PyIceberg storage (FileIO) properties to DataFusion's object store configuration, and isolates heavy data transformations from the core table logic.

pyiceberg/execution/
├── __init__.py          # Re-exports
├── engine.py            # resolve_engine(), checks `import datafusion`
├── session.py           # create_bounded_session(), FairSpillPool + DiskManager
├── object_store.py      # Translate FileIO props -> DataFusion object store config
└── compute.py           # sort_batches(), anti_join(), filter_parquet()
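For object_store.py, the translation can start as a plain mapping from PyIceberg FileIO property keys (s3.endpoint, s3.access-key-id, s3.secret-access-key, s3.region) to keyword arguments for DataFusion's S3 object store. The keyword names on the right-hand side are an assumption modeled on datafusion-python's `AmazonS3` store and should be checked against the installed version; `s3_store_kwargs` is a hypothetical helper. Keeping it a pure function makes it testable without credentials or network access.

```python
from typing import Dict, Mapping, Tuple
from urllib.parse import urlparse

# PyIceberg FileIO property key -> assumed keyword name on DataFusion's S3 store.
_S3_PROPERTY_MAP = {
    "s3.endpoint": "endpoint",
    "s3.access-key-id": "access_key_id",
    "s3.secret-access-key": "secret_access_key",
    "s3.region": "region",
}


def s3_store_kwargs(
    location: str, properties: Mapping[str, str]
) -> Tuple[str, Dict[str, object]]:
    """Translate FileIO properties for an s3:// location into (bucket, kwargs)."""
    parsed = urlparse(location)
    if parsed.scheme not in ("s3", "s3a", "s3n"):
        raise ValueError(f"Not an S3 location: {location!r}")
    # Copy only the properties we know how to translate; ignore everything else.
    kwargs: Dict[str, object] = {
        target: properties[source]
        for source, target in _S3_PROPERTY_MAP.items()
        if source in properties
    }
    endpoint = kwargs.get("endpoint")
    # Local S3-compatible stores (e.g. MinIO) are often served over plain HTTP.
    kwargs["allow_http"] = isinstance(endpoint, str) and endpoint.startswith("http://")
    return parsed.netloc, kwargs
```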

Checklist

Foundation (no blockers)

  • Engine resolution module
  • Bounded-session helpers (configures RuntimeEnvBuilder.with_fair_spill_pool())
  • Object store bridge (translate FileIO properties to DataFusion)
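A minimal sketch of the bounded-session helper, assuming a datafusion-python release that exposes `RuntimeEnvBuilder` as referenced in the checklist. A fair spill pool caps memory at `memory_limit_bytes` and divides it among spillable operators; the disk manager decides where spill files go. `create_bounded_session` and its `spill_dir` parameter are hypothetical, and the target-partition setting is an illustrative choice rather than a tuned default.

```python
import os
from typing import Optional


def create_bounded_session(memory_limit_bytes: int, spill_dir: Optional[str] = None):
    """Return a DataFusion SessionContext whose operators share a fixed memory budget.

    Spillable operators (sort, join, aggregate) that exceed their share of the
    pool write intermediate data to disk instead of growing without bound.
    """
    # Lazy import: importing pyiceberg must never require datafusion.
    from datafusion import RuntimeEnvBuilder, SessionConfig, SessionContext

    runtime = RuntimeEnvBuilder().with_fair_spill_pool(memory_limit_bytes)
    if spill_dir is not None:
        # Spill files go to a caller-chosen directory.
        runtime = runtime.with_disk_manager_specified(spill_dir)
    else:
        # Spill files go to the OS temp directory.
        runtime = runtime.with_disk_manager_os()

    config = SessionConfig().with_target_partitions(os.cpu_count() or 1)
    return SessionContext(config, runtime)
```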

Operations Unblocked

  • Equality delete read resolution
  • Streaming CoW delete/overwrite
  • Table compaction (sort + rewrite)
  • Orphan file deletion
  • Upsert via hash join
  • Equality-to-positional conversion
  • Position delete compaction
  • Full MoR compaction
  • Z-Order / Hilbert sorting
  • DV compaction
  • Incremental compaction
  • Sort-order enforcement on write
  • Dynamic partition overwrite (bounded memory)
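Several of these operations reduce to set differences over very large inputs. Orphan file deletion, for instance, is a LEFT ANTI JOIN of listed storage paths against the paths referenced by metadata. Once both sides are sorted (a spilling sort keeps that step bounded), the anti-join itself can stream in constant memory, as in this pure-Python sketch. `sorted_anti_join` is a hypothetical name; a real implementation would likely also need to normalize paths and skip recently written files, which is omitted here.

```python
from typing import Iterable, Iterator


def sorted_anti_join(listed: Iterable[str], referenced: Iterable[str]) -> Iterator[str]:
    """Yield paths in `listed` that are absent from `referenced`.

    Both inputs must be sorted ascending (duplicates allowed). Only the current
    element of each side is held at a time, so memory use does not grow with
    the number of paths compared.
    """
    ref_iter = iter(referenced)
    sentinel = object()
    ref = next(ref_iter, sentinel)
    for path in listed:
        # Advance the referenced side until it catches up with `path`.
        while ref is not sentinel and ref < path:
            ref = next(ref_iter, sentinel)
        if ref is sentinel or ref != path:
            yield path  # not referenced by any metadata: orphan candidate
```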

Related Issues

PyIceberg

Our contributions to iceberg-rust (all closed due to the direction pivot)

datafusion-python
