Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion paimon-python/pypaimon/api/rest_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class RESTApi:
PAGE_TOKEN = "pageToken"
DATABASE_NAME_PATTERN = "databaseNamePattern"
TABLE_NAME_PATTERN = "tableNamePattern"
TABLE_TYPE = "tableType"
TOKEN_EXPIRATION_SAFE_TIME_MILLIS = 3_600_000

def __init__(self, options: Union[Options, Dict[str, str]], config_required: bool = True):
Expand Down Expand Up @@ -228,14 +229,20 @@ def list_tables_paged(
max_results: Optional[int] = None,
page_token: Optional[str] = None,
table_name_pattern: Optional[str] = None,
table_type: Optional[str] = None,
) -> PagedList[str]:
if not database_name or not database_name.strip():
raise ValueError("Database name cannot be empty")

response = self.client.get_with_params(
self.resource_paths.tables(database_name),
self.__build_paged_query_params(
max_results, page_token, {self.TABLE_NAME_PATTERN: table_name_pattern}
max_results,
page_token,
{
self.TABLE_NAME_PATTERN: table_name_pattern,
self.TABLE_TYPE: table_type,
},
),
ListTablesResponse,
self.rest_auth_function,
Expand Down
6 changes: 4 additions & 2 deletions paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,16 @@ def list_tables_paged(
database_name: str,
max_results: Optional[int] = None,
page_token: Optional[str] = None,
table_name_pattern: Optional[str] = None
table_name_pattern: Optional[str] = None,
table_type: Optional[str] = None
) -> PagedList[str]:
try:
return self.rest_api.list_tables_paged(
database_name,
max_results,
page_token,
table_name_pattern
table_name_pattern,
table_type
)
except NoSuchResourceException as e:
raise DatabaseNotExistException(database_name) from e
Expand Down
130 changes: 130 additions & 0 deletions paimon-python/pypaimon/tests/rest/api_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,133 @@ def test_rest_api_parameter_validation(self):
with self.assertRaises(ValueError) as context:
rest_api.commit_snapshot(Mock(), "uuid", Mock(), None)
self.assertIn("Statistics cannot be None", str(context.exception))

def test_list_tables_paged_with_table_type_param(self):
config = ConfigResponse(defaults={"prefix": "mock-test"})
token = str(uuid.uuid4())
server = RESTCatalogServer(
data_path="/tmp/test_warehouse",
auth_provider=BearTokenAuthProvider(token),
config=config,
warehouse="test_warehouse"
)
try:
server.start()

server.database_store.update({
"default": server.mock_database("default", {"env": "test"})
})

data_fields = [
DataField(0, "id", AtomicType("INT"), "id"),
]
table_schema = TableSchema(
TableSchema.CURRENT_VERSION,
len(data_fields),
data_fields,
len(data_fields),
[],
[],
{"type": "table"},
"",
)
format_table_schema = TableSchema(
TableSchema.CURRENT_VERSION,
len(data_fields),
data_fields,
len(data_fields),
[],
[],
{"type": "format-table"},
"",
)
iceberg_table_schema = TableSchema(
TableSchema.CURRENT_VERSION,
len(data_fields),
data_fields,
len(data_fields),
[],
[],
{"type": "iceberg-table"},
"",
)
server.table_metadata_store.update({
"default.normal_table_1": TableMetadata(
uuid=str(uuid.uuid4()),
is_external=True,
schema=table_schema
),
"default.format_table_1": TableMetadata(
uuid=str(uuid.uuid4()),
is_external=True,
schema=format_table_schema
),
"default.iceberg_table_1": TableMetadata(
uuid=str(uuid.uuid4()),
is_external=True,
schema=iceberg_table_schema
),
"default.normal_table_2": TableMetadata(
uuid=str(uuid.uuid4()),
is_external=True,
schema=table_schema
),
"default.format_table_2": TableMetadata(
uuid=str(uuid.uuid4()),
is_external=True,
schema=format_table_schema
),
"default.iceberg_table_2": TableMetadata(
uuid=str(uuid.uuid4()),
is_external=True,
schema=iceberg_table_schema
),
})

options = {
'uri': f"http://localhost:{server.port}",
'warehouse': 'test_warehouse',
'dlf.region': 'cn-hangzhou',
"token.provider": "bear",
'token': token
}
rest_api = RESTApi(options)

all_result = rest_api.list_tables_paged("default")
table_result = rest_api.list_tables_paged("default", table_type="table")
format_table_result = rest_api.list_tables_paged("default", table_type="format-table")
iceberg_table_result = rest_api.list_tables_paged("default", table_type="iceberg-table")

self.assertEqual(
[
"format_table_1",
"format_table_2",
"iceberg_table_1",
"iceberg_table_2",
"normal_table_1",
"normal_table_2",
],
all_result.elements,
)
self.assertEqual(["normal_table_1", "normal_table_2"], table_result.elements)
self.assertEqual(["format_table_1", "format_table_2"], format_table_result.elements)
self.assertEqual(["iceberg_table_1", "iceberg_table_2"], iceberg_table_result.elements)

filtered_with_pattern = rest_api.list_tables_paged(
"default", table_type="table", table_name_pattern="%_2"
)
self.assertEqual(["normal_table_2"], filtered_with_pattern.elements)

first_page = rest_api.list_tables_paged(
"default", max_results=1, table_type="table"
)
self.assertEqual(["normal_table_1"], first_page.elements)
self.assertIsNotNone(first_page.next_page_token)

second_page = rest_api.list_tables_paged(
"default", max_results=1, page_token=first_page.next_page_token, table_type="table"
)
self.assertEqual(["normal_table_2"], second_page.elements)
self.assertEqual("normal_table_2", second_page.next_page_token)
finally:
server.shutdown()
12 changes: 12 additions & 0 deletions paimon-python/pypaimon/tests/rest/rest_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class ErrorResponse(RESTResponse):
# REST API parameter constants
DATABASE_NAME_PATTERN = "databaseNamePattern"
TABLE_NAME_PATTERN = "tableNamePattern"
TABLE_TYPE = "tableType"
VIEW_NAME_PATTERN = "viewNamePattern"
FUNCTION_NAME_PATTERN = "functionNamePattern"
PARTITION_NAME_PATTERN = "partitionNamePattern"
Expand Down Expand Up @@ -871,11 +872,22 @@ def _create_table_metadata(self, identifier: Identifier, schema_id: int,
def _list_tables(self, database_name: str, parameters: Dict[str, str]) -> List[str]:
"""List tables in database"""
table_name_pattern = parameters.get(TABLE_NAME_PATTERN)
table_type = parameters.get(TABLE_TYPE)
tables = []

for full_name, metadata in self.table_metadata_store.items():
identifier = Identifier.from_string(full_name)
metadata_table_type = (
metadata.schema.options.get(TYPE, "table")
if metadata and metadata.schema and metadata.schema.options
else "table"
)
table_type_matches = (
not table_type
or metadata_table_type == table_type
)
if (identifier.get_database_name() == database_name and
table_type_matches and
(not table_name_pattern or self._match_name_pattern(identifier.get_table_name(),
table_name_pattern))):
tables.append(identifier.get_table_name())
Expand Down