Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -375,23 +375,14 @@ public Response createTable(
prefix,
TableIdentifier.of(namespace, createTableRequest.name()));
if (createTableRequest.stageCreate()) {
if (delegationModes.isEmpty()) {
return Response.ok(catalog.createTableStaged(ns, createTableRequest)).build();
} else {
return Response.ok(
catalog.createTableStagedWithWriteDelegation(
ns, createTableRequest, refreshCredentialsEndpoint))
.build();
}
} else if (delegationModes.isEmpty()) {
LoadTableResponse response = catalog.createTableDirect(ns, createTableRequest);
return tryInsertETagHeader(
Response.ok(response), response, namespace, createTableRequest.name())
return Response.ok(
catalog.createTableStaged(
ns, createTableRequest, delegationModes, refreshCredentialsEndpoint))
.build();
} else {
LoadTableResponse response =
catalog.createTableDirectWithWriteDelegation(
ns, createTableRequest, refreshCredentialsEndpoint);
catalog.createTableDirect(
ns, createTableRequest, delegationModes, refreshCredentialsEndpoint);
return tryInsertETagHeader(
Response.ok(response), response, namespace, createTableRequest.name())
.build();
Expand Down Expand Up @@ -439,17 +430,13 @@ public Response loadTable(
securityContext,
prefix,
catalog -> {
Optional<LoadTableResponse> response;

if (delegationModes.isEmpty()) {
response = catalog.loadTableIfStale(tableIdentifier, ifNoneMatch, snapshots);
} else {
Optional<String> refreshCredentialsEndpoint =
getRefreshCredentialsEndpoint(delegationModes, prefix, tableIdentifier);
response =
catalog.loadTableWithAccessDelegationIfStale(
tableIdentifier, ifNoneMatch, snapshots, refreshCredentialsEndpoint);
}
Optional<LoadTableResponse> response =
catalog.loadTable(
tableIdentifier,
snapshots,
ifNoneMatch,
delegationModes,
getRefreshCredentialsEndpoint(delegationModes, prefix, tableIdentifier));

if (response.isEmpty()) {
return Response.notModified().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.polaris.service.catalog.iceberg;

import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED;
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
Expand All @@ -32,6 +33,7 @@
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -98,6 +100,7 @@
import org.apache.polaris.core.secrets.UserSecretsManager;
import org.apache.polaris.core.storage.AccessConfig;
import org.apache.polaris.core.storage.PolarisStorageActions;
import org.apache.polaris.service.catalog.AccessDelegationMode;
import org.apache.polaris.service.catalog.SupportsNotifications;
import org.apache.polaris.service.catalog.common.CatalogHandler;
import org.apache.polaris.service.config.ReservedProperties;
Expand Down Expand Up @@ -374,25 +377,8 @@ public ListTablesResponse listTables(Namespace namespace) {
* @return ETagged {@link LoadTableResponse} to uniquely identify the table metadata
*/
public LoadTableResponse createTableDirect(Namespace namespace, CreateTableRequest request) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_DIRECT;
TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(op, identifier);

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
}
CreateTableRequest requestWithoutReservedProperties =
CreateTableRequest.builder()
.withName(request.name())
.withLocation(request.location())
.withPartitionSpec(request.spec())
.withSchema(request.schema())
.withWriteOrder(request.writeOrder())
.setProperties(reservedProperties.removeReservedProperties(request.properties()))
.build();
return catalogHandlerUtils.createTable(
baseCatalog, namespace, requestWithoutReservedProperties);
return createTableDirect(
namespace, request, EnumSet.noneOf(AccessDelegationMode.class), Optional.empty());
}

/**
Expand All @@ -406,10 +392,32 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
Namespace namespace,
CreateTableRequest request,
Optional<String> refreshCredentialsEndpoint) {
PolarisAuthorizableOperation op =
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
op, TableIdentifier.of(namespace, request.name()));
return createTableDirect(
namespace, request, EnumSet.of(VENDED_CREDENTIALS), refreshCredentialsEndpoint);
}

public void authorizeCreateTableDirect(
Namespace namespace,
CreateTableRequest request,
EnumSet<AccessDelegationMode> delegationModes) {
if (delegationModes.isEmpty()) {
TableIdentifier identifier = TableIdentifier.of(namespace, request.name());
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT, identifier);
} else {
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION,
TableIdentifier.of(namespace, request.name()));
}
}

public LoadTableResponse createTableDirect(
Namespace namespace,
CreateTableRequest request,
EnumSet<AccessDelegationMode> delegationModes,
Optional<String> refreshCredentialsEndpoint) {

authorizeCreateTableDirect(namespace, request, delegationModes);

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
Expand Down Expand Up @@ -440,11 +448,11 @@ public LoadTableResponse createTableDirectWithWriteDelegation(
return buildLoadTableResponseWithDelegationCredentials(
tableIdentifier,
tableMetadata,
delegationModes,
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
PolarisStorageActions.LIST),
SNAPSHOTS_ALL,
refreshCredentialsEndpoint)
.build();
} else if (table instanceof BaseMetadataTable) {
Expand Down Expand Up @@ -500,26 +508,40 @@ private TableMetadata stageTableCreateHelper(Namespace namespace, CreateTableReq
}

public LoadTableResponse createTableStaged(Namespace namespace, CreateTableRequest request) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.CREATE_TABLE_STAGED;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
op, TableIdentifier.of(namespace, request.name()));
return createTableStaged(
namespace, request, EnumSet.noneOf(AccessDelegationMode.class), Optional.empty());
}

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
public LoadTableResponse createTableStagedWithWriteDelegation(
Namespace namespace,
CreateTableRequest request,
Optional<String> refreshCredentialsEndpoint) {
return createTableStaged(
namespace, request, EnumSet.of(VENDED_CREDENTIALS), refreshCredentialsEndpoint);
}

private void authorizeCreateTableStaged(
Namespace namespace,
CreateTableRequest request,
EnumSet<AccessDelegationMode> delegationModes) {
if (delegationModes.isEmpty()) {
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
PolarisAuthorizableOperation.CREATE_TABLE_STAGED,
TableIdentifier.of(namespace, request.name()));
} else {
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION,
TableIdentifier.of(namespace, request.name()));
}
TableMetadata metadata = stageTableCreateHelper(namespace, request);
return LoadTableResponse.builder().withTableMetadata(metadata).build();
}

public LoadTableResponse createTableStagedWithWriteDelegation(
public LoadTableResponse createTableStaged(
Namespace namespace,
CreateTableRequest request,
EnumSet<AccessDelegationMode> delegationModes,
Optional<String> refreshCredentialsEndpoint) {
PolarisAuthorizableOperation op =
PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION;
authorizeCreateTableLikeUnderNamespaceOperationOrThrow(
op, TableIdentifier.of(namespace, request.name()));

authorizeCreateTableStaged(namespace, request, delegationModes);

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
Expand All @@ -531,8 +553,8 @@ public LoadTableResponse createTableStagedWithWriteDelegation(
return buildLoadTableResponseWithDelegationCredentials(
ident,
metadata,
EnumSet.of(VENDED_CREDENTIALS),
Set.of(PolarisStorageActions.ALL),
SNAPSHOTS_ALL,
refreshCredentialsEndpoint)
.build();
}
Expand Down Expand Up @@ -616,32 +638,12 @@ public LoadTableResponse loadTable(TableIdentifier tableIdentifier, String snaps
*/
public Optional<LoadTableResponse> loadTableIfStale(
TableIdentifier tableIdentifier, IfNoneMatch ifNoneMatch, String snapshots) {
PolarisAuthorizableOperation op = PolarisAuthorizableOperation.LOAD_TABLE;
authorizeBasicTableLikeOperationOrThrow(
op, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier);

if (ifNoneMatch != null) {
// Perform freshness-aware table loading if caller specified ifNoneMatch.
IcebergTableLikeEntity tableEntity = getTableEntity(tableIdentifier);
if (tableEntity == null || tableEntity.getMetadataLocation() == null) {
LOGGER
.atWarn()
.addKeyValue("tableIdentifier", tableIdentifier)
.addKeyValue("tableEntity", tableEntity)
.log("Failed to getMetadataLocation to generate ETag when loading table");
} else {
// TODO: Refactor null-checking into the helper method once we create a more canonical
// interface for associate etags with entities.
String tableEntityTag =
IcebergHttpUtil.generateETagForMetadataFileLocation(tableEntity.getMetadataLocation());
if (ifNoneMatch.anyMatch(tableEntityTag)) {
return Optional.empty();
}
}
}

LoadTableResponse rawResponse = catalogHandlerUtils.loadTable(baseCatalog, tableIdentifier);
return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
return loadTable(
tableIdentifier,
snapshots,
ifNoneMatch,
EnumSet.noneOf(AccessDelegationMode.class),
Optional.empty());
}

public LoadTableResponse loadTableWithAccessDelegation(
Expand All @@ -668,6 +670,24 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
IfNoneMatch ifNoneMatch,
String snapshots,
Optional<String> refreshCredentialsEndpoint) {
return loadTable(
tableIdentifier,
snapshots,
ifNoneMatch,
EnumSet.of(VENDED_CREDENTIALS),
refreshCredentialsEndpoint);
}

private Set<PolarisStorageActions> authorizeLoadTable(
TableIdentifier tableIdentifier, EnumSet<AccessDelegationMode> delegationModes) {
if (delegationModes.isEmpty()) {
authorizeBasicTableLikeOperationOrThrow(
PolarisAuthorizableOperation.LOAD_TABLE,
PolarisEntitySubType.ICEBERG_TABLE,
tableIdentifier);
return Set.of();
}

// Here we have a single method that falls through multiple candidate
// PolarisAuthorizableOperations because instead of identifying the desired operation up-front
// and
Expand Down Expand Up @@ -709,6 +729,19 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig());
}

return actionsRequested;
}

public Optional<LoadTableResponse> loadTable(
TableIdentifier tableIdentifier,
String snapshots,
IfNoneMatch ifNoneMatch,
EnumSet<AccessDelegationMode> delegationModes,
Optional<String> refreshCredentialsEndpoint) {

Set<PolarisStorageActions> actionsRequested =
authorizeLoadTable(tableIdentifier, delegationModes);

if (ifNoneMatch != null) {
// Perform freshness-aware table loading if caller specified ifNoneMatch.
IcebergTableLikeEntity tableEntity = getTableEntity(tableIdentifier);
Expand All @@ -735,14 +768,15 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(

if (table instanceof BaseTable baseTable) {
TableMetadata tableMetadata = baseTable.operations().current();
return Optional.of(
LoadTableResponse response =
buildLoadTableResponseWithDelegationCredentials(
tableIdentifier,
tableMetadata,
delegationModes,
actionsRequested,
snapshots,
refreshCredentialsEndpoint)
.build());
.build();
return Optional.of(filterResponseToSnapshots(response, snapshots));
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return NoSuchTableException for now
throw new NoSuchTableException("Table does not exist: %s", tableIdentifier.toString());
Expand All @@ -754,11 +788,16 @@ public Optional<LoadTableResponse> loadTableWithAccessDelegationIfStale(
private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredentials(
TableIdentifier tableIdentifier,
TableMetadata tableMetadata,
EnumSet<AccessDelegationMode> delegationModes,
Set<PolarisStorageActions> actions,
String snapshots,
Optional<String> refreshCredentialsEndpoint) {
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);

if (!delegationModes.contains(VENDED_CREDENTIALS)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably safer when more delegation modes get supported:

Suggested change
if (!delegationModes.contains(VENDED_CREDENTIALS)) {
if (delegationModes.isEmpty()) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used .isEmpty() in other cases, but in this method the code below specifically performs credential vending, so I think this check is correct :)

We can certainly refactor when we support remote S3 request signing. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, WFM.

return responseBuilder;
}

if (baseCatalog instanceof SupportsCredentialDelegation credentialDelegation) {
LOGGER
.atDebug()
Expand Down