Skip to content

Commit 3bdd458

Browse files
authored
Remove initial_change when CreateTableTransaction apply table updates on an empty metadata (#1219)
* make table metadata without validaiton * update deletes test * remove info * add deprecation message * revert lib version updates * remove initial_changes usage in code * move test to integration * fix typo * update error string
1 parent d559e53 commit 3bdd458

File tree

6 files changed

+114
-21
lines changed

6 files changed

+114
-21
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1011,4 +1011,4 @@ def _empty_table_metadata() -> TableMetadata:
10111011
Returns:
10121012
TableMetadata: An empty TableMetadata instance.
10131013
"""
1014-
return TableMetadataV1(location="", last_column_id=-1, schema=Schema())
1014+
return TableMetadataV1.model_construct(last_column_id=-1, schema=Schema())

pyiceberg/table/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -703,22 +703,22 @@ def _initial_changes(self, table_metadata: TableMetadata) -> None:
703703

704704
schema: Schema = table_metadata.schema()
705705
self._updates += (
706-
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id, initial_change=True),
706+
AddSchemaUpdate(schema_=schema, last_column_id=schema.highest_field_id),
707707
SetCurrentSchemaUpdate(schema_id=-1),
708708
)
709709

710710
spec: PartitionSpec = table_metadata.spec()
711711
if spec.is_unpartitioned():
712-
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC, initial_change=True),)
712+
self._updates += (AddPartitionSpecUpdate(spec=UNPARTITIONED_PARTITION_SPEC),)
713713
else:
714-
self._updates += (AddPartitionSpecUpdate(spec=spec, initial_change=True),)
714+
self._updates += (AddPartitionSpecUpdate(spec=spec),)
715715
self._updates += (SetDefaultSpecUpdate(spec_id=-1),)
716716

717717
sort_order: Optional[SortOrder] = table_metadata.sort_order_by_id(table_metadata.default_sort_order_id)
718718
if sort_order is None or sort_order.is_unsorted:
719-
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER, initial_change=True),)
719+
self._updates += (AddSortOrderUpdate(sort_order=UNSORTED_SORT_ORDER),)
720720
else:
721-
self._updates += (AddSortOrderUpdate(sort_order=sort_order, initial_change=True),)
721+
self._updates += (AddSortOrderUpdate(sort_order=sort_order),)
722722
self._updates += (SetDefaultSortOrderUpdate(sort_order_id=-1),)
723723

724724
self._updates += (

pyiceberg/table/metadata.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,5 +587,21 @@ def parse_obj(data: Dict[str, Any]) -> TableMetadata:
587587
else:
588588
raise ValidationError(f"Unknown format version: {format_version}")
589589

590+
@staticmethod
591+
def _construct_without_validation(table_metadata: TableMetadata) -> TableMetadata:
592+
"""Construct table metadata from an existing table without performing validation.
593+
594+
This method is useful during a sequence of table updates when the model needs to be re-constructed but is not yet ready for validation.
595+
"""
596+
if table_metadata.format_version is None:
597+
raise ValidationError(f"Missing format-version in TableMetadata: {table_metadata}")
598+
599+
if table_metadata.format_version == 1:
600+
return TableMetadataV1.model_construct(**dict(table_metadata))
601+
elif table_metadata.format_version == 2:
602+
return TableMetadataV2.model_construct(**dict(table_metadata))
603+
else:
604+
raise ValidationError(f"Unknown format version: {table_metadata.format_version}")
605+
590606

591607
TableMetadata = Annotated[Union[TableMetadataV1, TableMetadataV2], Field(discriminator="format_version")] # type: ignore

pyiceberg/table/update/__init__.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import uuid
2020
from abc import ABC, abstractmethod
21-
from copy import copy
2221
from datetime import datetime
2322
from functools import singledispatch
2423
from typing import TYPE_CHECKING, Any, Dict, Generic, List, Literal, Optional, Tuple, TypeVar, Union
@@ -45,6 +44,7 @@
4544
transform_dict_value_to_str,
4645
)
4746
from pyiceberg.utils.datetime import datetime_to_millis
47+
from pyiceberg.utils.deprecated import deprecation_notice
4848
from pyiceberg.utils.properties import property_as_int
4949

5050
if TYPE_CHECKING:
@@ -90,7 +90,13 @@ class AddSchemaUpdate(IcebergBaseModel):
9090
# This field is required: https://github.com/apache/iceberg/pull/7445
9191
last_column_id: int = Field(alias="last-column-id")
9292

93-
initial_change: bool = Field(default=False, exclude=True)
93+
initial_change: bool = Field(
94+
default=False,
95+
exclude=True,
96+
deprecated=deprecation_notice(
97+
deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field"
98+
),
99+
)
94100

95101

96102
class SetCurrentSchemaUpdate(IcebergBaseModel):
@@ -104,7 +110,13 @@ class AddPartitionSpecUpdate(IcebergBaseModel):
104110
action: Literal["add-spec"] = Field(default="add-spec")
105111
spec: PartitionSpec
106112

107-
initial_change: bool = Field(default=False, exclude=True)
113+
initial_change: bool = Field(
114+
default=False,
115+
exclude=True,
116+
deprecated=deprecation_notice(
117+
deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field"
118+
),
119+
)
108120

109121

110122
class SetDefaultSpecUpdate(IcebergBaseModel):
@@ -118,7 +130,13 @@ class AddSortOrderUpdate(IcebergBaseModel):
118130
action: Literal["add-sort-order"] = Field(default="add-sort-order")
119131
sort_order: SortOrder = Field(alias="sort-order")
120132

121-
initial_change: bool = Field(default=False, exclude=True)
133+
initial_change: bool = Field(
134+
default=False,
135+
exclude=True,
136+
deprecated=deprecation_notice(
137+
deprecated_in="0.8.0", removed_in="0.9.0", help_message="CreateTableTransaction can work without this field"
138+
),
139+
)
122140

123141

124142
class SetDefaultSortOrderUpdate(IcebergBaseModel):
@@ -267,11 +285,10 @@ def _(
267285
elif update.format_version == base_metadata.format_version:
268286
return base_metadata
269287

270-
updated_metadata_data = copy(base_metadata.model_dump())
271-
updated_metadata_data["format-version"] = update.format_version
288+
updated_metadata = base_metadata.model_copy(update={"format_version": update.format_version})
272289

273290
context.add_update(update)
274-
return TableMetadataUtil.parse_obj(updated_metadata_data)
291+
return TableMetadataUtil._construct_without_validation(updated_metadata)
275292

276293

277294
@_apply_table_update.register(SetPropertiesUpdate)
@@ -306,7 +323,7 @@ def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: _TableMeta
306323

307324
metadata_updates: Dict[str, Any] = {
308325
"last_column_id": update.last_column_id,
309-
"schemas": [update.schema_] if update.initial_change else base_metadata.schemas + [update.schema_],
326+
"schemas": base_metadata.schemas + [update.schema_],
310327
}
311328

312329
context.add_update(update)
@@ -336,11 +353,11 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta
336353
@_apply_table_update.register(AddPartitionSpecUpdate)
337354
def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata:
338355
for spec in base_metadata.partition_specs:
339-
if spec.spec_id == update.spec.spec_id and not update.initial_change:
356+
if spec.spec_id == update.spec.spec_id:
340357
raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
341358

342359
metadata_updates: Dict[str, Any] = {
343-
"partition_specs": [update.spec] if update.initial_change else base_metadata.partition_specs + [update.spec],
360+
"partition_specs": base_metadata.partition_specs + [update.spec],
344361
"last_partition_id": max(
345362
max([field.field_id for field in update.spec.fields], default=0),
346363
base_metadata.last_partition_id or PARTITION_FIELD_ID_START - 1,
@@ -448,7 +465,7 @@ def _(update: AddSortOrderUpdate, base_metadata: TableMetadata, context: _TableM
448465
context.add_update(update)
449466
return base_metadata.model_copy(
450467
update={
451-
"sort_orders": [update.sort_order] if update.initial_change else base_metadata.sort_orders + [update.sort_order],
468+
"sort_orders": base_metadata.sort_orders + [update.sort_order],
452469
}
453470
)
454471

pyiceberg/utils/deprecated.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,17 @@ def new_func(*args: Any, **kwargs: Any) -> Any:
4141
return decorator
4242

4343

44+
def deprecation_notice(deprecated_in: str, removed_in: str, help_message: Optional[str]) -> str:
45+
"""Return a deprecation notice."""
46+
return f"Deprecated in {deprecated_in}, will be removed in {removed_in}. {help_message}"
47+
48+
4449
def deprecation_message(deprecated_in: str, removed_in: str, help_message: Optional[str]) -> None:
4550
"""Mark properties or behaviors as deprecated.
4651
4752
Adding this will result in a warning being emitted.
4853
"""
49-
message = f"Deprecated in {deprecated_in}, will be removed in {removed_in}. {help_message}"
50-
51-
_deprecation_warning(message)
54+
_deprecation_warning(deprecation_notice(deprecated_in, removed_in, help_message))
5255

5356

5457
def _deprecation_warning(message: str) -> None:

tests/integration/test_writes/test_writes.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
from pyiceberg.partitioning import PartitionField, PartitionSpec
4646
from pyiceberg.schema import Schema
4747
from pyiceberg.table import TableProperties
48+
from pyiceberg.table.sorting import SortDirection, SortField, SortOrder
4849
from pyiceberg.transforms import DayTransform, HourTransform, IdentityTransform
4950
from pyiceberg.types import (
5051
DateType,
@@ -738,7 +739,7 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None
738739
def test_create_table_transaction(catalog: Catalog, format_version: int) -> None:
739740
if format_version == 1 and isinstance(catalog, RestCatalog):
740741
pytest.skip(
741-
"There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table"
742+
"There is a bug in the REST catalog image (https://github.com/apache/iceberg/issues/8756) that prevents create and commit a staged version 1 table"
742743
)
743744

744745
identifier = f"default.arrow_create_table_transaction_{catalog.name}_{format_version}"
@@ -787,6 +788,62 @@ def test_create_table_transaction(catalog: Catalog, format_version: int) -> None
787788
assert len(tbl.scan().to_arrow()) == 6
788789

789790

791+
@pytest.mark.integration
792+
@pytest.mark.parametrize("format_version", [1, 2])
793+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
794+
def test_create_table_with_non_default_values(catalog: Catalog, table_schema_with_all_types: Schema, format_version: int) -> None:
795+
if format_version == 1 and isinstance(catalog, RestCatalog):
796+
pytest.skip(
797+
"There is a bug in the REST catalog image (https://github.com/apache/iceberg/issues/8756) that prevents create and commit a staged version 1 table"
798+
)
799+
800+
identifier = f"default.arrow_create_table_transaction_with_non_default_values_{catalog.name}_{format_version}"
801+
identifier_ref = f"default.arrow_create_table_transaction_with_non_default_values_ref_{catalog.name}_{format_version}"
802+
803+
try:
804+
catalog.drop_table(identifier=identifier)
805+
except NoSuchTableError:
806+
pass
807+
808+
try:
809+
catalog.drop_table(identifier=identifier_ref)
810+
except NoSuchTableError:
811+
pass
812+
813+
iceberg_spec = PartitionSpec(*[
814+
PartitionField(source_id=2, field_id=1001, transform=IdentityTransform(), name="integer_partition")
815+
])
816+
817+
sort_order = SortOrder(*[SortField(source_id=2, transform=IdentityTransform(), direction=SortDirection.ASC)])
818+
819+
txn = catalog.create_table_transaction(
820+
identifier=identifier,
821+
schema=table_schema_with_all_types,
822+
partition_spec=iceberg_spec,
823+
sort_order=sort_order,
824+
properties={"format-version": format_version},
825+
)
826+
txn.commit_transaction()
827+
828+
tbl = catalog.load_table(identifier)
829+
830+
tbl_ref = catalog.create_table(
831+
identifier=identifier_ref,
832+
schema=table_schema_with_all_types,
833+
partition_spec=iceberg_spec,
834+
sort_order=sort_order,
835+
properties={"format-version": format_version},
836+
)
837+
838+
assert tbl.format_version == tbl_ref.format_version
839+
assert tbl.schema() == tbl_ref.schema()
840+
assert tbl.schemas() == tbl_ref.schemas()
841+
assert tbl.spec() == tbl_ref.spec()
842+
assert tbl.specs() == tbl_ref.specs()
843+
assert tbl.sort_order() == tbl_ref.sort_order()
844+
assert tbl.sort_orders() == tbl_ref.sort_orders()
845+
846+
790847
@pytest.mark.integration
791848
@pytest.mark.parametrize("format_version", [1, 2])
792849
def test_table_properties_int_value(

0 commit comments

Comments
 (0)