1+ package org.axonframework.extensions.kotlin.queryhandling.gateway
2+
3+ import kotlinx.coroutines.flow.Flow
4+ import kotlinx.coroutines.flow.map
5+ import kotlinx.coroutines.future.await
6+ import kotlinx.coroutines.reactive.asFlow
7+ import kotlinx.coroutines.reactive.awaitSingleOrNull
8+ import kotlinx.coroutines.stream.consumeAsFlow
9+ import org.axonframework.common.BuilderUtils
10+ import org.axonframework.common.Registration
11+ import org.axonframework.extensions.kotlin.messaging.SuspendingMessageDispatchInterceptor
12+ import org.axonframework.extensions.kotlin.messaging.SuspendingResultHandlerInterceptor
13+ import org.axonframework.extensions.kotlin.messaging.payloadOrThrowException
14+ import org.axonframework.messaging.GenericMessage
15+ import org.axonframework.messaging.ResultMessage
16+ import org.axonframework.messaging.responsetypes.ResponseType
17+ import org.axonframework.queryhandling.*
18+ import java.time.Duration
19+ import java.util.*
20+ import java.util.concurrent.CopyOnWriteArrayList
21+ import java.util.concurrent.TimeUnit
22+
23+ /* *
24+ * Implementation of the [SuspendingQueryGateway].
25+ *
26+ * @author Joel Feinstein
27+ * @since 0.2.0
28+ */
29+ class DefaultSuspendingQueryGateway (config : Builder .() -> Unit ) : SuspendingQueryGateway {
30+ private val queryBus: QueryBus
31+ private val dispatchInterceptors: MutableList <SuspendingMessageDispatchInterceptor <QueryMessage <* , * >>>
32+ private val resultInterceptors: MutableList <SuspendingResultHandlerInterceptor <QueryMessage <* , * >, ResultMessage <* >>>
33+
34+ init {
35+ Builder ().apply (config).validate().let {
36+ queryBus = it.queryBus
37+ dispatchInterceptors = CopyOnWriteArrayList (it.dispatchInterceptors)
38+ resultInterceptors = CopyOnWriteArrayList (it.resultInterceptors)
39+ }
40+ }
41+
42+ override fun registerDispatchInterceptor (interceptor : SuspendingMessageDispatchInterceptor <QueryMessage <* , * >>): Registration {
43+ dispatchInterceptors + = interceptor
44+ return Registration { dispatchInterceptors.remove(interceptor) }
45+ }
46+
47+ override fun registerResultHandlerInterceptor (interceptor : SuspendingResultHandlerInterceptor <QueryMessage <* , * >, ResultMessage <* >>): Registration {
48+ resultInterceptors + = interceptor
49+ return Registration { resultInterceptors.remove(interceptor) }
50+ }
51+
52+ override suspend fun <R , Q > query (queryName : String , query : Q , responseType : ResponseType <R >): R {
53+ val queryMessage = GenericQueryMessage (GenericMessage .asMessage(query), queryName, responseType).processDispatchInterceptors()
54+ return processResultsInterceptors(queryMessage, queryMessage.dispatch()).payloadOrThrowException()
55+ }
56+
57+ override suspend fun <R , Q > scatterGather (queryName : String , query : Q , responseType : ResponseType <R >, timeout : Duration ): Flow <R > {
58+ val queryMessage = GenericQueryMessage (GenericMessage .asMessage(query), queryName, responseType).processDispatchInterceptors()
59+ return queryMessage.dispatchScatterGather(timeout.toMillis(), TimeUnit .MILLISECONDS ).map {
60+ processResultsInterceptors(queryMessage, it).payloadOrThrowException()
61+ }
62+ }
63+
64+ @Suppress(" UNCHECKED_CAST" )
65+ override suspend fun <Q , I , U > subscriptionQuery (
66+ queryName : String ,
67+ query : Q ,
68+ initialResponseType : ResponseType <I >,
69+ updateResponseType : ResponseType <U >,
70+ backpressure : SubscriptionQueryBackpressure ,
71+ updateBufferSize : Int
72+ ): SuspendingSubscriptionQueryResult <I , U > {
73+ val subscriptionQueryMessage = GenericSubscriptionQueryMessage (query, initialResponseType, updateResponseType).processDispatchInterceptors() as SubscriptionQueryMessage <Q , I , U >
74+ val subscriptionDelegate = subscriptionQueryMessage.dispatch(backpressure, updateBufferSize)
75+
76+ return object : SuspendingSubscriptionQueryResult <I , U > {
77+ override suspend fun initialResult (): I {
78+ return processResultsInterceptors(subscriptionQueryMessage, subscriptionDelegate.initialResult().awaitSingleOrNull()).payloadOrThrowException()
79+ }
80+
81+ override fun updates (): Flow <U > {
82+ return subscriptionDelegate.updates().asFlow().map { processResultsInterceptors(subscriptionQueryMessage, it).payloadOrThrowException() }
83+ }
84+
85+ override fun cancel (): Boolean {
86+ return subscriptionDelegate.cancel()
87+ }
88+ }
89+ }
90+
91+ private suspend fun QueryMessage <* , * >.dispatch (): ResultMessage <* > {
92+ return queryBus.query(this ).await()
93+ }
94+
95+ private fun QueryMessage <* , * >.dispatchScatterGather (timeout : Long , timeUnit : TimeUnit ): Flow <ResultMessage <* >> {
96+ return queryBus.scatterGather(this , timeout, timeUnit).consumeAsFlow()
97+ }
98+
99+ private fun <Q , I , U > SubscriptionQueryMessage <Q , I , U >.dispatch (
100+ backpressure : SubscriptionQueryBackpressure ,
101+ updateBufferSize : Int
102+ ): SubscriptionQueryResult <QueryResponseMessage <I >, SubscriptionQueryUpdateMessage<U>> {
103+ return queryBus.subscriptionQuery(this , backpressure, updateBufferSize)
104+ }
105+
106+ private suspend fun QueryMessage <* , * >.processDispatchInterceptors (): QueryMessage <* , * > {
107+ return dispatchInterceptors.fold(this ) { acc, interceptor -> interceptor.intercept(acc) }
108+ }
109+
110+ private suspend fun processResultsInterceptors (queryMessage : QueryMessage <* , * >, queryResultMessage : ResultMessage <* >): ResultMessage <* > {
111+ return resultInterceptors.fold(queryResultMessage) { acc, interceptor -> interceptor.intercept(queryMessage, acc) }
112+ }
113+
114+ /* *
115+ * Builder class to instantiate [DefaultReactorQueryGateway].
116+ *
117+ *
118+ * The `dispatchInterceptors` are defaulted to an empty list.
119+ * The [QueryBus] is a **hard requirement** and as such should be provided.
120+ *
121+ */
122+ class Builder internal constructor() {
123+ lateinit var queryBus: QueryBus
124+ val dispatchInterceptors: List <SuspendingMessageDispatchInterceptor <QueryMessage <* , * >>> = CopyOnWriteArrayList ()
125+ val resultInterceptors: List <SuspendingResultHandlerInterceptor <QueryMessage <* , * >, ResultMessage <* >>> = CopyOnWriteArrayList ()
126+
127+ /* *
128+ * Validate whether the fields contained in this Builder as set accordingly.
129+ *
130+ * @throws AxonConfigurationException if one field is asserted to be incorrect according to the Builder's
131+ * specifications
132+ */
133+ fun validate () = apply {
134+ BuilderUtils .assertThat(this ::queryBus.isInitialized, { it == true }, " The QueryBus is a hard requirement and should be provided" )
135+ }
136+ }
137+ }
0 commit comments