Skip to content

Commit 6020f24

Browse files
authored
Add transaction tests to catalog integration tests (#2371)
Just adding more tests to the catalog tests.
1 parent 517b930 commit 6020f24

File tree

1 file changed

+147
-1
lines changed

1 file changed

+147
-1
lines changed

tests/integration/test_catalog.py

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,20 @@
2727
from pyiceberg.catalog.rest import RestCatalog
2828
from pyiceberg.catalog.sql import SqlCatalog
2929
from pyiceberg.exceptions import (
30+
CommitFailedException,
3031
NamespaceAlreadyExistsError,
3132
NamespaceNotEmptyError,
3233
NoSuchNamespaceError,
3334
NoSuchTableError,
3435
TableAlreadyExistsError,
3536
)
3637
from pyiceberg.io import WAREHOUSE
37-
from pyiceberg.schema import Schema
38+
from pyiceberg.partitioning import PartitionField, PartitionSpec
39+
from pyiceberg.schema import INITIAL_SCHEMA_ID, Schema
40+
from pyiceberg.table.metadata import INITIAL_SPEC_ID
41+
from pyiceberg.table.sorting import INITIAL_SORT_ORDER_ID, SortField, SortOrder
42+
from pyiceberg.transforms import DayTransform, IdentityTransform
43+
from pyiceberg.types import IntegerType, LongType, NestedField, TimestampType, UUIDType
3844
from tests.conftest import clean_up
3945

4046

@@ -259,6 +265,146 @@ def test_table_exists(test_catalog: Catalog, table_schema_nested: Schema, databa
259265
assert test_catalog.table_exists((database_name, table_name)) is True
260266

261267

268+
@pytest.mark.integration
269+
@pytest.mark.parametrize("test_catalog", CATALOGS)
270+
def test_update_table_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
271+
identifier = (database_name, table_name)
272+
273+
test_catalog.create_namespace(database_name)
274+
table = test_catalog.create_table(identifier, test_schema)
275+
assert test_catalog.table_exists(identifier)
276+
277+
expected_schema = Schema(
278+
NestedField(1, "VendorID", IntegerType(), False),
279+
NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
280+
NestedField(3, "new_col", IntegerType(), False),
281+
)
282+
283+
expected_spec = PartitionSpec(PartitionField(3, 1000, IdentityTransform(), "new_col"))
284+
285+
with table.transaction() as transaction:
286+
with transaction.update_schema() as update_schema:
287+
update_schema.add_column("new_col", IntegerType())
288+
289+
with transaction.update_spec() as update_spec:
290+
update_spec.add_field("new_col", IdentityTransform())
291+
292+
table = test_catalog.load_table(identifier)
293+
assert table.schema().as_struct() == expected_schema.as_struct()
294+
assert table.spec().fields == expected_spec.fields
295+
296+
297+
@pytest.mark.integration
298+
@pytest.mark.parametrize("test_catalog", CATALOGS)
299+
def test_update_schema_conflict(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
300+
if isinstance(test_catalog, HiveCatalog):
301+
pytest.skip("HiveCatalog fails in this test, need to investigate")
302+
303+
identifier = (database_name, table_name)
304+
305+
test_catalog.create_namespace(database_name)
306+
table = test_catalog.create_table(identifier, test_schema)
307+
assert test_catalog.table_exists(identifier)
308+
309+
original_update = table.update_schema().add_column("new_col", LongType())
310+
311+
# Update schema concurrently so that the original update fails
312+
concurrent_update = test_catalog.load_table(identifier).update_schema().delete_column("VendorID")
313+
concurrent_update.commit()
314+
315+
expected_schema = Schema(NestedField(2, "tpep_pickup_datetime", TimestampType(), False))
316+
317+
with pytest.raises(CommitFailedException):
318+
original_update.commit()
319+
320+
table = test_catalog.load_table(identifier)
321+
assert table.schema().as_struct() == expected_schema.as_struct()
322+
323+
324+
@pytest.mark.integration
325+
@pytest.mark.parametrize("test_catalog", CATALOGS)
326+
def test_create_table_transaction_simple(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
327+
identifier = (database_name, table_name)
328+
329+
test_catalog.create_namespace(database_name)
330+
table_transaction = test_catalog.create_table_transaction(identifier, test_schema)
331+
assert not test_catalog.table_exists(identifier)
332+
333+
table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
334+
assert not test_catalog.table_exists(identifier)
335+
336+
table_transaction.commit_transaction()
337+
assert test_catalog.table_exists(identifier)
338+
339+
table = test_catalog.load_table(identifier)
340+
assert table.schema().find_type("new_col").is_primitive
341+
342+
343+
@pytest.mark.integration
344+
@pytest.mark.parametrize("test_catalog", CATALOGS)
345+
def test_create_table_transaction_multiple_schemas(
346+
test_catalog: Catalog, test_schema: Schema, test_partition_spec: PartitionSpec, table_name: str, database_name: str
347+
) -> None:
348+
identifier = (database_name, table_name)
349+
350+
test_catalog.create_namespace(database_name)
351+
table_transaction = test_catalog.create_table_transaction(
352+
identifier=identifier,
353+
schema=test_schema,
354+
partition_spec=test_partition_spec,
355+
sort_order=SortOrder(SortField(source_id=1)),
356+
)
357+
assert not test_catalog.table_exists(identifier)
358+
359+
table_transaction.update_schema().add_column("new_col", IntegerType()).commit()
360+
assert not test_catalog.table_exists(identifier)
361+
362+
table_transaction.update_schema().add_column("new_col_1", UUIDType()).commit()
363+
assert not test_catalog.table_exists(identifier)
364+
365+
table_transaction.update_spec().add_field("new_col", IdentityTransform()).commit()
366+
assert not test_catalog.table_exists(identifier)
367+
368+
# TODO: test replace sort order when available
369+
370+
expected_schema = Schema(
371+
NestedField(1, "VendorID", IntegerType(), False),
372+
NestedField(2, "tpep_pickup_datetime", TimestampType(), False),
373+
NestedField(3, "new_col", IntegerType(), False),
374+
NestedField(4, "new_col_1", UUIDType(), False),
375+
)
376+
377+
expected_spec = PartitionSpec(
378+
PartitionField(1, 1000, IdentityTransform(), "VendorID"),
379+
PartitionField(2, 1001, DayTransform(), "tpep_pickup_day"),
380+
PartitionField(3, 1002, IdentityTransform(), "new_col"),
381+
)
382+
383+
table_transaction.commit_transaction()
384+
assert test_catalog.table_exists(identifier)
385+
386+
table = test_catalog.load_table(identifier)
387+
assert table.schema().as_struct() == expected_schema.as_struct()
388+
assert table.schema().schema_id == INITIAL_SCHEMA_ID + 2
389+
assert table.spec().fields == expected_spec.fields
390+
assert table.spec().spec_id == INITIAL_SPEC_ID + 1
391+
assert table.sort_order().order_id == INITIAL_SORT_ORDER_ID
392+
393+
394+
@pytest.mark.integration
395+
@pytest.mark.parametrize("test_catalog", CATALOGS)
396+
def test_concurrent_create_transaction(test_catalog: Catalog, test_schema: Schema, table_name: str, database_name: str) -> None:
397+
identifier = (database_name, table_name)
398+
399+
test_catalog.create_namespace(database_name)
400+
table = test_catalog.create_table_transaction(identifier=identifier, schema=test_schema)
401+
assert not test_catalog.table_exists(identifier)
402+
403+
test_catalog.create_table(identifier, test_schema)
404+
with pytest.raises(CommitFailedException):
405+
table.commit_transaction()
406+
407+
262408
@pytest.mark.integration
263409
@pytest.mark.parametrize("test_catalog", CATALOGS)
264410
def test_create_namespace(test_catalog: Catalog, database_name: str) -> None:

0 commit comments

Comments
 (0)