
Commit 5f10bbc

Fix add_files with non-identity transforms (#1925)
# Rationale for this change

Found out I broke this myself after doing a `git bisect`:

```
36d383d is the first bad commit
commit 36d383d
Author: Fokko Driesprong <[email protected]>
Date:   Thu Jan 23 07:50:54 2025 +0100

    PyArrow: Avoid buffer-overflow by avoid doing a sort (#1555)

    Second attempt of #1539. This was already being discussed back here:
    #208 (comment). This PR changes from doing a sort, and then a single
    pass over the table, to the approach where we determine the unique
    partition tuples and filter on them individually. Fixes #1491, because
    the sort caused buffers to be joined where it would overflow in Arrow.
    I think this is an issue on the Arrow side, and it should automatically
    break up into smaller buffers. The `combine_chunks` method does this
    correctly.

    Now:

        Run 0 took: 0.42877754200890195
        Run 1 took: 0.2507691659993725
        Run 2 took: 0.24833179199777078
        Run 3 took: 0.24401691700040828
        Run 4 took: 0.2419595829996979
        Average runtime of 0.28 seconds

    Before:

        Run 0 took: 1.0768639159941813
        Run 1 took: 0.8784021250030492
        Run 2 took: 0.8486490420036716
        Run 3 took: 0.8614017910003895
        Run 4 took: 0.8497851670108503
        Average runtime of 0.9 seconds

    So it comes with a nice speedup as well :)

    Co-authored-by: Kevin Liu <[email protected]>

 pyiceberg/io/pyarrow.py                    |  129 ++-
 pyiceberg/partitioning.py                  |   39 +-
 pyiceberg/table/__init__.py                |    6 +-
 pyproject.toml                             |    1 +
 tests/benchmark/test_benchmark.py          |   72 ++
 tests/integration/test_partitioning_key.py | 1299 ++++++++++++++--------------
 tests/table/test_locations.py              |    2 +-
 7 files changed, 805 insertions(+), 743 deletions(-)
 create mode 100644 tests/benchmark/test_benchmark.py
```

Closes #1917

# Are these changes tested?

# Are there any user-facing changes?
1 parent eb8756a commit 5f10bbc
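
To make the failure mode concrete: before this fix, `_partition_value` compared the column min/max from the Parquet footer *before* applying the partition transform, so two distinct timestamps that fall in the same hourly partition were rejected as "more than one partition value". A minimal sketch of the issue (not part of the commit), using the two timestamps from the new integration test:

```python
from pyiceberg.transforms import HourTransform
from pyiceberg.types import TimestampType

# Transform.transform() returns a callable mapping the raw value
# (microseconds since epoch for timestamps) to hours since epoch.
to_hours = HourTransform().transform(TimestampType())

t_min = 1743465600155254  # 2025-04-01T00:00:00.155254
t_max = 1743469198047855  # 2025-04-01T00:59:58.047855

# The raw min/max differ, which the old code treated as "more than one
# partition value" -- but both map to the same hourly partition.
assert t_min != t_max
assert to_hours(t_min) == to_hours(t_max) == 484296
```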

File tree

2 files changed: +49 −14 lines

pyiceberg/io/pyarrow.py

Lines changed: 19 additions & 12 deletions
```diff
@@ -2241,29 +2241,36 @@ def _partition_value(self, partition_field: PartitionField, schema: Schema) -> Any:
         if partition_field.source_id not in self.column_aggregates:
             return None
 
-        if not partition_field.transform.preserves_order:
+        source_field = schema.find_field(partition_field.source_id)
+        iceberg_transform = partition_field.transform
+
+        if not iceberg_transform.preserves_order:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata for a non-linear Partition Field: {partition_field.name} with transform {partition_field.transform}"
             )
 
-        lower_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_min,
-            schema=schema,
+        transform_func = iceberg_transform.transform(source_field.field_type)
+
+        lower_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_min,
+                schema=schema,
+            )
         )
-        upper_value = partition_record_value(
-            partition_field=partition_field,
-            value=self.column_aggregates[partition_field.source_id].current_max,
-            schema=schema,
+        upper_value = transform_func(
+            partition_record_value(
+                partition_field=partition_field,
+                value=self.column_aggregates[partition_field.source_id].current_max,
+                schema=schema,
+            )
         )
         if lower_value != upper_value:
             raise ValueError(
                 f"Cannot infer partition value from parquet metadata as there are more than one partition values for Partition Field: {partition_field.name}. {lower_value=}, {upper_value=}"
             )
 
-        source_field = schema.find_field(partition_field.source_id)
-        transform = partition_field.transform.transform(source_field.field_type)
-        return transform(lower_value)
+        return lower_value
 
     def partition(self, partition_spec: PartitionSpec, schema: Schema) -> Record:
         return Record(**{field.name: self._partition_value(field, schema) for field in partition_spec.fields})
```
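
For context on the `preserves_order` guard above: inferring a partition value from footer statistics is only sound for order-preserving transforms, because only then does transform(min) == transform(max) imply that every row in between maps to the same partition. A hedged illustration (assuming current pyiceberg transform behavior):

```python
from pyiceberg.transforms import BucketTransform, HourTransform, IdentityTransform

# Order-preserving transforms: if min and max land in the same partition,
# so does every value between them, so one footer check suffices.
assert IdentityTransform().preserves_order
assert HourTransform().preserves_order

# Bucket hashing scrambles ordering: equal bucket values for min and max
# say nothing about the rows in between, hence the ValueError raised above.
assert not BucketTransform(num_buckets=16).preserves_order
```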

tests/integration/test_add_files.py

Lines changed: 30 additions & 2 deletions
```diff
@@ -33,20 +33,21 @@
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import NoSuchTableError
 from pyiceberg.io import FileIO
-from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException
+from pyiceberg.io.pyarrow import UnsupportedPyArrowTypeException, schema_to_pyarrow
 from pyiceberg.manifest import DataFile
 from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec
 from pyiceberg.schema import Schema
 from pyiceberg.table import Table
 from pyiceberg.table.metadata import TableMetadata
-from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform
+from pyiceberg.transforms import BucketTransform, HourTransform, IdentityTransform, MonthTransform
 from pyiceberg.types import (
     BooleanType,
     DateType,
     IntegerType,
     LongType,
     NestedField,
     StringType,
+    TimestampType,
     TimestamptzType,
 )
 
@@ -898,3 +899,30 @@ def test_add_files_that_referenced_by_current_snapshot_with_check_duplicate_file
     with pytest.raises(ValueError) as exc_info:
         tbl.add_files(file_paths=[existing_files_in_table], check_duplicate_files=True)
     assert f"Cannot add files that are already referenced by table, files: {existing_files_in_table}" in str(exc_info.value)
+
+
+@pytest.mark.integration
+def test_add_files_hour_transform(session_catalog: Catalog) -> None:
+    identifier = "default.test_add_files_hour_transform"
+
+    schema = Schema(NestedField(1, "hourly", TimestampType()))
+    schema_arrow = schema_to_pyarrow(schema, include_field_ids=False)
+    spec = PartitionSpec(PartitionField(source_id=1, field_id=1000, transform=HourTransform(), name="spec_hour"))
+
+    tbl = _create_table(session_catalog, identifier, format_version=1, schema=schema, partition_spec=spec)
+
+    file_path = "s3://warehouse/default/test_add_files_hour_transform/test.parquet"
+
+    from pyiceberg.utils.datetime import micros_to_timestamp
+
+    arrow_table = pa.Table.from_pylist(
+        [{"hourly": micros_to_timestamp(1743465600155254)}, {"hourly": micros_to_timestamp(1743469198047855)}],
+        schema=schema_arrow,
+    )
+
+    fo = tbl.io.new_output(file_path)
+    with fo.create(overwrite=True) as fos:
+        with pq.ParquetWriter(fos, schema=schema_arrow) as writer:
+            writer.write_table(arrow_table)
+
+    tbl.add_files(file_paths=[file_path])
```
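
As a hedged sanity check one could append to the test (assuming the `scan` and `inspect.partitions()` APIs of recent pyiceberg releases): both rows fall inside hour 484296, so the add should succeed and yield a single hourly partition.

```python
# Both rows should be readable back through a table scan.
assert tbl.scan().to_arrow().num_rows == 2

# The partitions metadata table should report exactly one hourly partition.
assert tbl.inspect.partitions().num_rows == 1
```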
