Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions mkdocs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,7 @@ Expert Iceberg users may choose to commit existing parquet files to the Iceberg
### Example

Add files to Iceberg table:

```python
# Given that these parquet files have schema consistent with the Iceberg table

Expand All @@ -1047,6 +1048,7 @@ tbl.add_files(file_paths=file_paths)
```

Add files to Iceberg table with custom snapshot properties:

```python
# Assume an existing Iceberg table object `tbl`

Expand Down
148 changes: 147 additions & 1 deletion tests/integration/test_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import (
CommitFailedException,
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
NoSuchNamespaceError,
NoSuchTableError,
TableAlreadyExistsError,
)
from pyiceberg.io import WAREHOUSE
from pyiceberg.schema import Schema
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema
from pyiceberg.table.metadata import INITIAL_SPEC_ID
from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder
from pyiceberg.transforms import DayTransform, IdentityTransform
from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType
from tests.conftest import clean_up


Expand Down Expand Up @@ -218,6 +224,146 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa
assert test_catalog.table_exists((database_name, table_name)) is True


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)

test_catalog.create_namespace(database_name)
table = test_catalog.create_table(identifier, test_schema)
assert test_catalog.table_exists(identifier)

expected_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False),
NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
NestedField(3, "new_col", IntegerType(), False),
)

expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "new_col"))

with table.transaction() as transaction:
with transaction.update_schema() as update_schema:
update_schema.add_column("new_col", IntegerType())

with transaction.update_spec() as update_spec:
update_spec.add_field("new_col", IdentityTransform())

table = test_catalog.load_table(identifier)
assert table.schema().as_struct() == expected_schema.as_struct()
assert table.spec().fields == expected_spec.fields


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
if isinstance(test_catalog, HiveCatalog):
pytest.skip("HiveCatalog fails in this test, need to investigate")

identifier = (database_name, table_name)

test_catalog.create_namespace(database_name)
table = test_catalog.create_table(identifier, test_schema)
assert test_catalog.table_exists(identifier)

original_update = table.update_schema().add_column("new_col", LongType())

# Update schema concurrently so that the original update fails
concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
concurrent_update.commit()

expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))

with pytest.raises(CommitFailedException):
original_update.commit()

table = test_catalog.load_table(identifier)
assert table.schema().as_struct() == expected_schema.as_struct()


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)

test_catalog.create_namespace(database_name)
table_transaction = test_catalog.create_table_transaction(identifier, test_schema)
assert not test_catalog.table_exists(identifier)

table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
assert not test_catalog.table_exists(identifier)

table_transaction.commit_transaction()
assert test_catalog.table_exists(identifier)

table = test_catalog.load_table(identifier)
assert table.schema().find_type("new_col").is_primitive


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_table_transaction_multiple_schemas(
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str
) -> None:
identifier = (database_name, table_name)

test_catalog.create_namespace(database_name)
table_transaction = test_catalog.create_table_transaction(
identifier=identifier,
schema=test_schema,
partition_spec=test_partition_spec,
sort_order=SortOrder(SortField(source_id=1)),
)
assert not test_catalog.table_exists(identifier)

table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
assert not test_catalog.table_exists(identifier)

table_transaction.update_schema().add_column("new_col_1", UUIDType()).commit()
assert not test_catalog.table_exists(identifier)

table_transaction.update_spec().add_field("new_col", IdentityTransform()).commit()
assert not test_catalog.table_exists(identifier)

# TODO: test replace sort order when available

expected_schema = Schema(
NestedField(1, "VendorID", IntegerType(), False),
NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
NestedField(3, "new_col", IntegerType(), False),
NestedField(4, "new_col_1", UUIDType(), False),
)

expected_spec = PartitionSpec(
PartitionField(1, 1000, IdentityTransform(), "VendorID"),
PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
PartitionField(3, 1002, IdentityTransform(), "new_col"),
)

table_transaction.commit_transaction()
assert test_catalog.table_exists(identifier)

table = test_catalog.load_table(identifier)
assert table.schema().as_struct() == expected_schema.as_struct()
assert table.schema().schema_id == INITIAL_SCHEMA_ID + 2
assert table.spec().fields == expected_spec.fields
assert table.spec().spec_id == INITIAL_SPEC_ID + 1
assert table.sort_order().order_id == INITIAL_SORT_ORDER_ID


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
identifier = (database_name, table_name)

test_catalog.create_namespace(database_name)
table = test_catalog.create_table_transaction(identifier=identifier, schema=test_schema)
assert not test_catalog.table_exists(identifier)

test_catalog.create_table(identifier, test_schema)
with pytest.raises(CommitFailedException):
table.commit_transaction()


@pytest.mark.integration
@pytest.mark.parametrize("test_catalog", CATALOGS)
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:
Expand Down