101 changes: 53 additions & 48 deletions poetry.lock

Large diffs are not rendered by default.

57 changes: 43 additions & 14 deletions pyiceberg/manifest.py
@@ -853,18 +853,47 @@ def fetch_manifest_entry(self, io: FileIO, discard_deleted: bool = True) -> List
        Returns:
            An Iterator of manifest entries.
        """
-        input_file = io.new_input(self.manifest_path)
-        with AvroFile[ManifestEntry](
-            input_file,
-            MANIFEST_ENTRY_SCHEMAS[DEFAULT_READ_VERSION],
-            read_types={-1: ManifestEntry, 2: DataFile},
-            read_enums={0: ManifestEntryStatus, 101: FileFormat, 134: DataFileContent},
-        ) as reader:
-            return [
-                _inherit_from_manifest(entry, self)
-                for entry in reader
-                if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
-            ]
+        from pyiceberg_core import manifest
+
+        bs = io.new_input(self.manifest_path).open().read()
+        manifest = manifest.read_manifest_entries(bs)
+
+        # TODO: Avoid converting the types here. This is the easiest
+        # approach for now, until the write path is available in
+        # pyiceberg_core as well.
+        def _convert_entry(entry: Any) -> ManifestEntry:
+            data_file = DataFile(
+                DataFileContent(entry.data_file.content),
+                entry.data_file.file_path,
+                FileFormat(entry.data_file.file_format),
+                [p.value() if p is not None else None for p in entry.data_file.partition],
+                entry.data_file.record_count,
+                entry.data_file.file_size_in_bytes,
+                entry.data_file.column_sizes,
+                entry.data_file.value_counts,
+                entry.data_file.null_value_counts,
+                entry.data_file.nan_value_counts,
+                entry.data_file.lower_bounds,
+                entry.data_file.upper_bounds,
+                entry.data_file.key_metadata,
+                entry.data_file.split_offsets,
+                entry.data_file.equality_ids,
+                entry.data_file.sort_order_id,
+            )
+
+            return ManifestEntry(
+                ManifestEntryStatus(entry.status),
+                entry.snapshot_id,
+                entry.sequence_number,
+                entry.file_sequence_number,
+                data_file,
+            )
+
+        return [
+            _inherit_from_manifest(_convert_entry(entry), self)
+            for entry in manifest.entries()
+            if not discard_deleted or entry.status != ManifestEntryStatus.DELETED
+        ]

    def __eq__(self, other: Any) -> bool:
        """Return the equality of two instances of the ManifestFile class."""
@@ -925,12 +954,12 @@ def _inherit_from_manifest(entry: ManifestEntry, manifest: ManifestFile) -> Mani

    # in v1 tables, the sequence number is not persisted and can be safely defaulted to 0
    # in v2 tables, the sequence number should be inherited iff the entry status is ADDED
-    if entry.sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
+    if entry.sequence_number is None or (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
        entry.sequence_number = manifest.sequence_number

    # in v1 tables, the file sequence number is not persisted and can be safely defaulted to 0
    # in v2 tables, the file sequence number should be inherited iff the entry status is ADDED
-    if entry.file_sequence_number is None and (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
+    if entry.file_sequence_number is None or (manifest.sequence_number == 0 or entry.status == ManifestEntryStatus.ADDED):
        # Only available in V2, always 0 in V1
        entry.file_sequence_number = manifest.sequence_number

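
Note that both conditions above are widened from "and" to "or": previously an entry's existing sequence number was never overwritten, whereas now inheritance also fires when the manifest's sequence number is 0 or the entry status is ADDED. A standalone sketch of the two predicates (plain values instead of the real classes, purely illustrative):

# Old predicate: inherit only when the entry has no sequence number yet.
def inherits_old(seq, manifest_seq, added):
    return seq is None and (manifest_seq == 0 or added)

# New predicate: also overwrite an existing sequence number when the
# manifest sequence number is 0 (v1) or the entry was ADDED.
def inherits_new(seq, manifest_seq, added):
    return seq is None or (manifest_seq == 0 or added)

assert not inherits_old(5, 10, True)  # existing value was kept before
assert inherits_new(5, 10, True)      # existing value is now replaced
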
8 changes: 7 additions & 1 deletion pyproject.toml
@@ -84,7 +84,7 @@ sqlalchemy = { version = "^2.0.18", optional = true }
bodo = { version = ">=2025.7.4", optional = true }
daft = { version = ">=0.5.0", optional = true }
cachetools = ">=5.5,<7.0"
-pyiceberg-core = { version = ">=0.5.1,<0.7.0", optional = true }
+pyiceberg-core = { version = "==0.7.0.dev20250921000225", optional = true }
polars = { version = "^1.21.0", optional = true }
thrift-sasl = { version = ">=0.4.3", optional = true }
kerberos = {version = "^1.3.1", optional = true}
@@ -122,6 +122,12 @@ mkdocs-material = "9.6.20"
mkdocs-material-extensions = "1.3.1"
mkdocs-section-index = "0.3.10"


+[[tool.poetry.source]]
+name = "testpypi"
+url = "https://test.pypi.org/simple/"
+priority = "supplemental"

[[tool.mypy.overrides]]
module = "pytest_mock.*"
ignore_missing_imports = true
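
The supplemental Test PyPI source above is presumably what lets Poetry resolve the pinned dev build of pyiceberg-core, which is published to test.pypi.org, while every other dependency continues to resolve from the primary PyPI index.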
2 changes: 1 addition & 1 deletion tests/avro/test_reader.py
@@ -112,7 +112,7 @@ def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_
{
"field-id": 1001,
"default": None,
"name": "tpep_pickup_datetime",
"name": "tpep_pickup_day",
"type": ["null", {"type": "int", "logicalType": "date"}],
},
],
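
The fixture field rename from tpep_pickup_datetime to tpep_pickup_day here (and in tests/conftest.py below) presumably reflects the column's "date" logical type, which stores a day rather than a timestamp.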
32 changes: 25 additions & 7 deletions tests/conftest.py
@@ -1178,7 +1178,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
"data_file": {
"file_path": "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet",
"file_format": "PARQUET",
"partition": {"VendorID": 1, "tpep_pickup_datetime": 1925},
"partition": {"VendorID": 1, "tpep_pickup_day": 1925},
"record_count": 19513,
"file_size_in_bytes": 388872,
"block_size_in_bytes": 67108864,
@@ -1298,7 +1298,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
"data_file": {
"file_path": "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=1/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00002.parquet",
"file_format": "PARQUET",
"partition": {"VendorID": 1, "tpep_pickup_datetime": None},
"partition": {"VendorID": 1, "tpep_pickup_day": None},
"record_count": 95050,
"file_size_in_bytes": 1265950,
"block_size_in_bytes": 67108864,
@@ -1383,7 +1383,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
{"key": 3, "value": b"\x01\x00\x00\x00\x00\x00\x00\x00"},
{"key": 4, "value": b"\x00\x00\x00\x00"},
{"key": 5, "value": b"\x01\x00\x00\x00"},
{"key": 6, "value": b"N"},
{"key": 6, "value": b"\x01\x00\x00\x00"},
{"key": 7, "value": b"\x01\x00\x00\x00"},
{"key": 8, "value": b"\x01\x00\x00\x00"},
{"key": 9, "value": b"\x01\x00\x00\x00"},
@@ -1403,7 +1403,7 @@ def metadata_location_gz(tmp_path_factory: pytest.TempPathFactory) -> str:
{"key": 3, "value": b"\x06\x00\x00\x00\x00\x00\x00\x00"},
{"key": 4, "value": b"\x06\x00\x00\x00"},
{"key": 5, "value": b"c\x00\x00\x00"},
{"key": 6, "value": b"Y"},
{"key": 6, "value": b"c\x00\x00\x00"},
{"key": 7, "value": b"\t\x01\x00\x00"},
{"key": 8, "value": b"\t\x01\x00\x00"},
{"key": 9, "value": b"\x04\x00\x00\x00"},
@@ -1677,7 +1677,7 @@ def avro_schema_manifest_entry() -> Dict[str, Any]:
{
"field-id": 1001,
"default": None,
"name": "tpep_pickup_datetime",
"name": "tpep_pickup_day",
"type": ["null", {"type": "int", "logicalType": "date"}],
},
],
@@ -1863,7 +1863,25 @@ def simple_map() -> MapType:
@pytest.fixture(scope="session")
def test_schema() -> Schema:
return Schema(
NestedField(1, "VendorID", IntegerType(), False), NestedField(2, "tpep_pickup_datetime", TimestampType(), False)
NestedField(1, "VendorID", IntegerType(), False),
NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
NestedField(3, "tpep_dropoff_datetime", TimestampType(), False),
NestedField(4, "passenger_count", LongType(), False),
NestedField(5, "trip_distance", DoubleType(), False),
NestedField(6, "RatecodeID", DoubleType(), False),
NestedField(7, "store_and_fwd_flag", StringType(), False),
NestedField(8, "PULocationID", IntegerType(), False),
NestedField(9, "DOLocationID", IntegerType(), False),
NestedField(10, "payment_type", LongType(), False),
NestedField(11, "fare_amount", DoubleType(), False),
NestedField(12, "extra", DoubleType(), False),
NestedField(13, "mta_tax", DoubleType(), False),
NestedField(14, "tip_amount", DoubleType(), False),
NestedField(15, "tolls_amount", DoubleType(), False),
NestedField(16, "improvement_surcharge", DoubleType(), False),
NestedField(17, "total_amount", DoubleType(), False),
NestedField(18, "congestion_surcharge", DoubleType(), False),
NestedField(19, "Airport_fee", DoubleType(), False),
)


@@ -1969,7 +1987,7 @@ def iceberg_manifest_entry_schema() -> Schema:
),
NestedField(
field_id=1001,
name="tpep_pickup_datetime",
name="tpep_pickup_day",
field_type=DateType(),
required=False,
),
10 changes: 5 additions & 5 deletions tests/utils/test_manifest.py
@@ -41,7 +41,7 @@
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table.snapshots import Operation, Snapshot, Summary
-from pyiceberg.typedef import Record, TableVersion
+from pyiceberg.typedef import TableVersion
from pyiceberg.types import IntegerType, NestedField


@@ -85,7 +85,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
-    assert repr(data_file.partition) == "Record[1, 1925]"
+    assert data_file.partition == [1, 1925]
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
@@ -184,7 +184,7 @@ def test_read_manifest_entry(generated_manifest_entry_file: str) -> None:
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
-    assert data_file.equality_ids is None
+    assert data_file.equality_ids == []
assert data_file.sort_order_id == 0


@@ -422,7 +422,7 @@ def test_write_manifest(
== "/home/iceberg/warehouse/nyc/taxis_partitioned/data/VendorID=null/00000-633-d8a4223e-dc97-45a1-86e1-adaba6e8abd7-00001.parquet"
)
assert data_file.file_format == FileFormat.PARQUET
-    assert data_file.partition == Record(1, 1925)
+    assert data_file.partition == [1, 1925]
assert data_file.record_count == 19513
assert data_file.file_size_in_bytes == 388872
assert data_file.column_sizes == {
@@ -521,7 +521,7 @@ def test_write_manifest(
}
assert data_file.key_metadata is None
assert data_file.split_offsets == [4]
-    assert data_file.equality_ids is None
+    assert data_file.equality_ids == []
assert data_file.sort_order_id == 0


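
The updated assertions follow from the conversion introduced in pyiceberg/manifest.py above: _convert_entry builds the partition as a plain list of values rather than a Record, and the Rust reader apparently yields an empty list instead of None for equality_ids.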