Skip to content

Commit 7af6d7a

Browse files
committed
Native kotlin suspending event gateway
1 parent f7857d2 commit 7af6d7a

File tree

3 files changed

+178
-0
lines changed

3 files changed

+178
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package org.axonframework.extensions.kotlin.eventhandling.gateway
2+
3+
import org.axonframework.common.BuilderUtils
4+
import org.axonframework.common.Registration
5+
import org.axonframework.eventhandling.EventBus
6+
import org.axonframework.eventhandling.EventMessage
7+
import org.axonframework.eventhandling.GenericEventMessage
8+
import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptor
9+
import java.util.*
10+
import java.util.concurrent.CopyOnWriteArrayList
11+
12+
/**
13+
* Implementation of the [SuspendingEventGateway].
14+
*
15+
* @author Joel Feinstein
16+
* @since 0.2.0
17+
*/
18+
class DefaultSuspendingEventGateway(config: Builder.() -> Unit) : SuspendingEventGateway {
19+
private val eventBus: EventBus
20+
private val dispatchInterceptors: MutableList<SuspendingMessageDispatchInterceptor<EventMessage<*>>>
21+
22+
init {
23+
Builder().apply(config).validate().let {
24+
eventBus = it.eventBus
25+
dispatchInterceptors = CopyOnWriteArrayList(it.dispatchInterceptors)
26+
}
27+
}
28+
29+
/**
30+
* This implementation will process interceptors and dispatch each event before moving on to the next.
31+
*/
32+
override suspend fun publish(events: Iterable<*>): List<EventMessage<*>> {
33+
return events.map {
34+
GenericEventMessage.asEventMessage<Any>(it).processEventInterceptors().apply { publish() }
35+
}
36+
}
37+
38+
override fun registerDispatchInterceptor(interceptor: SuspendingMessageDispatchInterceptor<EventMessage<*>>): Registration {
39+
dispatchInterceptors += interceptor
40+
return Registration { dispatchInterceptors.remove(interceptor) }
41+
}
42+
43+
private suspend fun EventMessage<*>.processEventInterceptors(): EventMessage<*> {
44+
return dispatchInterceptors.fold(this) { acc, interceptor -> interceptor.intercept(acc) }
45+
}
46+
47+
private fun EventMessage<*>.publish() {
48+
eventBus.publish(this)
49+
}
50+
51+
/**
52+
* Builder class to instantiate [DefaultSuspendingEventGateway].
53+
*
54+
* The `dispatchInterceptors` are defaulted to an empty list.
55+
* The [EventBus] is a **hard requirement**
56+
*
57+
*/
58+
class Builder {
59+
lateinit var eventBus: EventBus
60+
val dispatchInterceptors: List<SuspendingMessageDispatchInterceptor<EventMessage<*>>> = CopyOnWriteArrayList()
61+
62+
/**
63+
* Validate whether the fields contained in this Builder as set accordingly.
64+
*
65+
* @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
66+
* specifications
67+
*/
68+
fun validate() = apply {
69+
BuilderUtils.assertThat(this::eventBus.isInitialized, { it == true }, "The EventBus is a hard requirement and should be provided")
70+
}
71+
}
72+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.axonframework.extensions.kotlin.eventhandling.gateway
2+
3+
import org.axonframework.eventhandling.EventMessage
4+
import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptor
5+
import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptorSupport
6+
import java.util.*
7+
8+
/**
9+
* Variation of the [EventGateway], wrapping a [EventBus] for a friendlier API.
10+
* Provides support for Kotlin coroutines.
11+
*
12+
* @author Joel Feinstein
13+
* @since 0.2.0
14+
*/
15+
interface SuspendingEventGateway : SuspendingMessageDispatchInterceptorSupport<EventMessage<*>> {
16+
/**
17+
* Publishes given `events`.
18+
*
19+
* Given `events` are wrapped as payloads of a [EventMessage] that are eventually published on the
20+
* [EventBus], unless `event` already implements [Message]. In that case, a `EventMessage`
21+
* is constructed from that message's payload and [org.axonframework.messaging.MetaData].
22+
*
23+
* @param events events to be published
24+
* @return events that were published. DO NOTE: if there were some [SuspendingMessageDispatchInterceptor]s
25+
* registered to this `gateway`, they will be processed first, before returning events to the caller. The
26+
* order of returned events is the same as one provided as the input parameter.
27+
*/
28+
suspend fun publish(vararg events: Any?): List<EventMessage<*>> {
29+
return publish(events.toList())
30+
}
31+
32+
/**
33+
* Publishes the given `events`.
34+
*
35+
* Given `events` are wrapped as payloads of a [EventMessage] that are eventually published on the
36+
* [EventBus], unless `event` already implements [Message]. In that case, a `EventMessage`
37+
* is constructed from that message's payload and [org.axonframework.messaging.MetaData].
38+
*
39+
* @param events the list of events to be published
40+
* @return events that were published. DO NOTE: if there were some [SuspendingMessageDispatchInterceptor]s
41+
* registered to this `gateway`, they will be processed first, before returning events to the caller. The
42+
* order of returned events is the same as one provided as the input parameter.
43+
*/
44+
suspend fun publish(events: Iterable<*>): List<EventMessage<*>>
45+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package org.axonframework.extensions.kotlin.eventhandling.gateway
2+
3+
import io.mockk.every
4+
import io.mockk.justRun
5+
import io.mockk.mockk
6+
import io.mockk.verify
7+
import kotlinx.coroutines.runBlocking
8+
import org.axonframework.eventhandling.EventBus
9+
import org.axonframework.eventhandling.EventMessage
10+
import org.axonframework.eventhandling.GenericEventMessage
11+
import org.axonframework.extensions.kotlin.messaging.andMetaData
12+
import org.axonframework.extensions.kotlin.messaging.registerDispatchInterceptor
13+
import org.junit.jupiter.api.BeforeEach
14+
import org.junit.jupiter.api.Test
15+
import org.junit.jupiter.api.assertThrows
16+
import kotlin.test.assertEquals
17+
18+
class DefaultSuspendingEventGatewayTests {
19+
private lateinit var eventBus: EventBus
20+
private lateinit var gateway: SuspendingEventGateway
21+
22+
@BeforeEach
23+
fun setUp() {
24+
eventBus = mockk {
25+
justRun { publish(any<EventMessage<*>>()) }
26+
}
27+
28+
gateway = DefaultSuspendingEventGateway {
29+
eventBus = this@DefaultSuspendingEventGatewayTests.eventBus
30+
}
31+
}
32+
33+
@Test
34+
fun testPublish(): Unit = runBlocking {
35+
gateway.publish("event", "event")
36+
verify(exactly = 2) { eventBus.publish(any<EventMessage<*>>()) }
37+
}
38+
39+
@Test
40+
fun testPublishWithError(): Unit = runBlocking {
41+
every { eventBus.publish(any<EventMessage<*>>()) } throws RuntimeException("oops")
42+
assertThrows<RuntimeException> {
43+
gateway.publish("event", "event")
44+
}
45+
}
46+
47+
@Test
48+
fun testDispatchInterceptor(): Unit = runBlocking {
49+
gateway.registerDispatchInterceptor {
50+
GenericEventMessage.asEventMessage<Any?>(it.payload).andMetaData("key" to "value")
51+
}
52+
assertEquals("value", gateway.publish("event").first().metaData["key"])
53+
}
54+
55+
@Test
56+
fun testPublishOrder(): Unit = runBlocking {
57+
val event1 = GenericEventMessage.asEventMessage<Any>("event1")
58+
val event2 = GenericEventMessage.asEventMessage<Any>("event2")
59+
assertEquals(listOf(event1, event2), gateway.publish(event1, event2))
60+
}
61+
}

0 commit comments

Comments
 (0)