1+ package org.axonframework.extensions.kotlin.queryhandling.gateway
2+
3+ import kotlinx.coroutines.flow.Flow
4+ import kotlinx.coroutines.flow.map
5+ import kotlinx.coroutines.future.await
6+ import kotlinx.coroutines.reactive.asFlow
7+ import kotlinx.coroutines.reactive.awaitSingleOrNull
8+ import kotlinx.coroutines.stream.consumeAsFlow
9+ import org.axonframework.common.BuilderUtils
10+ import org.axonframework.common.Registration
11+ import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptor
12+ import org.axonframework.extensions.kotlin.messaging.SuspendingResultHandlerInterceptor
13+ import org.axonframework.extensions.kotlin.messaging.payloadOrThrowException
14+ import org.axonframework.messaging.GenericMessage
15+ import org.axonframework.messaging.ResultMessage
16+ import org.axonframework.messaging.responsetypes.ResponseType
17+ import org.axonframework.queryhandling.*
18+ import java.time.Duration
19+ import java.util.*
20+ import java.util.concurrent.CopyOnWriteArrayList
21+ import java.util.concurrent.TimeUnit
22+
23+ /* *
24+ * Implementation of the [SuspendingQueryGateway].
25+ *
26+ * @author Joel Feinstein
27+ * @since 0.2.0
28+ */
29+ class DefaultSuspendingQueryGateway (config : Builder .() -> Unit ) : SuspendingQueryGateway {
30+ private val queryBus: QueryBus
31+ private val dispatchInterceptors: MutableList <SuspendingMessageDispatchInterceptor <QueryMessage <* , * >>>
32+ private val resultInterceptors: MutableList <SuspendingResultHandlerInterceptor <QueryMessage <* , * >, ResultMessage <* >>>
33+
34+ init {
35+ Builder ().apply (config).validate().let {
36+ queryBus = it.queryBus
37+ dispatchInterceptors = CopyOnWriteArrayList (it.dispatchInterceptors)
38+ resultInterceptors = CopyOnWriteArrayList (it.resultInterceptors)
39+ }
40+ }
41+
42+ override fun registerDispatchInterceptor (interceptor : SuspendingMessageDispatchInterceptor <QueryMessage <* , * >>): Registration {
43+ dispatchInterceptors + = interceptor
44+ return Registration { dispatchInterceptors.remove(interceptor) }
45+ }
46+
47+ override fun registerResultHandlerInterceptor (interceptor : SuspendingResultHandlerInterceptor <QueryMessage <* , * >, ResultMessage <* >>): Registration {
48+ resultInterceptors + = interceptor
49+ return Registration { resultInterceptors.remove(interceptor) }
50+ }
51+
52+ @Suppress(" UNCHECKED_CAST" )
53+ override suspend fun <R , Q > query (queryName : String , query : Q , responseType : ResponseType <R >): R {
54+ val queryMessage = GenericQueryMessage (GenericMessage .asMessage(query), queryName, responseType)
55+ val processedQueryMessage = processDispatchInterceptors(queryMessage)
56+ val queryResultMessage = dispatchQuery(processedQueryMessage)
57+ val processedQueryResultMessage = processResultsInterceptors(processedQueryMessage, queryResultMessage)
58+ return processedQueryResultMessage.payloadOrThrowException()
59+ }
60+
61+ @Suppress(" UNCHECKED_CAST" )
62+ override suspend fun <R , Q > scatterGather (queryName : String , query : Q , responseType : ResponseType <R >, timeout : Duration ): Flow <R > {
63+ val queryMessage = GenericQueryMessage (GenericMessage .asMessage(query), queryName, responseType)
64+ val processedQueryMessage = processDispatchInterceptors(queryMessage)
65+ return dispatchScatterGatherQuery(processedQueryMessage, timeout.toMillis(), TimeUnit .MILLISECONDS )
66+ .map { processResultsInterceptors(processedQueryMessage, it).payloadOrThrowException() }
67+ }
68+
69+ @Suppress(" UNCHECKED_CAST" )
70+ override suspend fun <Q , I , U > subscriptionQuery (
71+ queryName : String ,
72+ query : Q ,
73+ initialResponseType : ResponseType <I >,
74+ updateResponseType : ResponseType <U >,
75+ backpressure : SubscriptionQueryBackpressure ,
76+ updateBufferSize : Int
77+ ): SuspendingSubscriptionQueryResult <I , U > {
78+ val subscriptionQueryMessage = GenericSubscriptionQueryMessage (query, initialResponseType, updateResponseType)
79+ val processedSubscriptionQueryMessage = processDispatchInterceptors(subscriptionQueryMessage)
80+ val subscriptionDelegate = dispatchSubscriptionQuery(processedSubscriptionQueryMessage as SubscriptionQueryMessage <Q , I , U >, backpressure, updateBufferSize)
81+
82+ return object : SuspendingSubscriptionQueryResult <I , U > {
83+ override suspend fun initialResult (): I {
84+ return processResultsInterceptors(processedSubscriptionQueryMessage, subscriptionDelegate.initialResult().awaitSingleOrNull()).payloadOrThrowException()
85+ }
86+
87+ override fun updates (): Flow <U > {
88+ return subscriptionDelegate.updates().asFlow().map { it.payloadOrThrowException() }
89+ }
90+
91+ override fun cancel (): Boolean {
92+ return subscriptionDelegate.cancel()
93+ }
94+ }
95+ }
96+
97+ private suspend fun dispatchQuery (queryMessage : QueryMessage <* , * >): ResultMessage <* > {
98+ return queryBus.query(queryMessage).await()
99+ }
100+
101+ private fun dispatchScatterGatherQuery (queryMessage : QueryMessage <* , * >, timeout : Long , timeUnit : TimeUnit ): Flow <ResultMessage <* >> {
102+ return queryBus.scatterGather(queryMessage, timeout, timeUnit).consumeAsFlow()
103+ }
104+
105+ private fun <Q , I , U > dispatchSubscriptionQuery (
106+ queryMessage : SubscriptionQueryMessage <Q , I , U >,
107+ backpressure : SubscriptionQueryBackpressure ,
108+ updateBufferSize : Int
109+ ): SubscriptionQueryResult <QueryResponseMessage <I >, SubscriptionQueryUpdateMessage<U>> {
110+ return queryBus.subscriptionQuery(queryMessage, backpressure, updateBufferSize)
111+ }
112+
113+ private suspend fun processDispatchInterceptors (queryMessage : QueryMessage <* , * >): QueryMessage <* , * > {
114+ return dispatchInterceptors.fold(queryMessage) { acc, interceptor -> interceptor.intercept(acc) }
115+ }
116+
117+ private suspend fun processResultsInterceptors (queryMessage : QueryMessage <* , * >, queryResultMessage : ResultMessage <* >): ResultMessage <* > {
118+ return resultInterceptors.fold(queryResultMessage) { acc, interceptor -> interceptor.intercept(queryMessage, acc) }
119+ }
120+
121+ /* *
122+ * Builder class to instantiate [DefaultReactorQueryGateway].
123+ *
124+ *
125+ * The `dispatchInterceptors` are defaulted to an empty list.
126+ * The [QueryBus] is a **hard requirement** and as such should be provided.
127+ *
128+ */
129+ class Builder internal constructor() {
130+ lateinit var queryBus: QueryBus
131+ val dispatchInterceptors: List <SuspendingMessageDispatchInterceptor <QueryMessage <* , * >>> = CopyOnWriteArrayList ()
132+ val resultInterceptors: List <SuspendingResultHandlerInterceptor <QueryMessage <* , * >, ResultMessage <* >>> = CopyOnWriteArrayList ()
133+
134+ /* *
135+ * Validate whether the fields contained in this Builder as set accordingly.
136+ *
137+ * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
138+ * specifications
139+ */
140+ fun validate () = apply {
141+ BuilderUtils .assertThat(this ::queryBus.isInitialized, { it == true }, " The QueryBus is a hard requirement and should be provided" )
142+ }
143+ }
144+ }
0 commit comments