Skip to content

Commit 12ab744

Browse files
dimas-bsnazy
authored and committed
Extensible pagination token implementation
Based on #1838, following up on #1555
* Allows multiple implementations of `Token` referencing the "next page", encapsulated in `PageToken`. No changes to `polaris-core` are needed to add custom `Token` implementations.
* Extensible to (later) support (cryptographic) signatures to prevent tampering with page tokens
* Refactor pagination code to delineate API-level page tokens and internal "pointers to data"
* Requests deal with the "previous" token, the user-provided page size (optional) and the previous request's page size.
* Concentrate the logic of combining page size requests and previous tokens in `PageTokenUtil`
* `PageToken` subclasses are no longer necessary.
* Serialization of `PageToken` uses Jackson serialization (Smile format)

Since no (metastore level) implementation handling pagination existed before, no backwards compatibility is needed.
1 parent 7a086fc commit 12ab744

File tree

32 files changed

+1312
-489
lines changed

32 files changed

+1312
-489
lines changed

integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,24 @@ public List<Namespace> listNamespaces(String catalog, Namespace parent) {
101101
}
102102
}
103103

104+
public ListNamespacesResponse listNamespaces(
105+
String catalog, Namespace parent, String pageToken, String pageSize) {
106+
Map<String, String> queryParams = new HashMap<>();
107+
if (!parent.isEmpty()) {
108+
// TODO change this for Iceberg 1.7.2:
109+
// queryParams.put("parent", RESTUtil.encodeNamespace(parent));
110+
queryParams.put("parent", Joiner.on('\u001f').join(parent.levels()));
111+
}
112+
queryParams.put("pageToken", pageToken);
113+
queryParams.put("pageSize", pageSize);
114+
try (Response response =
115+
request("v1/{cat}/namespaces", Map.of("cat", catalog), queryParams).get()) {
116+
assertThat(response.getStatus()).isEqualTo(OK.getStatusCode());
117+
ListNamespacesResponse res = response.readEntity(ListNamespacesResponse.class);
118+
return res;
119+
}
120+
}
121+
104122
public List<Namespace> listAllNamespacesChildFirst(String catalog) {
105123
List<Namespace> result = new ArrayList<>();
106124
for (int idx = -1; idx < result.size(); idx++) {
@@ -142,6 +160,20 @@ public List<TableIdentifier> listTables(String catalog, Namespace namespace) {
142160
}
143161
}
144162

163+
public ListTablesResponse listTables(
164+
String catalog, Namespace namespace, String pageToken, String pageSize) {
165+
String ns = RESTUtil.encodeNamespace(namespace);
166+
Map<String, String> queryParams = new HashMap<>();
167+
queryParams.put("pageToken", pageToken);
168+
queryParams.put("pageSize", pageSize);
169+
try (Response res =
170+
request("v1/{cat}/namespaces/" + ns + "/tables", Map.of("cat", catalog), queryParams)
171+
.get()) {
172+
assertThat(res.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
173+
return res.readEntity(ListTablesResponse.class);
174+
}
175+
}
176+
145177
public void dropTable(String catalog, TableIdentifier id) {
146178
String ns = RESTUtil.encodeNamespace(id.namespace());
147179
try (Response res =

integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767
import org.apache.iceberg.rest.RESTUtil;
6868
import org.apache.iceberg.rest.requests.CreateTableRequest;
6969
import org.apache.iceberg.rest.responses.ErrorResponse;
70+
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
71+
import org.apache.iceberg.rest.responses.ListTablesResponse;
7072
import org.apache.iceberg.types.Types;
7173
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
7274
import org.apache.polaris.core.admin.model.Catalog;
@@ -164,7 +166,8 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests<RESTCatalog>
164166

165167
private static final String[] DEFAULT_CATALOG_PROPERTIES = {
166168
"polaris.config.allow.unstructured.table.location", "true",
167-
"polaris.config.allow.external.table.location", "true"
169+
"polaris.config.allow.external.table.location", "true",
170+
"polaris.config.list-pagination-enabled", "true"
168171
};
169172

170173
@Retention(RetentionPolicy.RUNTIME)
@@ -1559,4 +1562,72 @@ public void testUpdateTableWithReservedProperty() {
15591562
.hasMessageContaining("reserved prefix");
15601563
genericTableApi.purge(currentCatalogName, namespace);
15611564
}
1565+
1566+
@Test
1567+
public void testPaginatedListNamespaces() {
1568+
String prefix = "testPaginatedListNamespaces";
1569+
for (int i = 0; i < 20; i++) {
1570+
Namespace namespace = Namespace.of(prefix + i);
1571+
restCatalog.createNamespace(namespace);
1572+
}
1573+
1574+
try {
1575+
Assertions.assertThat(catalogApi.listNamespaces(currentCatalogName, Namespace.empty()))
1576+
.hasSize(20);
1577+
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
1578+
int total = 0;
1579+
String pageToken = null;
1580+
do {
1581+
ListNamespacesResponse response =
1582+
catalogApi.listNamespaces(
1583+
currentCatalogName, Namespace.empty(), pageToken, String.valueOf(pageSize));
1584+
Assertions.assertThat(response.namespaces().size()).isLessThanOrEqualTo(pageSize);
1585+
total += response.namespaces().size();
1586+
pageToken = response.nextPageToken();
1587+
} while (pageToken != null);
1588+
Assertions.assertThat(total)
1589+
.as("Total paginated results for pageSize = " + pageSize)
1590+
.isEqualTo(20);
1591+
}
1592+
} finally {
1593+
for (int i = 0; i < 20; i++) {
1594+
Namespace namespace = Namespace.of(prefix + i);
1595+
restCatalog.dropNamespace(namespace);
1596+
}
1597+
}
1598+
}
1599+
1600+
@Test
1601+
public void testPaginatedListTables() {
1602+
String prefix = "testPaginatedListTables";
1603+
Namespace namespace = Namespace.of(prefix);
1604+
restCatalog.createNamespace(namespace);
1605+
for (int i = 0; i < 20; i++) {
1606+
restCatalog.createTable(TableIdentifier.of(namespace, prefix + i), SCHEMA);
1607+
}
1608+
1609+
try {
1610+
Assertions.assertThat(catalogApi.listTables(currentCatalogName, namespace)).hasSize(20);
1611+
for (var pageSize : List.of(1, 2, 3, 9, 10, 11, 19, 20, 21, 2000)) {
1612+
int total = 0;
1613+
String pageToken = null;
1614+
do {
1615+
ListTablesResponse response =
1616+
catalogApi.listTables(
1617+
currentCatalogName, namespace, pageToken, String.valueOf(pageSize));
1618+
Assertions.assertThat(response.identifiers().size()).isLessThanOrEqualTo(pageSize);
1619+
total += response.identifiers().size();
1620+
pageToken = response.nextPageToken();
1621+
} while (pageToken != null);
1622+
Assertions.assertThat(total)
1623+
.as("Total paginated results for pageSize = " + pageSize)
1624+
.isEqualTo(20);
1625+
}
1626+
} finally {
1627+
for (int i = 0; i < 20; i++) {
1628+
restCatalog.dropTable(TableIdentifier.of(namespace, prefix + i));
1629+
}
1630+
restCatalog.dropNamespace(namespace);
1631+
}
1632+
}
15621633
}

persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkMetaStoreSessionImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
5454
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5555
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
56-
import org.apache.polaris.core.persistence.pagination.HasPageSize;
56+
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
5757
import org.apache.polaris.core.persistence.pagination.Page;
5858
import org.apache.polaris.core.persistence.pagination.PageToken;
5959
import org.apache.polaris.core.persistence.transactional.AbstractTransactionalPersistence;
@@ -468,6 +468,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
468468
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
469469
@Nonnull Function<PolarisBaseEntity, T> transformer,
470470
@Nonnull PageToken pageToken) {
471+
471472
// full range scan under the parent for that type
472473
Stream<PolarisBaseEntity> data =
473474
this.store
@@ -477,11 +478,7 @@ public List<EntityNameLookupRecord> lookupEntityActiveBatchInCurrentTxn(
477478
.map(ModelEntity::toEntity)
478479
.filter(entityFilter);
479480

480-
if (pageToken instanceof HasPageSize hasPageSize) {
481-
data = data.limit(hasPageSize.getPageSize());
482-
}
483-
484-
return Page.fromItems(data.map(transformer).collect(Collectors.toList()));
481+
return Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity);
485482
}
486483

487484
/** {@inheritDoc} */

persistence/eclipselink/src/main/java/org/apache/polaris/extension/persistence/impl/eclipselink/PolarisEclipseLinkStore.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.polaris.core.entity.PolarisEntityType;
3636
import org.apache.polaris.core.entity.PolarisGrantRecord;
3737
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
38+
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
3839
import org.apache.polaris.core.persistence.pagination.PageToken;
3940
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
4041
import org.apache.polaris.core.policy.PolicyEntity;
@@ -294,7 +295,17 @@ List<ModelEntity> lookupFullEntitiesActive(
294295

295296
// Currently check against ENTITIES not joining with ENTITIES_ACTIVE
296297
String hql =
297-
"SELECT m from ModelEntity m where m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
298+
"SELECT m from ModelEntity m where"
299+
+ " m.catalogId=:catalogId and m.parentId=:parentId and m.typeCode=:typeCode";
300+
301+
var entityIdToken = pageToken.valueAs(EntityIdToken.class);
302+
if (entityIdToken.isPresent()) {
303+
hql += " and m.id > :tokenId";
304+
}
305+
306+
if (pageToken.paginationRequested()) {
307+
hql += " order by m.id asc";
308+
}
298309

299310
TypedQuery<ModelEntity> query =
300311
session
@@ -303,6 +314,11 @@ List<ModelEntity> lookupFullEntitiesActive(
303314
.setParameter("parentId", parentId)
304315
.setParameter("typeCode", entityType.getCode());
305316

317+
if (entityIdToken.isPresent()) {
318+
long tokenId = entityIdToken.get().entityId();
319+
query = query.setParameter("tokenId", tokenId);
320+
}
321+
306322
return query.getResultList();
307323
}
308324

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.LinkedHashMap;
3232
import java.util.List;
3333
import java.util.Map;
34+
import java.util.concurrent.atomic.AtomicReference;
3435
import java.util.function.Function;
3536
import java.util.function.Predicate;
3637
import java.util.stream.Collectors;
@@ -50,7 +51,7 @@
5051
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
5152
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
5253
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
53-
import org.apache.polaris.core.persistence.pagination.HasPageSize;
54+
import org.apache.polaris.core.persistence.pagination.EntityIdToken;
5455
import org.apache.polaris.core.persistence.pagination.Page;
5556
import org.apache.polaris.core.persistence.pagination.PageToken;
5657
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -449,7 +450,7 @@ public <T> Page<T> listEntities(
449450
@Nonnull Predicate<PolarisBaseEntity> entityFilter,
450451
@Nonnull Function<PolarisBaseEntity, T> transformer,
451452
@Nonnull PageToken pageToken) {
452-
Map<String, Object> params =
453+
Map<String, Object> whereEquals =
453454
Map.of(
454455
"catalog_id",
455456
catalogId,
@@ -459,29 +460,41 @@ public <T> Page<T> listEntities(
459460
entityType.getCode(),
460461
"realm_id",
461462
realmId);
463+
Map<String, Object> whereGreater;
462464

463465
// Limit can't be pushed down, due to client side filtering
464466
// absence of transaction.
467+
String orderByColumnName = null;
468+
if (pageToken.paginationRequested()) {
469+
orderByColumnName = ModelEntity.ID_COLUMN;
470+
whereGreater =
471+
pageToken
472+
.valueAs(EntityIdToken.class)
473+
.map(
474+
entityIdToken ->
475+
Map.<String, Object>of(ModelEntity.ID_COLUMN, entityIdToken.entityId()))
476+
.orElse(Map.of());
477+
} else {
478+
whereGreater = Map.of();
479+
}
480+
465481
try {
466482
PreparedQuery query =
467483
QueryGenerator.generateSelectQuery(
468-
ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, params);
469-
List<PolarisBaseEntity> results = new ArrayList<>();
484+
ModelEntity.ALL_COLUMNS,
485+
ModelEntity.TABLE_NAME,
486+
whereEquals,
487+
whereGreater,
488+
orderByColumnName);
489+
AtomicReference<Page<T>> results = new AtomicReference<>();
470490
datasourceOperations.executeSelectOverStream(
471491
query,
472492
new ModelEntity(),
473493
stream -> {
474494
var data = stream.filter(entityFilter);
475-
if (pageToken instanceof HasPageSize hasPageSize) {
476-
data = data.limit(hasPageSize.getPageSize());
477-
}
478-
data.forEach(results::add);
495+
results.set(Page.mapped(pageToken, data, transformer, EntityIdToken::fromEntity));
479496
});
480-
List<T> resultsOrEmpty =
481-
results == null
482-
? Collections.emptyList()
483-
: results.stream().filter(entityFilter).map(transformer).collect(Collectors.toList());
484-
return Page.fromItems(resultsOrEmpty);
497+
return results.get();
485498
} catch (SQLException e) {
486499
throw new RuntimeException(
487500
String.format("Failed to retrieve polaris entities due to %s", e.getMessage()), e);

persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/QueryGenerator.java

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import jakarta.annotation.Nonnull;
23+
import jakarta.annotation.Nullable;
2324
import java.util.ArrayList;
2425
import java.util.Arrays;
2526
import java.util.Collections;
@@ -58,8 +59,27 @@ public static PreparedQuery generateSelectQuery(
5859
@Nonnull List<String> projections,
5960
@Nonnull String tableName,
6061
@Nonnull Map<String, Object> whereClause) {
61-
QueryFragment where = generateWhereClause(new HashSet<>(projections), whereClause);
62-
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql());
62+
return generateSelectQuery(projections, tableName, whereClause, Map.of(), null);
63+
}
64+
65+
/**
66+
* Generates a SELECT query with projection, equality and greater-than filtering, and optional ordering.
67+
*
68+
* @param projections List of columns to retrieve.
69+
* @param tableName Target table name.
70+
* @param whereEquals Column-value pairs used in WHERE filtering.
71+
* @return A parameterized SELECT query.
72+
* @throws IllegalArgumentException if any whereEquals or whereGreater column other than realm_id isn't in projections.
73+
*/
74+
public static PreparedQuery generateSelectQuery(
75+
@Nonnull List<String> projections,
76+
@Nonnull String tableName,
77+
@Nonnull Map<String, Object> whereEquals,
78+
@Nonnull Map<String, Object> whereGreater,
79+
@Nullable String orderByColumn) {
80+
QueryFragment where =
81+
generateWhereClause(new HashSet<>(projections), whereEquals, whereGreater);
82+
PreparedQuery query = generateSelectQuery(projections, tableName, where.sql(), orderByColumn);
6383
return new PreparedQuery(query.sql(), where.parameters());
6484
}
6585

@@ -107,7 +127,8 @@ public static PreparedQuery generateSelectQueryWithEntityIds(
107127
params.add(realmId);
108128
String where = " WHERE (catalog_id, id) IN (" + placeholders + ") AND realm_id = ?";
109129
return new PreparedQuery(
110-
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where).sql(), params);
130+
generateSelectQuery(ModelEntity.ALL_COLUMNS, ModelEntity.TABLE_NAME, where, null).sql(),
131+
params);
111132
}
112133

113134
/**
@@ -156,7 +177,7 @@ public static PreparedQuery generateUpdateQuery(
156177
@Nonnull List<Object> values,
157178
@Nonnull Map<String, Object> whereClause) {
158179
List<Object> bindingParams = new ArrayList<>(values);
159-
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause);
180+
QueryFragment where = generateWhereClause(new HashSet<>(allColumns), whereClause, Map.of());
160181
String setClause = allColumns.stream().map(c -> c + " = ?").collect(Collectors.joining(", "));
161182
String sql =
162183
"UPDATE " + getFullyQualifiedTableName(tableName) + " SET " + setClause + where.sql();
@@ -176,34 +197,49 @@ public static PreparedQuery generateDeleteQuery(
176197
@Nonnull List<String> tableColumns,
177198
@Nonnull String tableName,
178199
@Nonnull Map<String, Object> whereClause) {
179-
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause);
200+
QueryFragment where = generateWhereClause(new HashSet<>(tableColumns), whereClause, Map.of());
180201
return new PreparedQuery(
181202
"DELETE FROM " + getFullyQualifiedTableName(tableName) + where.sql(), where.parameters());
182203
}
183204

184205
private static PreparedQuery generateSelectQuery(
185-
@Nonnull List<String> columnNames, @Nonnull String tableName, @Nonnull String filter) {
206+
@Nonnull List<String> columnNames,
207+
@Nonnull String tableName,
208+
@Nonnull String filter,
209+
@Nullable String orderByColumn) {
186210
String sql =
187211
"SELECT "
188212
+ String.join(", ", columnNames)
189213
+ " FROM "
190214
+ getFullyQualifiedTableName(tableName)
191215
+ filter;
216+
if (orderByColumn != null) {
217+
sql += " ORDER BY " + orderByColumn + " ASC";
218+
}
192219
return new PreparedQuery(sql, Collections.emptyList());
193220
}
194221

195222
@VisibleForTesting
196223
static QueryFragment generateWhereClause(
197-
@Nonnull Set<String> tableColumns, @Nonnull Map<String, Object> whereClause) {
224+
@Nonnull Set<String> tableColumns,
225+
@Nonnull Map<String, Object> whereEquals,
226+
@Nonnull Map<String, Object> whereGreater) {
198227
List<String> conditions = new ArrayList<>();
199228
List<Object> parameters = new ArrayList<>();
200-
for (Map.Entry<String, Object> entry : whereClause.entrySet()) {
229+
for (Map.Entry<String, Object> entry : whereEquals.entrySet()) {
201230
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
202231
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
203232
}
204233
conditions.add(entry.getKey() + " = ?");
205234
parameters.add(entry.getValue());
206235
}
236+
for (Map.Entry<String, Object> entry : whereGreater.entrySet()) {
237+
if (!tableColumns.contains(entry.getKey()) && !entry.getKey().equals("realm_id")) {
238+
throw new IllegalArgumentException("Invalid query column: " + entry.getKey());
239+
}
240+
conditions.add(entry.getKey() + " > ?");
241+
parameters.add(entry.getValue());
242+
}
207243
String clause = conditions.isEmpty() ? "" : " WHERE " + String.join(" AND ", conditions);
208244
return new QueryFragment(clause, parameters);
209245
}

0 commit comments

Comments
 (0)