Skip to content

Commit 5018efc

Browse files
authored
Hive Catalog: Add register table (#1580)
Added the register_table operation for Hive metastore. Had use case where there were existing tables in AWS S3, which needed to be registered to Hive metastore. Registering table with PyIceberg was not possible yet in Hive metastore. ## Testing Tested with the attached unit tests and also against remote hive metastore leveraging PyIceberg.
1 parent 1cdc585 commit 5018efc

File tree

2 files changed

+104
-1
lines changed

2 files changed

+104
-1
lines changed

pyiceberg/catalog/hive.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,22 @@ def register_table(self, identifier: Union[str, Identifier], metadata_location:
404404
Raises:
405405
TableAlreadyExistsError: If the table already exists
406406
"""
407-
raise NotImplementedError
407+
database_name, table_name = self.identifier_to_database_and_table(identifier)
408+
io = self._load_file_io(location=metadata_location)
409+
metadata_file = io.new_input(metadata_location)
410+
staged_table = StagedTable(
411+
identifier=(database_name, table_name),
412+
metadata=FromInputFile.table_metadata(metadata_file),
413+
metadata_location=metadata_location,
414+
io=io,
415+
catalog=self,
416+
)
417+
tbl = self._convert_iceberg_into_hive(staged_table)
418+
with self._client as open_client:
419+
self._create_hive_table(open_client, tbl)
420+
hive_table = open_client.get_table(dbname=database_name, tbl_name=table_name)
421+
422+
return self._convert_hive_into_iceberg(hive_table)
408423

409424
def list_views(self, namespace: Union[str, Identifier]) -> List[Identifier]:
410425
raise NotImplementedError
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
import pytest
18+
19+
from pyiceberg.catalog import Catalog
20+
from pyiceberg.exceptions import NoSuchTableError, TableAlreadyExistsError
21+
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
22+
from pyiceberg.schema import Schema
23+
from pyiceberg.table import Table
24+
from pyiceberg.types import (
25+
BooleanType,
26+
DateType,
27+
IntegerType,
28+
NestedField,
29+
StringType,
30+
)
31+
32+
TABLE_SCHEMA = Schema(
33+
NestedField(field_id=1, name="foo", field_type=BooleanType(), required=False),
34+
NestedField(field_id=2, name="bar", field_type=StringType(), required=False),
35+
NestedField(field_id=4, name="baz", field_type=IntegerType(), required=False),
36+
NestedField(field_id=10, name="qux", field_type=DateType(), required=False),
37+
)
38+
39+
40+
def _create_table(
41+
session_catalog: Catalog,
42+
identifier: str,
43+
format_version: int,
44+
location: str,
45+
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
46+
schema: Schema = TABLE_SCHEMA,
47+
) -> Table:
48+
try:
49+
session_catalog.drop_table(identifier=identifier)
50+
except NoSuchTableError:
51+
pass
52+
53+
return session_catalog.create_table(
54+
identifier=identifier,
55+
schema=schema,
56+
location=location,
57+
properties={"format-version": str(format_version)},
58+
partition_spec=partition_spec,
59+
)
60+
61+
62+
@pytest.mark.integration
63+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
64+
def test_register_table(
65+
catalog: Catalog,
66+
) -> None:
67+
identifier = "default.register_table"
68+
location = "s3a://warehouse/default/register_table"
69+
tbl = _create_table(catalog, identifier, 2, location)
70+
assert catalog.table_exists(identifier=identifier)
71+
catalog.drop_table(identifier=identifier)
72+
assert not catalog.table_exists(identifier=identifier)
73+
catalog.register_table(("default", "register_table"), metadata_location=tbl.metadata_location)
74+
assert catalog.table_exists(identifier=identifier)
75+
76+
77+
@pytest.mark.integration
78+
@pytest.mark.parametrize("catalog", [pytest.lazy_fixture("session_catalog_hive"), pytest.lazy_fixture("session_catalog")])
79+
def test_register_table_existing(
80+
catalog: Catalog,
81+
) -> None:
82+
identifier = "default.register_table_existing"
83+
location = "s3a://warehouse/default/register_table_existing"
84+
tbl = _create_table(catalog, identifier, 2, location)
85+
assert catalog.table_exists(identifier=identifier)
86+
# Assert that registering the table again raises TableAlreadyExistsError
87+
with pytest.raises(TableAlreadyExistsError):
88+
catalog.register_table(("default", "register_table_existing"), metadata_location=tbl.metadata_location)

0 commit comments

Comments
 (0)