-
Notifications
You must be signed in to change notification settings - Fork 305
AWS CloudWatch Event Sink Implementation #1965
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AWS CloudWatch Event Sink Implementation #1965
Conversation
Didn't look into the whole PR, but two things I noticed:
|
Hi @snazy,
Yes, I'm assuming you're referring to the creation of one new event (
I didn't want to sink (no pun intended) more time trying to setup LocalStack for this repository if this approach was wildly off-the-mark. If you think the general approach is reasonable and the main things that will need change are implementation details, I will be glad to add that in the next revision. IIRC, this is also something that we only get with AWS - both Azure and GCP equivalents of CloudWatch do not have equivalent testing solutions (this could be outdated information but I can't find anything in a quick Google search). If having parity across CSPs on testing is important, maybe we settle for unit tests instead? Let me know your thoughts. |
I've added a test using LocalStack, which should help emulate CloudWatch. This PR is ready for a full review. |
service/common/src/main/java/org/apache/polaris/service/events/EventListenerConfiguration.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/PolarisEventListener.java
Outdated
Show resolved
Hide resolved
...e/common/src/test/java/org/apache/polaris/service/events/AwsCloudWatchEventListenerTest.java
Outdated
Show resolved
Hide resolved
...java/org/apache/polaris/service/quarkus/events/QuarkusPolarisEventListenerConfiguration.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are some thread-safety/concurrency issues and memory leaks in this change, which have to be addressed.
The implementation can also be simplified.
I'd also recommend to move this to its own module to streamline CI and testing in general.
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
service/common/src/main/java/org/apache/polaris/service/events/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
...e/common/src/test/java/org/apache/polaris/service/events/AwsCloudWatchEventListenerTest.java
Outdated
Show resolved
Hide resolved
.../polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListenerTest.java
Outdated
Show resolved
Hide resolved
Hey @snazy, can you take another pass over this one? It's changed a lot since we initially reviewed.
* #transformAndSendEvent(HashMap)} method to define how the JSON event data should be transmitted | ||
* or stored. | ||
*/ | ||
public abstract class JsonEventListener extends PolarisEventListener { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A note, that I really like having this as a public class because if someone does want to write a plugin that doesn't interact with any of the Iceberg or other based libraries, they can always utilize this instead and only depend on Polaris.
} | ||
|
||
@Test | ||
void shouldCreateLogGroupAndStream() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are missing the negative test here, we also ned to test that it doesn't fail if the log group and stream already exists. You could just start up again after the the first asserts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we are just missing one test of the "already exists" branch but other than that I think this is ready to go
...ice/src/main/java/org/apache/polaris/service/events/jsonEventListener/JsonEventListener.java
Outdated
Show resolved
Hide resolved
properties.put("realm", callContext.getRealmContext().getRealmIdentifier()); | ||
properties.put("principal", securityContext.getUserPrincipal().getName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit but I would recommend putting these into constants somewhere, e.g. AwsCloudWatchEventEventListener.KEY_REALM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Putting them as class-wide constants isn't a great idea IMO. These variables are RequestScoped, while the class as a whole is ApplicationScoped. The best we can probably do is to keep them as variables within the transformAndSendEvent
function (which will run per request) - but I'm not sure what that helps to clarify.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be clear, I'm talking about the string literals like "realm"
here. Having a constant will be useful if someone else wants to extract the value from the event, e.g. they won't have to hard-code properties.get("realm")
@WithName("log-group") | ||
@WithDefault("polaris-cloudwatch-default-group") | ||
@Override | ||
String awsCloudwatchlogGroup(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cloudwatch
or CloudWatch
? We are using a mix of both casings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, changed.
...ache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
Show resolved
Hide resolved
...ache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
...rvice/quarkus/events/jsonEventListener/aws/cloudwatch/QuarkusAwsCloudWatchConfiguration.java
Outdated
Show resolved
Hide resolved
...ache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
Outdated
Show resolved
Hide resolved
| `polaris.config.rollback.compaction.on-conflicts.enabled` | `false` | When set to true Polaris will apply the deconfliction by rollbacking those REPLACE operations snapshots which have the property of `polaris.internal.rollback.compaction.on-conflict` in their snapshot summary set to `rollback`, to resolve conflicts at the server end. | | ||
| `polaris.config.rollback.compaction.on-conflicts.enabled` | `false` | When set to true Polaris will apply the deconfliction by rollbacking those REPLACE operations snapshots which have the property of `polaris.internal.rollback.compaction.on-conflict` in their snapshot summary set to `rollback`, to resolve conflicts at the server end. | | ||
| `polaris.event-listener.type` | `no-op` | Define the Polaris event listener type. Supported values are `no-op`, `aws-cloudwatch`. | | ||
| `polaris.event-listener.aws-cloudwatch.log-group` | | Define the AWS CloudWatch log group name for the event listener. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we would IAM that polaris is running with have permissions to write to this log group right ? i don't know where to best write it ? thoughts ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added it to this page, let me know your thoughts!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Thanks a lot @adnanhemani for all the work !
I have added some suggestions, which are optional to address and are good to have !
String resourceType, | ||
String resourceName) { | ||
if (existsCheck.get()) { | ||
LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOGGER.debug("Log {} [{}] already exists", resourceType, resourceName); | |
LOGGER.debug("Resource {} [{}] already exists", resourceType, resourceName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - the resourceType
here will be "stream" or "group" to make a final log line stating "Log stream [xyz] already exists"
...ache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
Show resolved
Hide resolved
...ache/polaris/service/events/jsonEventListener/aws/cloudwatch/AwsCloudWatchEventListener.java
Show resolved
Hide resolved
| `polaris.config.rollback.compaction.on-conflicts.enabled` | `false` | When set to true Polaris will apply the deconfliction by rollbacking those REPLACE operations snapshots which have the property of `polaris.internal.rollback.compaction.on-conflict` in their snapshot summary set to `rollback`, to resolve conflicts at the server end. | | ||
| `polaris.event-listener.type` | `no-op` | Define the Polaris event listener type. Supported values are `no-op`, `aws-cloudwatch`. | | ||
| `polaris.event-listener.aws-cloudwatch.log-group` | `polaris-cloudwatch-default-group` | Define the AWS CloudWatch log group name for the event listener. | | ||
| `polaris.event-listener.aws-cloudwatch.log-stream` | `polaris-cloudwatch-default-stream`| Define the AWS CloudWatch log stream name for the event listener. Ensure that Polaris' IAM credentials have the following actions: "PutLogEvents", "DescribeLogStreams", and "DescribeLogGroups" on the specified log stream/group. If the specified log stream/group does not exist, then "CreateLogStream" and "CreateLogGroup" will also be required. | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[not a blocker] IMHO adding IAM requirement might not be right place, is there a place where IAM with Polaris runs with requirements are there ? let me check too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see one other than the Getting Started - and I don't think that page is relevant here.
public void onAfterTableRefreshed(AfterTableRefreshedEvent event) { | ||
HashMap<String, Object> properties = new HashMap<>(); | ||
properties.put("event_type", event.getClass().getSimpleName()); | ||
properties.put("table_identifier", event.tableIdentifier().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[optional] wondering if we need catalog name too in this ? lets say i have a same namespace and table in the different catalogs in the realm ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with this but we should put this as part of the instrumentation of the event itself. I will look into this for #2480.
Working on rebase for this - seems like it is now failing in main |
As per discussion at this morning's Polaris Community Sync, I am introducing this event listener that will allow users to sink events to AWS CloudWatch Logs - where they can search through events for auditing purposes, if they wish. I do intend to introduce similar changes for Azure and GCP once this is merged so that we have parity across all CSPs - but this will help nail down the exact pattern to make this work.
Below is a screenshot of a sample event sent to CloudWatch: