Add support for Bodo DataFrame #2167

Merged 12 commits on Jul 15, 2025
46 changes: 46 additions & 0 deletions mkdocs/docs/api.md
@@ -1523,6 +1523,52 @@ print(ray_dataset.take(2))
]
```

### Bodo

PyIceberg interfaces closely with Bodo DataFrames (see [Bodo Iceberg Quick Start](https://docs.bodo.ai/latest/quick_start/quickstart_local_iceberg/)),
a drop-in replacement for Pandas that applies query, compiler, and HPC optimizations automatically.
Bodo accelerates and scales Python code from a single laptop to large clusters without code rewrites.

<!-- prettier-ignore-start -->

!!! note "Requirements"
This requires [`bodo` to be installed](index.md).

```python
pip install pyiceberg['bodo']
```
<!-- prettier-ignore-end -->

A table can easily be read into a Bodo DataFrame to perform Pandas operations:

```python
df = table.to_bodo() # equivalent to `bodo.pandas.read_iceberg_table(table)`
df = df[df["trip_distance"] >= 10.0]
df = df[["VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"]]
print(df)
```

This builds a lazy query plan, optimizes it, and runs it on all available cores (the `print` call triggers execution):

```python
VendorID tpep_pickup_datetime tpep_dropoff_datetime
0 2 2023-01-01 00:27:12 2023-01-01 00:49:56
1 2 2023-01-01 00:09:29 2023-01-01 00:29:23
2 1 2023-01-01 00:13:30 2023-01-01 00:44:00
3 2 2023-01-01 00:41:41 2023-01-01 01:19:32
4 2 2023-01-01 00:22:39 2023-01-01 01:30:45
... ... ... ...
245478 2 2023-01-31 22:32:57 2023-01-31 23:01:48
245479 2 2023-01-31 22:03:26 2023-01-31 22:46:13
245480 2 2023-01-31 23:25:56 2023-02-01 00:05:42
245481 2 2023-01-31 23:18:00 2023-01-31 23:46:00
245482 2 2023-01-31 23:18:00 2023-01-31 23:41:00

[245483 rows x 3 columns]
```

Bodo is optimized to take advantage of Iceberg features such as hidden partitioning and column statistics for efficient reads.
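
For example, filters expressed on the Bodo DataFrame become part of the lazy plan, so the scan can skip work instead of reading the whole table. A minimal sketch, assuming the table from the example above is partitioned on the pickup timestamp (illustrative only, not part of this PR):

```python
import pandas as pd

df = table.to_bodo()

# The comparison below stays in the lazy plan; per the note above, Bodo can
# use Iceberg hidden partitioning and file statistics to prune the scan
# before any data is materialized.
january = df[df["tpep_pickup_datetime"] < pd.Timestamp("2023-02-01")]
print(len(january))  # triggers execution of the optimized plan
```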

### Daft

PyIceberg interfaces closely with Daft DataFrames (see also: [Daft integration with Iceberg](https://docs.daft.ai/en/stable/io/iceberg/)), which provide a fully lazy, optimized query engine interface on top of PyIceberg tables.
1 change: 1 addition & 0 deletions mkdocs/docs/index.md
@@ -52,6 +52,7 @@ You can mix and match optional dependencies depending on your needs:
| pandas | Installs both PyArrow and Pandas |
| duckdb | Installs both PyArrow and DuckDB |
| ray | Installs PyArrow, Pandas, and Ray |
| bodo | Installs Bodo |
| daft | Installs Daft |
| polars | Installs Polars |
| s3fs | S3FS as a FileIO implementation to interact with the object store |
1,340 changes: 779 additions & 561 deletions poetry.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions pyiceberg/table/__init__.py
@@ -137,6 +137,7 @@
from pyiceberg.utils.properties import property_as_bool

if TYPE_CHECKING:
import bodo.pandas as bd
import daft
import pandas as pd
import polars as pl
@@ -1485,6 +1486,16 @@ def to_daft(self) -> daft.DataFrame:

return daft.read_iceberg(self)

def to_bodo(self) -> bd.DataFrame:
"""Read a bodo DataFrame lazily from this Iceberg table.

Returns:
bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
"""
import bodo.pandas as bd

return bd.read_iceberg_table(self)

def to_polars(self) -> pl.LazyFrame:
"""Lazily read from this Apache Iceberg table.

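For reference, a minimal usage sketch of the new `to_bodo()` method; the catalog and table identifiers below are hypothetical and not part of this PR:

```python
from pyiceberg.catalog import load_catalog

# Hypothetical catalog and table names, for illustration only.
catalog = load_catalog("default")
table = catalog.load_table("nyc.taxis")

# Returns an unmaterialized Bodo DataFrame backed by the Iceberg table.
df = table.to_bodo()

# Pandas-style operations extend the lazy plan; execution happens when the
# result is actually needed (e.g. printing or len()).
print(len(df[df["trip_distance"] >= 10.0]))
```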
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -78,6 +78,7 @@ gcsfs = { version = ">=2023.1.0", optional = true }
huggingface-hub = { version = ">=0.24.0", optional = true }
psycopg2-binary = { version = ">=2.9.6", optional = true }
sqlalchemy = { version = "^2.0.18", optional = true }
bodo = { version = ">=2025.7.4", optional = true }
daft = { version = ">=0.5.0", optional = true }
cachetools = ">=5.5,<7.0"
pyiceberg-core = { version = "^0.5.1", optional = true }
@@ -298,6 +299,7 @@ pyarrow = ["pyarrow", "pyiceberg-core"]
pandas = ["pandas", "pyarrow"]
duckdb = ["duckdb", "pyarrow"]
ray = ["ray", "pyarrow", "pandas"]
bodo = ["bodo"]
daft = ["daft"]
polars = ["polars"]
snappy = ["python-snappy"]
@@ -485,6 +487,10 @@ ignore_missing_imports = true
module = "daft.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "bodo.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "pyparsing.*"
ignore_missing_imports = true
10 changes: 10 additions & 0 deletions tests/integration/test_reads.py
@@ -339,6 +339,16 @@ def test_daft_nan_rewritten(catalog: Catalog) -> None:
assert math.isnan(df.to_pydict()["col_numeric"][0])


@pytest.mark.integration
@pytest.mark.filterwarnings("ignore")
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
def test_bodo_nan(catalog: Catalog) -> None:
table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten")
df = table_test_null_nan_rewritten.to_bodo()
assert len(df) == 3
assert math.isnan(df.col_numeric.iloc[0])


@pytest.mark.integration
@pytest.mark.filterwarnings("ignore")
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
12 changes: 8 additions & 4 deletions tests/integration/test_writes/test_partitioned_writes.py
@@ -546,27 +546,31 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro
"total-data-files": "6",
"total-records": "6",
}
assert "removed-files-size" in summaries[5]
assert "total-files-size" in summaries[5]
assert summaries[5] == {
"removed-files-size": "16174",
"removed-files-size": summaries[5]["removed-files-size"],
"changed-partition-count": "2",
"total-equality-deletes": "0",
"deleted-data-files": "4",
"total-position-deletes": "0",
"total-delete-files": "0",
"deleted-records": "4",
"total-files-size": "8884",
"total-files-size": summaries[5]["total-files-size"],
"total-data-files": "2",
"total-records": "2",
}
assert "added-files-size" in summaries[6]
assert "total-files-size" in summaries[6]
assert summaries[6] == {
"changed-partition-count": "2",
"added-data-files": "2",
"total-equality-deletes": "0",
"added-records": "2",
"total-position-deletes": "0",
"added-files-size": "8087",
"added-files-size": summaries[6]["added-files-size"],
"total-delete-files": "0",
"total-files-size": "16971",
"total-files-size": summaries[6]["total-files-size"],
"total-data-files": "4",
"total-records": "4",
}
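The pattern used in these summary assertions (here and in the test below) is worth noting: file sizes vary across environments, so the test first asserts that the size keys are present and then echoes the observed values back into the expected dictionary, which pins every other field to an exact value while leaving the sizes unconstrained. A minimal sketch of the same idea, with a hypothetical `summary` dict:

```python
summary = {"added-data-files": "3", "added-files-size": "2618"}  # hypothetical example

# Size fields are environment-dependent, so only check that they exist...
assert "added-files-size" in summary

# ...and reuse the observed value so the equality check constrains the
# remaining keys exactly without hard-coding a byte count.
assert summary == {
    "added-data-files": "3",
    "added-files-size": summary["added-files-size"],
}
```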
15 changes: 10 additions & 5 deletions tests/integration/test_writes/test_writes.py
@@ -309,15 +309,17 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal
assert file_size > 0

# APPEND
assert "added-files-size" in summaries[0]
assert "total-files-size" in summaries[0]
assert summaries[0] == {
"added-data-files": "3",
"added-files-size": "2618",
"added-files-size": summaries[0]["added-files-size"],
"added-records": "5",
"changed-partition-count": "3",
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2618",
"total-files-size": summaries[0]["total-files-size"],
"total-position-deletes": "0",
"total-records": "5",
}
@@ -344,18 +346,21 @@ def test_summaries_partial_overwrite(spark: SparkSession, session_catalog: Catal
# }
files = tbl.inspect.data_files()
assert len(files) == 3
assert "added-files-size" in summaries[1]
assert "removed-files-size" in summaries[1]
assert "total-files-size" in summaries[1]
assert summaries[1] == {
"added-data-files": "1",
"added-files-size": "875",
"added-files-size": summaries[1]["added-files-size"],
"added-records": "2",
"changed-partition-count": "1",
"deleted-data-files": "1",
"deleted-records": "3",
"removed-files-size": "882",
"removed-files-size": summaries[1]["removed-files-size"],
"total-data-files": "3",
"total-delete-files": "0",
"total-equality-deletes": "0",
"total-files-size": "2611",
"total-files-size": summaries[1]["total-files-size"],
"total-position-deletes": "0",
"total-records": "4",
}