Skip to content

Add Polaris Events to Persistence #1844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ protected PolarisTestMetaStoreManager createPolarisTestMetaStoreManager() {
session,
diagServices,
new PolarisConfigurationStore() {},
timeSource.withZone(ZoneId.systemDefault())));
timeSource.withZone(ZoneId.systemDefault()),
null));
}

@ParameterizedTest
Expand Down
3 changes: 3 additions & 0 deletions persistence/relational-jdbc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ dependencies {
implementation(libs.smallrye.common.annotation) // @Identifier
implementation(libs.postgresql)

compileOnly(project(":polaris-immutables"))
annotationProcessor(project(":polaris-immutables", configuration = "processor"))

testImplementation(libs.mockito.junit.jupiter)
testImplementation(libs.h2)
testImplementation(testFixtures(project(":polaris-core")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public InputStream openInitScriptResource(@Nonnull SchemaOptions schemaOptions)
} else {
final String schemaSuffix;
switch (schemaOptions.schemaVersion()) {
case null -> schemaSuffix = "schema-v2.sql";
case null -> schemaSuffix = "schema-v3.sql";
case 1 -> schemaSuffix = "schema-v1.sql";
case 2 -> schemaSuffix = "schema-v2.sql";
case 3 -> schemaSuffix = "schema-v3.sql";
default ->
throw new IllegalArgumentException(
"Unknown schema version " + schemaOptions.schemaVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.sql.DataSource;
Expand Down Expand Up @@ -193,6 +195,63 @@ public int executeUpdate(QueryGenerator.PreparedQuery preparedQuery) throws SQLE
});
}

/**
* Executes the INSERT/UPDATE Queries in batches. Requires that all SQL queries have the same
* parameterized form.
*
* @param preparedQueries : queries to be executed
* @return : Number of rows modified / inserted.
* @throws SQLException : Exception during Query Execution.
*/
public int executeBatchUpdate(QueryGenerator.PreparedBatchQuery preparedQueries)
throws SQLException {
if (preparedQueries.parametersList().isEmpty() || preparedQueries.sql().isEmpty()) {
return 0;
}
int batchSize = 100;
AtomicInteger successCount = new AtomicInteger();
return withRetries(
() -> {
try (Connection connection = borrowConnection();
PreparedStatement statement = connection.prepareStatement(preparedQueries.sql())) {
boolean autoCommit = connection.getAutoCommit();
boolean success = false;
connection.setAutoCommit(false);

try {
for (int i = 1; i <= preparedQueries.parametersList().size(); i++) {
List<Object> params = preparedQueries.parametersList().get(i - 1);
for (int j = 0; j < params.size(); j++) {
statement.setObject(j + 1, params.get(j));
}

statement.addBatch(); // Add to batch

if (i % batchSize == 0) {
successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum());
}
}

// Execute remaining queries in the batch
successCount.addAndGet(Arrays.stream(statement.executeBatch()).sum());
success = true;
} finally {
try {
if (success) {
connection.commit();
} else {
connection.rollback();
successCount.set(0);
}
} finally {
connection.setAutoCommit(autoCommit);
}
}
}
return successCount.get();
});
}

/**
* Transaction callback to be executed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.polaris.core.entity.PolarisEntityCore;
import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisEvent;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
Expand All @@ -64,6 +65,7 @@
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
import org.apache.polaris.core.storage.StorageLocation;
import org.apache.polaris.persistence.relational.jdbc.models.ModelEntity;
import org.apache.polaris.persistence.relational.jdbc.models.ModelEvent;
import org.apache.polaris.persistence.relational.jdbc.models.ModelGrantRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelPolicyMappingRecord;
import org.apache.polaris.persistence.relational.jdbc.models.ModelPrincipalAuthenticationData;
Expand Down Expand Up @@ -235,6 +237,63 @@ public void writeToGrantRecords(
}
}

@Override
public void writeEvents(@Nonnull List<PolarisEvent> events) {
if (events.isEmpty()) {
return; // or throw if empty list is invalid
}

try {
// Generate the SQL using the first event as the reference
PreparedQuery firstPreparedQuery =
QueryGenerator.generateInsertQuery(
ModelEvent.ALL_COLUMNS,
ModelEvent.TABLE_NAME,
ModelEvent.fromEvent(events.get(0))
.toMap(datasourceOperations.getDatabaseType())
.values()
.stream()
.toList(),
realmId);
String expectedSql = firstPreparedQuery.sql();

List<List<Object>> parametersList = new ArrayList<>();
parametersList.add(firstPreparedQuery.parameters());

// Process remaining events and verify SQL consistency
for (int i = 1; i < events.size(); i++) {
PolarisEvent event = events.get(i);
PreparedQuery pq =
QueryGenerator.generateInsertQuery(
ModelEvent.ALL_COLUMNS,
ModelEvent.TABLE_NAME,
ModelEvent.fromEvent(event)
.toMap(datasourceOperations.getDatabaseType())
.values()
.stream()
.toList(),
realmId);

if (!expectedSql.equals(pq.sql())) {
throw new RuntimeException("All events did not generate the same SQL");
}

parametersList.add(pq.parameters());
}

int totalUpdated =
datasourceOperations.executeBatchUpdate(
new QueryGenerator.PreparedBatchQuery(expectedSql, parametersList));

if (totalUpdated == 0) {
throw new SQLException("No events were inserted.");
}
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to write events due to %s", e.getMessage()), e);
}
}

@Override
public void deleteEntity(@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
ModelEntity modelEntity = ModelEntity.fromEntity(entity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class QueryGenerator {
/** A container for the SQL string and the ordered parameter values. */
public record PreparedQuery(String sql, List<Object> parameters) {}

/** A container for the SQL string and a list of the ordered parameter values. */
public record PreparedBatchQuery(String sql, List<List<Object>> parametersList) {}

/** A container for the query fragment SQL string and the ordered parameter values. */
record QueryFragment(String sql, List<Object> parameters) {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.polaris.persistence.relational.jdbc.models;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.polaris.core.entity.PolarisEvent;
import org.apache.polaris.immutables.PolarisImmutable;
import org.apache.polaris.persistence.relational.jdbc.DatabaseType;

@PolarisImmutable
public interface ModelEvent extends Converter<PolarisEvent> {
String TABLE_NAME = "EVENTS";

List<String> ALL_COLUMNS =
List.of(
"catalog_id",
"event_id",
"request_id",
"event_type",
"timestamp_ms",
"principal_name",
"resource_type",
"resource_identifier",
"additional_parameters");

// catalog id
String getCatalogId();

// event id
String getEventId();

// id of the request that generated this event
String getRequestId();

// event type that was created
String getEventType();

// timestamp in epoch milliseconds of when this event was emitted
long getTimestampMs();

// polaris principal who took this action
String getPrincipalName();

// Enum that states the type of resource was being operated on
PolarisEvent.ResourceType getResourceType();

// Which resource was operated on
String getResourceIdentifier();

// Additional parameters that were not earlier recorded
String getAdditionalParameters();

@Override
default PolarisEvent fromResultSet(ResultSet rs) throws SQLException {
var modelEvent =
ImmutableModelEvent.builder()
.catalogId(rs.getString("catalog_id"))
.eventId(rs.getString("event_id"))
.requestId(rs.getString("request_id"))
.eventType(rs.getString("event_type"))
.timestampMs(rs.getLong("timestamp_ms"))
.principalName(rs.getString("actor"))
.resourceType(PolarisEvent.ResourceType.valueOf(rs.getString("resource_type")))
.resourceIdentifier(rs.getString("resource_identifier"))
.additionalParameters(rs.getString("additional_parameters"))
.build();
return toEvent(modelEvent);
}

@Override
default Map<String, Object> toMap(DatabaseType databaseType) {
Map<String, Object> map = new LinkedHashMap<>();
map.put("catalog_id", getCatalogId());
map.put("event_id", getEventId());
map.put("request_id", getRequestId());
map.put("event_type", getEventType());
map.put("timestamp_ms", getTimestampMs());
map.put("principal_name", getPrincipalName());
map.put("resource_type", getResourceType().toString());
map.put("resource_identifier", getResourceIdentifier());
if (databaseType.equals(DatabaseType.POSTGRES)) {
map.put("additional_parameters", toJsonbPGobject(getAdditionalParameters()));
} else {
map.put("additional_parameters", getAdditionalParameters());
}
return map;
}

static ModelEvent fromEvent(PolarisEvent event) {
if (event == null) return null;

return ImmutableModelEvent.builder()
.catalogId(event.getCatalogId())
.eventId(event.getId())
.requestId(event.getRequestId())
.eventType(event.getEventType())
.timestampMs(event.getTimestampMs())
.principalName(event.getPrincipalName())
.resourceType(event.getResourceType())
.resourceIdentifier(event.getResourceIdentifier())
.additionalParameters(event.getAdditionalParameters())
.build();
}

static PolarisEvent toEvent(ModelEvent model) {
if (model == null) return null;

PolarisEvent polarisEvent =
new PolarisEvent(
model.getCatalogId(),
model.getEventId(),
model.getRequestId(),
model.getEventType(),
model.getTimestampMs(),
model.getPrincipalName(),
model.getResourceType(),
model.getResourceIdentifier());
polarisEvent.setAdditionalParameters(model.getAdditionalParameters());
return polarisEvent;
}
}
14 changes: 14 additions & 0 deletions persistence/relational-jdbc/src/main/resources/h2/schema-v1.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,17 @@ CREATE TABLE IF NOT EXISTS policy_mapping_record (
);

CREATE INDEX IF NOT EXISTS idx_policy_mapping_record ON policy_mapping_record (realm_id, policy_type_code, policy_catalog_id, policy_id, target_catalog_id, target_id);

CREATE TABLE IF NOT EXISTS events (
realm_id TEXT NOT NULL,
catalog_id TEXT NOT NULL,
event_id TEXT NOT NULL,
request_id TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp_ms BIGINT NOT NULL,
principal_name TEXT,
resource_type TEXT NOT NULL,
resource_identifier TEXT NOT NULL,
additional_parameters TEXT NOT NULL DEFAULT '{}',
PRIMARY KEY (event_id)
);
Loading
Loading