Skip to content

Commit d5e57d5

Browse files
committed
Native kotlin suspending command gateway
1 parent d7d4bbc commit d5e57d5

File tree

8 files changed

+384
-0
lines changed

8 files changed

+384
-0
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package org.axonframework.extensions.kotlin.commandhandling.gateway
2+
3+
import kotlinx.coroutines.suspendCancellableCoroutine
4+
import org.axonframework.commandhandling.CommandBus
5+
import org.axonframework.commandhandling.CommandCallback
6+
import org.axonframework.commandhandling.CommandMessage
7+
import org.axonframework.commandhandling.CommandResultMessage
8+
import org.axonframework.commandhandling.GenericCommandMessage
9+
import org.axonframework.commandhandling.gateway.RetryScheduler
10+
import org.axonframework.commandhandling.gateway.RetryingCallback
11+
import org.axonframework.common.Registration
12+
import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptor
13+
import org.axonframework.extensions.kotlin.messaging.SuspendingResultHandlerInterceptor
14+
import java.util.concurrent.CopyOnWriteArrayList
15+
import kotlin.coroutines.Continuation
16+
import kotlin.coroutines.resume
17+
18+
class DefaultSuspendingCommandGateway(config: Builder.() -> Unit) : SuspendingCommandGateway {
19+
private val commandBus: CommandBus
20+
private val retryScheduler: RetryScheduler?
21+
private val dispatchInterceptors: MutableList<SuspendingMessageDispatchInterceptor<CommandMessage<*>>>
22+
private val resultInterceptors: MutableList<SuspendingResultHandlerInterceptor<CommandMessage<*>, CommandResultMessage<*>>>
23+
24+
init {
25+
Builder().apply(config).validate().let {
26+
commandBus = it.commandBus
27+
retryScheduler = it.retrySchedule
28+
dispatchInterceptors = CopyOnWriteArrayList(it.dispatchInterceptors)
29+
resultInterceptors = CopyOnWriteArrayList(it.resultInterceptors)
30+
}
31+
}
32+
33+
@Suppress("UNCHECKED_CAST")
34+
override suspend fun <R> send(command: Any): R {
35+
val commandMessage = GenericCommandMessage.asCommandMessage<Any?>(command)
36+
val processedCommandMessage = processDispatchInterceptors(commandMessage) as CommandMessage<Any?>
37+
val processedCommandResultMessage = processResultInterceptors(processedCommandMessage, dispatchCommand<Any?, Any?>(processedCommandMessage))
38+
39+
if (processedCommandResultMessage.isExceptional) {
40+
throw processedCommandResultMessage.exceptionResult()
41+
} else {
42+
return processedCommandResultMessage.payload as R
43+
}
44+
}
45+
46+
override fun registerDispatchInterceptor(interceptor: SuspendingMessageDispatchInterceptor<CommandMessage<*>>): Registration {
47+
dispatchInterceptors += interceptor
48+
return Registration { dispatchInterceptors.remove(interceptor) }
49+
}
50+
51+
override fun registerResultHandlerInterceptor(interceptor: SuspendingResultHandlerInterceptor<CommandMessage<*>, CommandResultMessage<*>>): Registration {
52+
resultInterceptors += interceptor
53+
return Registration { resultInterceptors.remove(interceptor) }
54+
}
55+
56+
private suspend fun processDispatchInterceptors(commandMessage: CommandMessage<*>): CommandMessage<*> {
57+
return dispatchInterceptors.fold(commandMessage) { acc, interceptor -> interceptor.intercept(acc) }
58+
}
59+
60+
private suspend fun <C> processResultInterceptors(commandMessage: CommandMessage<C>, commandResultMessage: CommandResultMessage<*>): CommandResultMessage<*> {
61+
return resultInterceptors.fold(commandResultMessage) { acc, interceptor -> interceptor.intercept(commandMessage, acc) }
62+
}
63+
64+
private suspend fun <C, R> dispatchCommand(commandMessage: CommandMessage<C>): CommandResultMessage<out R> {
65+
return suspendCancellableCoroutine { continuation ->
66+
var callback: CommandCallback<C, R> = SuspendingCommandCallback(continuation)
67+
68+
if (retryScheduler != null) {
69+
callback = RetryingCallback(callback, retryScheduler, commandBus)
70+
}
71+
72+
commandBus.dispatch(commandMessage, callback)
73+
}
74+
}
75+
76+
private class SuspendingCommandCallback<C, R>(
77+
private val continuation: Continuation<CommandResultMessage<out R>>
78+
) : CommandCallback<C, R> {
79+
override fun onResult(
80+
commandMessage: CommandMessage<out C>,
81+
commandResultMessage: CommandResultMessage<out R>
82+
) {
83+
continuation.resume(commandResultMessage)
84+
}
85+
}
86+
87+
class Builder internal constructor() {
88+
lateinit var commandBus: CommandBus
89+
var retrySchedule: RetryScheduler? = null
90+
val dispatchInterceptors: MutableList<SuspendingMessageDispatchInterceptor<CommandMessage<*>>> = CopyOnWriteArrayList()
91+
val resultInterceptors: MutableList<SuspendingResultHandlerInterceptor<CommandMessage<*>, CommandResultMessage<*>>> = CopyOnWriteArrayList()
92+
93+
fun validate() = apply {
94+
require(this::commandBus.isInitialized) { "The CommandBus is a hard requirement and should be provided"}
95+
}
96+
}
97+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.axonframework.extensions.kotlin.commandhandling.gateway
2+
3+
import org.axonframework.commandhandling.CommandMessage
4+
import org.axonframework.commandhandling.CommandResultMessage
5+
import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptorSupport
6+
import org.axonframework.extensions.kotlin.messaging.SuspendiongResultHandlerInterceptorSupport
7+
8+
interface SuspendingCommandGateway :SuspendingMessageDispatchInterceptorSupport<CommandMessage<*>>,
9+
SuspendiongResultHandlerInterceptorSupport<CommandMessage<*>, CommandResultMessage<*>> {
10+
11+
/**
12+
* Sends the given {@code command} and returns the result.
13+
* <p/>
14+
* The given {@code command} is wrapped as the payload of a {@link CommandMessage} that is eventually posted on the
15+
* {@link CommandBus}, unless the {@code command} already implements {@link Message}. In that case, a
16+
* {@code CommandMessage} is constructed from that message's payload and {@link MetaData}.
17+
*
18+
* @param command the command to dispatch
19+
* @param <R> the type of the command result
20+
* @return a {@link Mono} which is resolved when the command is executed
21+
*/
22+
suspend fun <R> send(command: Any): R
23+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package org.axonframework.extensions.kotlin.messaging
2+
3+
import kotlinx.coroutines.runBlocking
4+
import org.axonframework.messaging.Message
5+
import org.axonframework.messaging.MessageDispatchInterceptor
6+
import java.util.function.BiFunction
7+
8+
/**
9+
* Interceptor that allows messages to be intercepted and modified before they are dispatched.
10+
*
11+
* @param <M> the message type this interceptor can process
12+
* @author Joel Feinstein
13+
* @since 0.2.0
14+
*/
15+
interface SuspendingMessageDispatchInterceptor<M : Message<*>> : MessageDispatchInterceptor<M> {
16+
/**
17+
* Intercepts a message.
18+
*
19+
* @param message a message to be intercepted
20+
* @return the message to dispatch
21+
*/
22+
suspend fun intercept(message: M): M
23+
24+
override fun handle(messages: List<M>): BiFunction<Int, M, M> {
25+
return BiFunction { _, message -> runBlocking { intercept(message) } }
26+
}
27+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.axonframework.extensions.kotlin.messaging
2+
3+
import org.axonframework.common.Registration
4+
import org.axonframework.messaging.Message
5+
6+
/**
7+
* Interface marking components capable of registering a {@link SuspendingMessageDispatchInterceptor}.
8+
* Generally, these are messaging components injected into the sending end of the communication.
9+
*
10+
* @param <M> The type of the message to be intercepted
11+
* @author Joel Feinstein
12+
* @since 0.2.0
13+
*/
14+
fun interface SuspendingMessageDispatchInterceptorSupport<M : Message<*>> {
15+
/**
16+
* Register the given [SuspendingMessageDispatchInterceptor]. After registration, the interceptor will be
17+
* invoked for each message dispatched on the messaging component that it was registered to.
18+
*
19+
* @param interceptor The reactive interceptor to register
20+
* @return a [Registration], which may be used to unregister the interceptor
21+
*/
22+
fun registerDispatchInterceptor(interceptor: SuspendingMessageDispatchInterceptor<M>): Registration
23+
}
24+
25+
fun <M: Message<*>> SuspendingMessageDispatchInterceptorSupport<M>.registerDispatchInterceptor(
26+
interceptor: suspend (M) -> M
27+
) = registerDispatchInterceptor(
28+
object : SuspendingMessageDispatchInterceptor<M> {
29+
override suspend fun intercept(message: M) = interceptor(message)
30+
}
31+
)
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.axonframework.extensions.kotlin.messaging
2+
3+
import kotlinx.coroutines.flow.Flow
4+
import org.axonframework.messaging.Message
5+
import org.axonframework.messaging.ResultMessage
6+
7+
/**
8+
* Interceptor that allows results to be intercepted and modified before they are handled. Implementations are required
9+
* to operate on a [Flow] of results or return a new [Flow] which will be passed down the interceptor chain.
10+
* Also, implementations may make decisions based on the message that was dispatched.
11+
*
12+
* @param <M> The type of the message for which the result is going to be intercepted
13+
* @param <R> The type of the result to be intercepted
14+
* @author Joel Feinstein
15+
* @since 0.2.0
16+
</R></M> */
17+
interface SuspendingResultHandlerInterceptor<M : Message<*>, R : ResultMessage<*>> {
18+
/**
19+
* Intercepts result messages.
20+
*
21+
* @param message a message that was dispatched (and caused these `results`)
22+
* @param results the outcome of the dispatched `message`
23+
* @return the intercepted `results`
24+
*/
25+
suspend fun intercept(message: M, result: R): R
26+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.axonframework.extensions.kotlin.messaging
2+
3+
import org.axonframework.common.Registration
4+
import org.axonframework.messaging.Message
5+
import org.axonframework.messaging.ResultMessage
6+
7+
/**
8+
* Interface marking components capable of registering a [SuspendingResultHandlerInterceptor].
9+
* Generally, these are messaging components injected into the receiving end of the communication.
10+
*
11+
* @param <M> The type of the message for which the result is going to be intercepted
12+
* @param <R> The type of the result to be intercepted
13+
* @author Joel Feinstein
14+
* @since 0.2.0
15+
*/
16+
fun interface SuspendiongResultHandlerInterceptorSupport<M : Message<*>, R : ResultMessage<*>> {
17+
/**
18+
* Register the given [SuspendingResultHandlerInterceptor]. After registration, the interceptor will be invoked
19+
* for each result message received on the messaging component that it was registered to.
20+
*
21+
* @param interceptor The reactive interceptor to register
22+
* @return a [Registration], which may be used to unregister the interceptor
23+
*/
24+
fun registerResultHandlerInterceptor(interceptor: SuspendingResultHandlerInterceptor<M, R>): Registration
25+
}
26+
27+
fun <M: Message<*>, R: ResultMessage<*>> SuspendiongResultHandlerInterceptorSupport<M, R>.registerResultHandlerInterceptor(
28+
interceptor: suspend (M, R) -> R
29+
) = registerResultHandlerInterceptor(
30+
object : SuspendingResultHandlerInterceptor<M, R> {
31+
override suspend fun intercept(message: M, result: R) = interceptor(message, result)
32+
}
33+
)
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package org.axonframework.extensions.kotlin.commandhandling.gateway
2+
3+
import io.mockk.every
4+
import io.mockk.mockk
5+
import io.mockk.slot
6+
import io.mockk.verify
7+
import kotlinx.coroutines.runBlocking
8+
import org.axonframework.commandhandling.*
9+
import org.axonframework.extensions.kotlin.messaging.registerDispatchInterceptor
10+
import org.axonframework.extensions.kotlin.messaging.registerResultHandlerInterceptor
11+
import org.junit.jupiter.api.BeforeEach
12+
import org.junit.jupiter.api.Test
13+
import org.junit.jupiter.api.assertThrows
14+
import kotlin.test.assertEquals
15+
16+
class DefaultSuspendingCommandGatewayTests {
17+
private lateinit var commandBus: CommandBus
18+
private lateinit var gateway: SuspendingCommandGateway
19+
20+
@BeforeEach
21+
fun setUp() {
22+
commandBus = mockk {
23+
every { dispatch<Any?, Any?>(any(), any()) } answers {
24+
val commandMessage: CommandMessage<Any?> = firstArg()
25+
val commandCallback: CommandCallback<Any?, Any?> = lastArg()
26+
commandCallback.onResult(commandMessage, GenericCommandResultMessage(commandMessage.payload))
27+
}
28+
}
29+
30+
gateway = DefaultSuspendingCommandGateway {
31+
commandBus = this@DefaultSuspendingCommandGatewayTests.commandBus
32+
}
33+
}
34+
35+
@Test
36+
fun testInterceptorOrder(): Unit = runBlocking {
37+
val metadata1 = mapOf("k1" to "v1")
38+
gateway.registerDispatchInterceptor { it.andMetaData(metadata1) }
39+
gateway.registerResultHandlerInterceptor { command, _ -> GenericCommandResultMessage(command.metaData["k1"]) }
40+
41+
val metadata2 = mapOf("k1" to "v2")
42+
gateway.registerDispatchInterceptor { it.andMetaData(metadata2) }
43+
44+
gateway.send<String>("")
45+
46+
slot<CommandMessage<*>>().apply {
47+
verify { commandBus.dispatch(capture(this@apply), any<CommandCallback<Any?, Any?>>()) }
48+
assertEquals("v2", captured.metaData["k1"])
49+
}
50+
}
51+
52+
@Test
53+
fun testResultFiltering(): Unit = runBlocking {
54+
gateway.registerResultHandlerInterceptor { _, result -> GenericCommandResultMessage("K" in result.metaData) }
55+
assertEquals(false, gateway.send(""))
56+
}
57+
58+
@Test
59+
fun testCommandFiltering(): Unit = runBlocking {
60+
gateway.registerDispatchInterceptor { GenericCommandMessage("K" in it.metaData) }
61+
assertEquals(false, gateway.send(""))
62+
}
63+
64+
@Test
65+
fun testCommandDispatchAndResultHandlerInterceptor(): Unit = runBlocking {
66+
val principalMetadata = mapOf("username" to "admin")
67+
gateway.registerDispatchInterceptor { it.andMetaData(principalMetadata) }
68+
gateway.registerDispatchInterceptor { it.also { assertEquals("admin", it.metaData["username"]) } }
69+
gateway.registerResultHandlerInterceptor { _, result -> result.andMetaData(principalMetadata) }
70+
gateway.registerResultHandlerInterceptor { _, result -> result.also { assertEquals("admin", result.metaData["username"]) } }
71+
gateway.send<Unit>("")
72+
}
73+
74+
@Test
75+
fun testCommandResultHandlerChain(): Unit = runBlocking {
76+
val metadata1 = mapOf("k1" to "v1")
77+
gateway.registerResultHandlerInterceptor { _, result -> result.andMetaData(metadata1) }
78+
79+
val metadata2 = mapOf("k1" to "v2")
80+
gateway.registerResultHandlerInterceptor { _, result -> result.andMetaData(metadata2) }
81+
82+
val metadata3 = mapOf("k2" to "v3")
83+
gateway.registerResultHandlerInterceptor { _, result -> result.andMetaData(metadata3) }
84+
85+
gateway.registerResultHandlerInterceptor { _, result ->
86+
result.also {
87+
assertEquals("v2", result.metaData["k1"])
88+
assertEquals("v3", result.metaData["k2"])
89+
}
90+
}
91+
92+
gateway.send<Unit>("")
93+
}
94+
95+
@Test
96+
fun testResultErrorMapping(): Unit = runBlocking {
97+
every { commandBus.dispatch<Any?, Any?>(any(), any()) } answers {
98+
val commandMessage: CommandMessage<Any?> = firstArg()
99+
val commandCallback: CommandCallback<Any?, Any?> = lastArg()
100+
commandCallback.onResult(commandMessage, GenericCommandResultMessage(RuntimeException("oops")))
101+
}
102+
103+
gateway.registerResultHandlerInterceptor { _, result -> GenericCommandResultMessage(result.exceptionResult().message) }
104+
105+
assertEquals("oops", gateway.send(""))
106+
}
107+
108+
@Test
109+
fun testResultErrorThrowing(): Unit = runBlocking {
110+
every { commandBus.dispatch<Any?, Any?>(any(), any()) } answers {
111+
val commandMessage: CommandMessage<Any?> = firstArg()
112+
val commandCallback: CommandCallback<Any?, Any?> = lastArg()
113+
commandCallback.onResult(commandMessage, GenericCommandResultMessage(RuntimeException("oops")))
114+
}
115+
116+
assertThrows<RuntimeException> { gateway.send("") }
117+
}
118+
119+
@Test
120+
fun testCommandMessageAlteration(): Unit = runBlocking {
121+
gateway.registerResultHandlerInterceptor { command, result ->
122+
if ("kX" in command.metaData) {
123+
GenericCommandResultMessage("new Payload")
124+
} else {
125+
result
126+
}
127+
}
128+
129+
val commandMetadata = mapOf("kX" to "vX")
130+
assertEquals("", gateway.send(GenericCommandMessage("")))
131+
assertEquals("new Payload", gateway.send(GenericCommandMessage("").andMetaData(commandMetadata)))
132+
}
133+
134+
@Test
135+
fun testRegisterSuspendingInterceptors(): Unit = runBlocking {
136+
suspend fun <T> T.identity() = this
137+
gateway.registerDispatchInterceptor { it.identity() }
138+
gateway.registerResultHandlerInterceptor { _, result -> result.identity() }
139+
}
140+
}

0 commit comments

Comments
 (0)