Skip to content

capralifecycle/liflig-messaging

Repository files navigation

liflig-messaging

Library for applications that process messages asynchronously. Provides interfaces and implementations for message queues, topics, pollers and processors.

The library is split into modules:

  • liflig-messaging-core provides the Queue, Topic and MessageProcessor interfaces, as well as the MessagePoller class for polling messages from a queue.
  • liflig-messaging-awssdk implements the Queue interface for AWS SQS and the Topic interface for AWS SNS, using the AWS SDK.
  • liflig-messaging-sqs-lambda provides a function for processing messages in AWS Lambda functions that use SQS as the event source. It improves failure handling for individual messages in a batch, and allows you to use the same MessageProcessor interface as in long-running services.

Contents:

Usage

Processing messages from a queue

The main thing that your application has to concern itself with, is implementing the MessageProcessor interface. This is where your application-specific message processing logic goes.

Example:

import no.liflig.messaging.Message
import no.liflig.messaging.MessageProcessor
import no.liflig.messaging.ProcessingResult

class ExampleEventProcessor : MessageProcessor {
  override fun process(message: Message): ProcessingResult {
    val event = ExampleEvent.fromJson(message.body)

    handleEvent(event)

    return ProcessingResult.Success
  }
}

How you use your MessageProcessor depends on if your application is a long-running service, or an AWS Lambda function.

Long-running services

For a long-running service, you'll use MessagePoller. You pass your MessageProcessor and a Queue implementation to its constructor, and call messagePoller.start() on application start-up. This spawns a thread that runs side-by-side with your service, polling messages from the queue and passing them to your processor.

If you use liflig-messaging-awssdk, you can use the SqsQueue implementation for AWS SQS (Simple Queue Service).

import no.liflig.messaging.MessagePoller
import no.liflig.messaging.Queue
import no.liflig.messaging.awssdk.SqsQueue
import software.amazon.awssdk.services.sqs.SqsClient

class App(config: Config) {
  val inputQueue: Queue = SqsQueue(SqsClient.create(), config.inputQueueUrl)

  val messagePoller = MessagePoller(
    queue = inputQueue,
    messageProcessor = ExampleEventProcessor(),
  )

  fun start() {
    messagePoller.start()
  }
}

fun main() {
  App(Config.load()).start()
}

AWS Lambda functions

For an AWS Lambda function, you'll construct your MessageProcessor implementation in your LambdaHandler, and then call handleLambdaSqsEvent (from liflig-messaging-sqs-lambda) in your handler method:

import no.liflig.messaging.lambda.handleLambdaSqsEvent

class LambdaHandler(
  private val eventProcessor: MessageProcessor = ExampleEventProcessor(),
) {
  fun handle(sqsEvent: SQSEvent): SQSBatchResponse {
    return handleLambdaSqsEvent(sqsEvent, messageProcessor)
  }
}

Important

In order for handleLambdaSqsEvent to work correctly, your handler method must return SQSBatchResponse. And in order for that to work, you have to configure batch item failures on your Lambda <-> SQS integration. In AWS CDK, you do this on the SqsEventSource:

myLambda.addEventSource(
  new SqsEventSource(myQueue, { reportBatchItemFailures: true })
);

For more on the reason behind this, see the docstring on handleLambdaSqsEvent.

Sending messages to a queue

Construct a Queue like you would for the MessagePoller example above. You can then call send on it to send a message.

import no.liflig.messaging.Queue
import no.liflig.messaging.awssdk.SqsQueue
import software.amazon.awssdk.services.sqs.SqsClient

class ExampleEventSender(
  private val outputQueue: Queue = SqsQueue(SqsClient.create(), queueUrl = "..."),
) {
  fun sendEvent(event: ExampleEvent) {
    outputQueue.send(event.toJson())
  }
}

Publishing to a message topic

The Topic interface from liflig-messaging-core represents a pub-sub message topic. liflig-messaging-awssdk provides SnsTopic, an implementation of this interface for AWS SNS (Simple Notification Service).

import no.liflig.messaging.Topic
import no.liflig.messaging.awssdk.SnsTopic
import software.amazon.awssdk.services.sns.SnsClient

class ExampleEventPublisher(
  private val eventTopic: Topic = SnsTopic(SnsClient.create(), topicArn = "..."),
) {
  fun publishEvent(event: ExampleEvent) {
    eventTopic.publish(ExampleEvent().toJson())
  }
}

Adding to your project

We use Maven as the example build system here.

First, add the core module:

<dependency>
  <groupId>no.liflig</groupId>
  <artifactId>liflig-messaging-core</artifactId>
  <version>${liflig-messaging.version}</version>
</dependency>

Then, add extra modules depending on your use-case:

  • If your application is a long-running service, and you want to use the AWS SDK implementations:
    <dependency>
      <groupId>no.liflig</groupId>
      <artifactId>liflig-messaging-awssdk</artifactId>
      <version>${liflig-messaging.version}</version>
    </dependency>
  • If your application is an AWS Lambda function with an SQS event source:
    <dependency>
      <groupId>no.liflig</groupId>
      <artifactId>liflig-messaging-sqs-lambda</artifactId>
      <version>${liflig-messaging.version}</version>
    </dependency>

Build & Test

mvn clean install

Lint code

mvn spotless:check

Format code

mvn spotless:apply

About

Kotlin library for applications that process messages asynchronously.

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors 7

Languages