Add Polaris Events to Persistence #1844

Open · wants to merge 4 commits into base: main

Conversation

adnanhemani
Contributor

@adnanhemani adnanhemani commented Jun 10, 2025

As per my proposal on the Dev ML, this is the implementation of persisting Polaris events to the persistence layer. The overall goal of this PR is to store events in the Polaris persistence layer, which will later be used to power the Iceberg Events API.

High-level overview of the changes:

  • Created a new persistence table and model for Events. Table schema was publicized on the Dev ML here.
  • Implemented a new persistence method to flush events to said table
  • Added in-memory and file-based buffers for flushing events to the persistence
  • Created configurations for enabling all of these features
  • Modified PolarisEventListener and PolarisEvents to optimally achieve this goal
  • Created new sample events for CreateTable (explicitly part of the Iceberg Events API spec) and CreateCatalog (which is not explicitly part of the Iceberg Events API spec and will be served as a CustomOperation).

Upcoming items that will be added to this PR soon:

  • Testing for the new code paths added

Upcoming items that will be tackled in subsequent PRs (and were excluded to ensure that this PR contains only the end-to-end MVP):

  • Support for multiple datasources to store the events
  • Allowing multiple event listeners to be set
  • Additional event types and related instrumentation

@snazy
Member

snazy commented Jun 10, 2025

Thanks @adnanhemani for tackling the effort.

It looks like this change introduces another serialization implementation, although we already have Jackson in place. Can you explain why that's needed?

It's a bit unclear why these "buffers" are needed. Would you mind elaborating on this?

I have some concerns around how the approach could be implemented in the NoSQL world, via #1189. There we already have a way to iterate over recent changes (aka: events). Do you have some idea around how both could live together? Would be nice to consider NoSQL in this new approach, because we already agreed that NoSQL will become part of Polaris. I think it's better to have a way that satisfies both persistence worlds.

@adnanhemani
Contributor Author

Thanks for the high-level comments, @snazy!

It looks like this change introduces another serialization implementation, although we already have Jackson in place. Can you explain why that's needed?

My anecdotal experience is that Jackson is a much slower serialization library than Kryo, and given that we want these commands to be quick so that the user does not notice a difference, I went ahead with the faster implementation. I do understand that this brings additional overhead/maintenance - so I'm okay with shifting the code to use Jackson instead if that's a hard requirement. WDYT?

It's a bit unclear why these "buffers" are needed. Would you mind elaborating on this?

This was discussed earlier on the ML thread. TL;DR: if we aim to support some read activities in the near future, flushing to the persistence on every call will put significant load on it. These buffers help ensure we are not hammering the persistence (which in the current case is the same as the metastore) and causing an accidental DDoS.
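To make the trade-off concrete, here is a minimal sketch of what such a buffer could look like (class and method names here are illustrative, not the PR's actual API): events accumulate in memory and are written out as one batch once a threshold is reached, so the persistence sees one write per N events instead of N individual writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative sketch: buffers events and flushes them to persistence in batches. */
public class EventBuffer {
  private final List<String> pending = new ArrayList<>();
  private final int maxBufferSize;
  private final Consumer<List<String>> flushTarget; // stands in for a batch write to persistence

  public EventBuffer(int maxBufferSize, Consumer<List<String>> flushTarget) {
    this.maxBufferSize = maxBufferSize;
    this.flushTarget = flushTarget;
  }

  /** Adds an event; flushes synchronously once the threshold is reached. */
  public synchronized void add(String event) {
    pending.add(event);
    if (pending.size() >= maxBufferSize) {
      flush();
    }
  }

  /** One batched write instead of N single writes keeps load on the metastore bounded. */
  public synchronized void flush() {
    if (!pending.isEmpty()) {
      flushTarget.accept(new ArrayList<>(pending));
      pending.clear();
    }
  }

  public synchronized int pendingCount() {
    return pending.size();
  }
}
```

A real implementation would also flush on a timer and on shutdown so that events are not held indefinitely under low traffic.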

I have some concerns around how the approach could be implemented in the NoSQL world, via #1189. There we already have a way to iterate over recent changes (aka: events). Do you have some idea around how both could live together? Would be nice to consider NoSQL in this new approach, because we already agreed that NoSQL will become part of Polaris. I think it's better to have a way that satisfies both persistence worlds.

I took a brief look at the linked PR for NoSQL - but to be fair, it is a massive PR, so please correct and/or augment my knowledge as required.

Per my understanding, the changes are stored to commit to the persistence. I'm in agreement that this is very similar to events in terms of what they are representing, but not in terms of the way they are being used. Events (which I am only modifying in this PR) are to be used as administrator-facing representation of customer-triggered actions, whereas Changes (being introduced in the NoSQL PR) are a way for committing actions to the persistence. Given that Events should not be solely for changes to the persistence, I don't really see how Changes can be used to power and/or replace Events.

The way I imagine things would be that Events can (and should) be stored in the NoSQL persistence as well - and any calls to the future Events API should understand which type of persistence layer was used for Events storage and delegate the call to that persistence type. That is why I have introduced an append-only writeEvents method in the BasePersistence interface - NoSQL should also implement that.

Base line to state: I 100% agree that the Events functionality should exist in NoSQL as well as JDBC-Relational - and I'm happy to help contribute towards this once NoSQL finalizes and merges. But I'm not sure that Changes is helpful in this journey - unless we'd like to evolve that object into something that can represent all events as well.

@snazy
Member

snazy commented Jun 11, 2025

I don't think we're in a rush with this change, because the mandatory support/spec in Iceberg is not there yet. I think it's safer to wait for the Iceberg change before adding something to Polaris. WDYT?

Contributor

@adutra adutra left a comment

@adnanhemani I must say I'm not at all satisfied with the current state of FileBufferPolarisPersistenceEventListener.

In fact, I am not at all convinced that we need file-based buffers here.

Have you thought about the event bus in Quarkus / Vertx? This mechanism allows events to be processed asynchronously in a dedicated thread pool, without hurting the original request latencies.

https://quarkus.io/guides/vertx-reference#eventbus
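For reference, the shape of that idea with plain JDK primitives (deliberately not the actual Vert.x EventBus API, just an analogue using a single-threaded executor):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** JDK-only sketch of asynchronous event dispatch (not the Vert.x EventBus API itself). */
public class AsyncEventDispatcher {
  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final CopyOnWriteArrayList<String> handled = new CopyOnWriteArrayList<>();

  /** Returns immediately; the event is handled later on a dedicated worker thread. */
  public void publish(String event) {
    // Durability caveat: if the process dies after submit() but before the task
    // runs, the event is lost, which is the concern raised in the reply below.
    worker.submit(() -> handled.add(event));
  }

  /** Stops the worker and returns everything handled so far (for demonstration only). */
  public List<String> shutdownAndDrain() {
    worker.shutdown();
    try {
      worker.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return handled;
  }
}
```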

Contributor Author

@adnanhemani adnanhemani left a comment

@adnanhemani I must say I'm not at all satisfied with the current state of FileBufferPolarisPersistenceEventListener.

I would be glad to hear further feedback on this if there is anything beyond the comments you've left on the review so far!

Have you thought about the event bus in Quarkus / Vertx? This mechanism allows events to be processed asynchronously in a dedicated thread pool, without hurting the original request latencies.

I did think about Vertx and similar solutions, but the largest issue is that there are no durability guarantees we can provide when using them. If Polaris crashes at any point after the event is placed on the event bus but before it is processed, the event is lost and there is no way to recover it. Please correct me if I'm wrong here.

@snazy
Member

snazy commented Jun 18, 2025

From yesterday's discussion:
We should go with the minimal viable change.
There are strong concerns about the "buffering/spilling" introduced here.

Query patterns & event payload, considering the Iceberg proposal, are still not set in stone - I have mentioned concerns about the IRC proposal as it stands. Considering especially the event payload size issues in that proposal, I think it is way too early to push this one.

@adutra
Contributor

adutra commented Jun 18, 2025

From yesterday's discussion: We should go with the minimal viable change. There are strong concerns about the "buffering/spilling" introduced here.

Query patterns & event payload, considering the Iceberg proposal, are still not set in stone - I have mentioned concerns about the IRC proposal as it stands. Considering especially the event payload size issues in that proposal, I think it is way too early to push this one.

+1 to the above.

Also I would like to stress that the need for a commitlog-like structure on disk stems from the design choice to isolate events persistence from catalog persistence. If all writes (catalog and events) were done within the same transaction, we would be leveraging the transactional backend to do the heavy lifting of maintaining those two consistent with each other. And we would get "exactly once" semantics for free. For this reason, I think we need to take a step back and rethink the persistence model.

@adnanhemani
Contributor Author

@snazy @adutra I think all comments you've both made have been addressed on the ML thread here: https://lists.apache.org/thread/mnr3ryb78qtkn6g3m40g86621bmok253

I would encourage keeping our notes in that thread for the sake of continuity of thought; we can then update this PR with major developments from that thread.

But to be clear - it would be great to continue reviewing this PR. That email thread should not mean this PR is stale while the thread is still active; unless things change significantly in the Iceberg Events API proposal, this PR should still be considered valid and living.

Contributor

@adutra adutra left a comment

I'm -1 on the current design, first because of the concurrency and resource-leak issues that I alluded to in the discussions above, and second because I think the introduction of a commitlog-like structure on disk is not necessary.

@adnanhemani
Contributor Author

Based on our discussion at the latest Polaris Community sync, please take a look at the new implementation. A variety of changes were made based on the suggestions:

  • File Buffer was removed
  • Re-did configuration reading
  • Changed all events back to Java records. This comes with the overhead of needing to create the event ID manually, and of passing the applicable contexts to the event listeners directly.
    • Having the PolarisCallContext helps ensure we can always get the time for the event using the Java Clock.
    • Added EventId to all events as well.
  • Implemented a request ID in the PolarisCallContext. This will allow us to efficiently (and correctly) determine the request ID for any generated event.
  • Implemented JDBC Batch Inserts rather than using a single SQL statement

cc @adutra @snazy
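The batch-insert item above follows the standard JDBC pattern of binding many parameter rows to one PreparedStatement via addBatch() and executing them with executeBatch(). One detail worth noting is chunking: very large event batches are typically split into fixed-size groups before execution to bound memory and statement size. A minimal sketch of that chunking (the helper name is mine, not the PR's):

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative helper: split parameter rows into fixed-size groups for batch execution. */
public class Batches {
  public static <T> List<List<T>> partition(List<T> rows, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> groups = new ArrayList<>();
    for (int i = 0; i < rows.size(); i += batchSize) {
      // Each group becomes one addBatch()/executeBatch() round trip in real JDBC code.
      groups.add(new ArrayList<>(rows.subList(i, Math.min(i + batchSize, rows.size()))));
    }
    return groups;
  }
}
```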

@adnanhemani adnanhemani requested a review from adutra July 1, 2025 06:03
@@ -85,6 +91,10 @@ public Clock getClock() {
return clock;
}

public String getRequestId() {
Contributor

I'm not thrilled about the idea of adding more stuff to this class, which is already a bag of unrelated beans.

Also, we already have the notion of a request ID for logging purposes:

I would prefer to use the same request ID coming from HTTP headers if available, and if not, use a random UUID. Wdyt?
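A minimal sketch of that fallback logic (the header name and validation rule here are assumptions for illustration, not Polaris's actual constants):

```java
import java.util.Optional;
import java.util.UUID;
import java.util.regex.Pattern;

/** Illustrative: reuse a client-supplied request ID when present and sane, else generate one. */
public class RequestIds {
  // Hypothetical header name; the actual header Polaris uses may differ.
  public static final String HEADER = "X-Request-Id";

  // Don't trust arbitrary client input: only accept short, safe identifiers.
  private static final Pattern SAFE = Pattern.compile("[A-Za-z0-9._-]{1,128}");

  public static String resolve(Optional<String> headerValue) {
    return headerValue
        .filter(v -> SAFE.matcher(v).matches())
        .orElseGet(() -> UUID.randomUUID().toString());
  }
}
```

Validating the client-supplied value matters because the ID ends up persisted and queryable; without a sanity check, a caller could inject arbitrarily large or malformed identifiers.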

Contributor Author

I agree, this class is combining a lot of unrelated things... but I can't think of a better place to keep the request ID. If you have a suggestion, I'm happy to investigate it!

I would prefer to use the same request ID coming from HTTP headers if available, and if not, use a random UUID. Wdyt?

I did not see this when I searched through the repository - but this is a great call. Let me see if I can integrate this through.

/** Event listener that stores all emitted events forever. Not recommended for use in production. */
@ApplicationScoped
@Identifier("test")
public class TestPolarisEventListener extends PolarisEventListener {
Contributor

I know this wasn't introduced in this PR, but I'm wondering what use a Polaris user could possibly make of this listener. The default one is a no-op, which imho is fine. If this one is only used in our own test suite and has no value for users, could we move it to the src/test or src/testFixtures folder?

Contributor Author

Was discussed on the original PR here: https://github.com/apache/polaris/pull/922/files#r2000612762

I'll keep the same stance as the original thread - we can take this as a followup task (by creating a GH issue) to clean this and all other similarly implemented beans. But I don't want to cross the wires for that effort with this PR, which is already quite large as-is.

Member

It's test code which should not live in main/src

AtomicInteger successCount = new AtomicInteger();
return withRetries(
    () -> {
      String sql = preparedQueries.get(0).sql();
Contributor

Am I correct that this is relying on the assumption that all PreparedQuery instances have the exact same SQL query?

The issue is that nothing in the method signature prevents a caller from calling this method with different SQL queries. I think at the very least, you should add a check for this invariant. But this hints at a badly designed signature. A proper signature would be something like:

public int executeBatchUpdate(QueryGenerator.PreparedBatchQuery preparedBatchQuery)

Contributor Author

Am I correct that this is relying on the assumption that all PreparedQuery instances have the exact same SQL query?

Yes, that is written in the Javadoc above this method.

A proper signature would be something like: public int executeBatchUpdate(QueryGenerator.PreparedBatchQuery preparedBatchQuery)

I've implemented it in the next revision, but it makes the code a bit less clean imo. Please do check and let me know if there are any suggestions on this.
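For illustration, the suggested signature makes the invariant hold by construction: a record carrying exactly one SQL string plus the parameter rows leaves callers no way to mix different statements. A sketch (field names assumed from the review comment above, not necessarily the PR's actual type):

```java
import java.util.List;

/** Sketch of the suggested type: one SQL template plus many parameter rows. */
public record PreparedBatchQuery(String sql, List<List<Object>> parameterRows) {
  public PreparedBatchQuery {
    if (sql == null || sql.isBlank()) {
      throw new IllegalArgumentException("sql must be non-empty");
    }
    // The invariant "all rows share the same SQL" now holds by construction:
    // there is only one SQL string, so different statements cannot be mixed.
  }
}
```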

Member

@snazy snazy left a comment

I mentioned a bunch of issues in the review comments.
Most importantly:

  • an easy to exploit denial-of-service issue
  • heap issues (GC and OOM)
  • database issues (huge payloads)

package org.apache.polaris.service.events;

/** Emitted when Polaris intends to create a catalog. */
public record BeforeCatalogCreatedEvent(String eventId, String catalogName)
Member

There are new events added in this PR, which is out of scope (not mentioned in the PR summary) of the already big PR. These are not "samples", but used in production code.

Contributor Author

I figured that if we are adding the "After" version of the event, then we might as well add the "Before" to keep the pairing of the events together. But if we are being pedantic about sticking specifically to the explicitly mentioned scope, then I can remove this and reintroduce it in a different PR.

 */
public record BeforeTaskAttemptedEvent(
    String eventId, long taskEntityId, CallContext callContext, int attempt)
Member

I didn't realize before that some event records contain a CallContext - that should really be removed, as it is runtime-specific information referencing resources, not pure event-related information. There is no way to re-construct an event without the original CallContext.

Contributor Author

Agreed that we should not have a CallContext as part of the event record itself. While this change is technically not in the scope of this PR, I will make it anyway, as we are now sending the CallContext into the event listener with this PR.


/** {@link BeforeCatalogCreatedEvent} */
public void onBeforeCatalogCreated(
    BeforeCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {}
Member

This is weird - there's a CallContext here and also in this and other events.

Contributor Author

Yes, this is intentional - the CallContext is sent along with the event record rather than being part of it. We need constructs and information from within the CallContext to be able to process these events. Let me know if there is still a question about this after seeing the usage pattern of the CallContext.

/** Event listener that stores all emitted events forever. Not recommended for use in production. */
@ApplicationScoped
@Identifier("test")
public class TestPolarisEventListener extends PolarisEventListener {
Member

It's test code which should not live in main/src

return callCtx.getPolarisCallContext().getRequestId();
}

private String getUsername(SecurityContext securityContext) {
Member

Principal.getName() is not guaranteed to be a username

Contributor Author

I don't think there is a requirement that it must be the username. Any unique identifier for that Principal should be fine. But if there's a better way to represent that principal, please do suggest one.

Comment on lines +129 to +130
private long getTimestamp(CallContext callCtx) {
  return callCtx.getPolarisCallContext().getClock().millis();
}

private String getRequestId(CallContext callCtx) {
  return callCtx.getPolarisCallContext().getRequestId();
}
Member

Why do these values become different?

Contributor Author

Not sure if the question is why these two methods were extracted specifically? If that's the question, the answer is just that this was longer, repetitive code that was much easier to extract into these utility methods to maintain DRYness.

Comment on lines +89 to +122
@Override
public void onAfterTableCreated(
    AfterTableCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {
  org.apache.polaris.core.entity.PolarisEvent polarisEvent =
      new org.apache.polaris.core.entity.PolarisEvent(
          event.catalogName(),
          event.eventId(),
          getRequestId(callCtx),
          event.getClass().getSimpleName(),
          getTimestamp(callCtx),
          getUsername(securityContext),
          PolarisEvent.ResourceType.TABLE,
          event.identifier().toString());
  Map<String, String> additionalParameters =
      Map.of(
          "table-uuid",
          event.metadata().uuid(),
          "metadata",
          TableMetadataParser.toJson(event.metadata()));
  polarisEvent.setAdditionalParameters(additionalParameters);

  addToBuffer(polarisEvent, callCtx);
}

@Override
public void onAfterCatalogCreated(
    AfterCatalogCreatedEvent event, CallContext callCtx, SecurityContext securityContext) {
  org.apache.polaris.core.entity.PolarisEvent polarisEvent =
      new PolarisEvent(
          event.catalogName(),
          event.eventId(),
          getRequestId(callCtx),
          event.getClass().getSimpleName(),
          getTimestamp(callCtx),
          getUsername(securityContext),
          PolarisEvent.ResourceType.CATALOG,
          event.catalogName());
  addToBuffer(polarisEvent, callCtx);
}
Member

It is unclear why these are special implementations.

Contributor Author

Just part of what's mentioned in the scope of the PR. The remaining/new events will be instrumented accordingly in a future PR - I did not want to add all events immediately to ensure that the focus of the PR stays on the main logic instead of the implementation of all events.

Comment on lines 59 to 66
polarisEventListener.onBeforeRequestRateLimited(
    new BeforeRequestRateLimitedEvent(
        PolarisEvent.createEventId(),
        ctx.getMethod(),
        ctx.getUriInfo().getAbsolutePath().toString()),
    CallContext.getCurrentContext(),
    ctx.getSecurityContext());
ctx.abortWith(Response.status(Response.Status.TOO_MANY_REQUESTS).build());
Member

This is quite a dangerous call site. An attacker can easily overload the system when rate-limiting kicks in.
I consider this a legit security issue that would justify a CVE.

Contributor Author

Referring back to the original discussion on this: https://github.com/apache/polaris/pull/922/files#r2001535434

I'm not taking sides in that discussion - but I will say that this PR is not aiming to change anything about this event from before, so it is definitely not in the scope of this PR. Please feel free to discuss this event outside this PR.

If it helps: I can confirm that I will not be attempting to persist this event in this event listener implementation.

@adnanhemani adnanhemani force-pushed the ahemani/add_polaris_events_to_persistence branch from d9ac602 to b62dfbc Compare July 15, 2025 00:28
}

@Produces
@RequestScoped
public RealmConfig realmContext(CallContext callContext) {
Contributor Author

Sorry, fixed this during rebasing changes. It's a quick typo fix.

@adnanhemani adnanhemani requested review from snazy and adutra July 15, 2025 00:31
4 participants