Skip to content

Commit d03c717

Browse files
authored
Add Events for Iceberg REST APIs (#2480)
1 parent 9648582 commit d03c717

30 files changed

+861
-549
lines changed

runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import org.apache.polaris.service.admin.api.PolarisPrincipalRolesApiService;
7777
import org.apache.polaris.service.admin.api.PolarisPrincipalsApiService;
7878
import org.apache.polaris.service.config.ReservedProperties;
79-
import org.apache.polaris.service.events.AfterCatalogCreatedEvent;
8079
import org.apache.polaris.service.events.listeners.PolarisEventListener;
8180
import org.apache.polaris.service.types.PolicyIdentifier;
8281
import org.slf4j.Logger;
@@ -131,7 +130,6 @@ public Response createCatalog(
131130
validateExternalCatalog(catalog);
132131
Catalog newCatalog = CatalogEntity.of(adminService.createCatalog(request)).asCatalog();
133132
LOGGER.info("Created new catalog {}", newCatalog);
134-
polarisEventListener.onAfterCatalogCreated(new AfterCatalogCreatedEvent(newCatalog.getName()));
135133
return Response.status(Response.Status.CREATED).build();
136134
}
137135

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,7 @@
132132
import org.apache.polaris.service.catalog.io.FileIOFactory;
133133
import org.apache.polaris.service.catalog.io.FileIOUtil;
134134
import org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation;
135-
import org.apache.polaris.service.events.AfterTableCommitedEvent;
136-
import org.apache.polaris.service.events.AfterTableRefreshedEvent;
137-
import org.apache.polaris.service.events.AfterViewCommitedEvent;
138-
import org.apache.polaris.service.events.AfterViewRefreshedEvent;
139-
import org.apache.polaris.service.events.BeforeTableCommitedEvent;
140-
import org.apache.polaris.service.events.BeforeTableRefreshedEvent;
141-
import org.apache.polaris.service.events.BeforeViewCommitedEvent;
142-
import org.apache.polaris.service.events.BeforeViewRefreshedEvent;
135+
import org.apache.polaris.service.events.IcebergRestCatalogEvents;
143136
import org.apache.polaris.service.events.listeners.PolarisEventListener;
144137
import org.apache.polaris.service.task.TaskExecutor;
145138
import org.apache.polaris.service.types.NotificationRequest;
@@ -1446,8 +1439,8 @@ public void doRefresh() {
14461439
if (latestLocation == null) {
14471440
disableRefresh();
14481441
} else {
1449-
polarisEventListener.onBeforeTableRefreshed(
1450-
new BeforeTableRefreshedEvent(catalogName, tableIdentifier));
1442+
polarisEventListener.onBeforeRefreshTable(
1443+
new IcebergRestCatalogEvents.BeforeRefreshTableEvent(catalogName, tableIdentifier));
14511444
refreshFromMetadataLocation(
14521445
latestLocation,
14531446
SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1467,14 +1460,15 @@ public void doRefresh() {
14671460
Set.of(PolarisStorageActions.READ, PolarisStorageActions.LIST));
14681461
return TableMetadataParser.read(fileIO, metadataLocation);
14691462
});
1470-
polarisEventListener.onAfterTableRefreshed(
1471-
new AfterTableRefreshedEvent(catalogName, tableIdentifier));
1463+
polarisEventListener.onAfterRefreshTable(
1464+
new IcebergRestCatalogEvents.AfterRefreshTableEvent(catalogName, tableIdentifier));
14721465
}
14731466
}
14741467

14751468
public void doCommit(TableMetadata base, TableMetadata metadata) {
1476-
polarisEventListener.onBeforeTableCommited(
1477-
new BeforeTableCommitedEvent(tableIdentifier, base, metadata));
1469+
polarisEventListener.onBeforeCommitTable(
1470+
new IcebergRestCatalogEvents.BeforeCommitTableEvent(
1471+
catalogName, tableIdentifier, base, metadata));
14781472

14791473
LOGGER.debug(
14801474
"doCommit for table {} with metadataBefore {}, metadataAfter {}",
@@ -1618,8 +1612,9 @@ public void doCommit(TableMetadata base, TableMetadata metadata) {
16181612
updateTableLike(tableIdentifier, entity);
16191613
}
16201614

1621-
polarisEventListener.onAfterTableCommited(
1622-
new AfterTableCommitedEvent(catalogName, tableIdentifier, base, metadata));
1615+
polarisEventListener.onAfterCommitTable(
1616+
new IcebergRestCatalogEvents.AfterCommitTableEvent(
1617+
catalogName, tableIdentifier, base, metadata));
16231618
}
16241619

16251620
@Override
@@ -1810,8 +1805,8 @@ public void doRefresh() {
18101805
if (latestLocation == null) {
18111806
disableRefresh();
18121807
} else {
1813-
polarisEventListener.onBeforeViewRefreshed(
1814-
new BeforeViewRefreshedEvent(catalogName, identifier));
1808+
polarisEventListener.onBeforeRefreshView(
1809+
new IcebergRestCatalogEvents.BeforeRefreshViewEvent(catalogName, identifier));
18151810
refreshFromMetadataLocation(
18161811
latestLocation,
18171812
SHOULD_RETRY_REFRESH_PREDICATE,
@@ -1833,14 +1828,15 @@ public void doRefresh() {
18331828

18341829
return ViewMetadataParser.read(fileIO.newInputFile(metadataLocation));
18351830
});
1836-
polarisEventListener.onAfterViewRefreshed(
1837-
new AfterViewRefreshedEvent(catalogName, identifier));
1831+
polarisEventListener.onAfterRefreshView(
1832+
new IcebergRestCatalogEvents.AfterRefreshViewEvent(catalogName, identifier));
18381833
}
18391834
}
18401835

18411836
public void doCommit(ViewMetadata base, ViewMetadata metadata) {
1842-
polarisEventListener.onBeforeViewCommited(
1843-
new BeforeViewCommitedEvent(catalogName, identifier, base, metadata));
1837+
polarisEventListener.onBeforeCommitView(
1838+
new IcebergRestCatalogEvents.BeforeCommitViewEvent(
1839+
catalogName, identifier, base, metadata));
18441840

18451841
// TODO: Maybe avoid writing metadata if there's definitely a transaction conflict
18461842
LOGGER.debug(
@@ -1940,8 +1936,9 @@ public void doCommit(ViewMetadata base, ViewMetadata metadata) {
19401936
updateTableLike(identifier, entity);
19411937
}
19421938

1943-
polarisEventListener.onAfterViewCommited(
1944-
new AfterViewCommitedEvent(catalogName, identifier, base, metadata));
1939+
polarisEventListener.onAfterCommitView(
1940+
new IcebergRestCatalogEvents.AfterCommitViewEvent(
1941+
catalogName, identifier, base, metadata));
19451942
}
19461943

19471944
protected String writeNewMetadataIfRequired(ViewMetadata metadata) {

runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@
102102
import org.apache.polaris.service.catalog.common.CatalogHandler;
103103
import org.apache.polaris.service.config.ReservedProperties;
104104
import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
105-
import org.apache.polaris.service.events.AfterTableCreatedEvent;
106105
import org.apache.polaris.service.events.listeners.PolarisEventListener;
107106
import org.apache.polaris.service.http.IcebergHttpUtil;
108107
import org.apache.polaris.service.http.IfNoneMatch;
@@ -392,11 +391,8 @@ public LoadTableResponse createTableDirect(Namespace namespace, CreateTableReque
392391
.withWriteOrder(request.writeOrder())
393392
.setProperties(reservedProperties.removeReservedProperties(request.properties()))
394393
.build();
395-
LoadTableResponse resp =
396-
catalogHandlerUtils.createTable(baseCatalog, namespace, requestWithoutReservedProperties);
397-
polarisEventListener.onAfterTableCreated(
398-
new AfterTableCreatedEvent(catalogName, identifier, resp.tableMetadata()));
399-
return resp;
394+
return catalogHandlerUtils.createTable(
395+
baseCatalog, namespace, requestWithoutReservedProperties);
400396
}
401397

402398
/**

0 commit comments

Comments
 (0)