Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add event based processing #22

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

SteffanPerry
Copy link

@SteffanPerry SteffanPerry commented Mar 13, 2025

Issue #: 21

Description of changes:
Added the ability to process events not enqueued by ActiveJob, see issues/21

By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache 2.0 license.

@mullermp
Copy link
Contributor

I haven't forgotten about this. @alextwoods What are your initial impressions?

@alextwoods
Copy link
Contributor

Thanks for submitting this and sorry I've been slow to review - I've been swamped with other work.

At surface review, this looks like a reasonable direction. I should be able to do a deeper review next week.

@@ -7,20 +7,68 @@ module SQS
class JobRunner
attr_reader :id, :class_name

def self.queue_event_handlers
@@queue_handlers ||= {}.tap do |handlers|
Aws::ActiveJob::SQS.config.queues.values.each do |queue_config|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users can override config values through ENV or cli options, so Aws::ActiveJob::SQS.config is not always a reflection of the actual configuration the poller/executor are using.

I think what we'll need to do is:

  1. Add job_class to the list of QUEUE_ENV_CONFIGS
  2. Plumb through the resolved queue config that is being used from the poller to the runner. I'm not quite sure the best way to do that - we may need to add an additional, optional parameter to the execute method on the executor.

end

# Active job messages will have message_attributes key 'aws_sqs_active_job_class'
def is_active_job_message?(message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit, can drop the is and just have active_job_message?

```yaml
# config/aws_active_job_sqs.yml
queues:
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Lets use a different queue name than default here. While you can combine events + active job in one queue, I think we'd generally encourage separate usage (at least not conflating the default queue with event processing).

job_class: 'MyEventJob' # Job processor class
```

When defined as an event job, you will receive a message object with attributes required to process a raw sqs message.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could use some clarification - what does it mean to be "defined as an event job"?

I think we could say, if the message is NOT an active job (has the active job class attribute), then it will be treated like an event and the defined job processor class will be called with attributes to process the raw sqs message.

queues:
default:
url: 'https://my-queue-url.amazon.aws'
job_class: 'MyEventJob' # Job processor class
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not certain about the name job_class for this. To me that implies either: 1) only jobs of this class can be queued on this queue or 2) that all messages received on this queue will be processed using this job.

I'm not quite sure what the right name for this is - some ideas:

  • event_message_class - hopefully implies that this is used only for event messages
  • unknown_job_handler - I like this a little less, but this is effectively the behavior (ie, when we can't identity what job class to use, this is what gets used).
  • event_processor_job_class

@mullermp - any thoughts on this naming?


#### Event Processing: Manual handle sqs messages.

In the event you need more time to process a job, or you would like to delete the job manually, you have all necessary data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The job will get deleted already once it is processed. We should potentially not mention that as an option here as I think we'd not want to encourage that.

We could use some more documentation on how error handling interacts with this (I think error handling is fairly well described in the readme, but we probably want to repeat some of it here). In particular around how retires and dlq ect work with this.

sqs_message.change_visibility(visibility_timeout: 500)
process_message(message['body'])
rescue => e
received_count = message.dig('attributes', 'ApproximateReceiveCount').to_i
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to think through how retries/errors work with this feature a bit more.

Since we are using an ActiveJob class to process the event, we should be relying on ActiveJob's retry and error handling I think.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants