Skip to content

Commit 36d1947

Browse files
AdrienAdrien
authored andcommitted
Test: Add arity tests for Kotlin functions, consumers, and suppliers
This commit introduces a comprehensive suite of tests to cover all possible declarations of Kotlin functions, consumers, and suppliers within the `spring-cloud-function-kotlin` module. The primary goal of these tests is to explore the various declaration combinations and identify potential areas for improvement and enhanced support in the framework. Signed-off-by: Adrien <[email protected]>
1 parent 8e20b79 commit 36d1947

13 files changed

+2964
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.springframework.cloud.function.kotlin.arity
2+
3+
import org.springframework.boot.SpringApplication
4+
import org.springframework.boot.autoconfigure.SpringBootApplication
5+
import org.springframework.boot.runApplication
6+
7+
@SpringBootApplication
8+
open class KotlinArityApplication
9+
10+
fun main(args: Array<String>) {
11+
SpringApplication.run(KotlinArityApplication::class.java, *args)
12+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.function.kotlin.arity
18+
19+
import kotlinx.coroutines.delay
20+
import kotlinx.coroutines.flow.Flow
21+
import kotlinx.coroutines.flow.flow
22+
import org.springframework.messaging.Message
23+
import org.springframework.messaging.support.MessageBuilder
24+
import org.springframework.stereotype.Component
25+
import reactor.core.publisher.Flux
26+
import reactor.core.publisher.Mono
27+
import java.time.Duration
28+
import java.util.UUID
29+
30+
/**
31+
* Examples of implementing suppliers using Kotlin's function type.
32+
*
33+
* ## List of Combinations Implemented:
34+
* --- Coroutine ---
35+
* 1. () -> R -> supplierKotlinPlain
36+
* 2. () -> Flow<R> -> supplierKotlinFlow
37+
* 3. suspend () -> R -> supplierKotlinSuspendPlain
38+
* 4. suspend () -> Flow<R> -> supplierKotlinSuspendFlow
39+
* --- Reactor ---
40+
* 5. () -> Mono<R> -> supplierKotlinMono
41+
* 6. () -> Flux<R> -> supplierKotlinFlux
42+
* --- Message<T> ---
43+
* 7. () -> Message<R> -> supplierKotlinMessage
44+
* 8. () -> Mono<Message<R>> -> supplierKotlinMonoMessage
45+
* 9. suspend () -> Message<R> -> supplierKotlinSuspendMessage
46+
* 10. () -> Flux<Message<R>> -> supplierKotlinFluxMessage
47+
* 11. () -> Flow<Message<R>> -> supplierKotlinFlowMessage
48+
* 12. suspend () -> Flow<Message<R>> -> supplierKotlinSuspendFlowMessage
49+
*
50+
* @author Adrien Poupard
51+
*/
52+
class KotlinSupplierKotlinExamples
53+
54+
/** 1) () -> R */
55+
@Component
56+
class SupplierKotlinPlain : () -> Int {
57+
override fun invoke(): Int {
58+
return 42
59+
}
60+
}
61+
62+
/** 2) () -> Flow<R> */
63+
@Component
64+
class SupplierKotlinFlow : () -> Flow<String> {
65+
override fun invoke(): Flow<String> {
66+
return flow {
67+
emit("A")
68+
emit("B")
69+
emit("C")
70+
}
71+
}
72+
}
73+
74+
/** 3) suspend () -> R */
75+
@Component
76+
class SupplierKotlinSuspendPlain : suspend () -> String {
77+
override suspend fun invoke(): String {
78+
return "Hello from suspend"
79+
}
80+
}
81+
82+
/** 4) suspend () -> Flow<R> */
83+
@Component
84+
class SupplierKotlinSuspendFlow : suspend () -> Flow<String> {
85+
override suspend fun invoke(): Flow<String> {
86+
return flow {
87+
emit("x")
88+
emit("y")
89+
emit("z")
90+
}
91+
}
92+
}
93+
94+
/** 5) () -> Mono<R> */
95+
@Component
96+
class SupplierKotlinMono : () -> Mono<String> {
97+
override fun invoke(): Mono<String> {
98+
return Mono.just("Hello from Mono").delayElement(Duration.ofMillis(50))
99+
}
100+
}
101+
102+
/** 6) () -> Flux<R> */
103+
@Component
104+
class SupplierKotlinFlux : () -> Flux<String> {
105+
override fun invoke(): Flux<String> {
106+
return Flux.just("Alpha", "Beta", "Gamma").delayElements(Duration.ofMillis(20))
107+
}
108+
}
109+
110+
/** 7) () -> Message<R> */
111+
@Component
112+
class SupplierKotlinMessage : () -> Message<String> {
113+
override fun invoke(): Message<String> {
114+
return MessageBuilder.withPayload("Hello from Message")
115+
.setHeader("messageId", UUID.randomUUID().toString())
116+
.build()
117+
}
118+
}
119+
120+
/** 8) () -> Mono<Message<R>> */
121+
@Component
122+
class SupplierKotlinMonoMessage : () -> Mono<Message<String>> {
123+
override fun invoke(): Mono<Message<String>> {
124+
return Mono.just(
125+
MessageBuilder.withPayload("Hello from Mono Message")
126+
.setHeader("monoMessageId", UUID.randomUUID().toString())
127+
.setHeader("source", "mono")
128+
.build()
129+
).delayElement(Duration.ofMillis(40))
130+
}
131+
}
132+
133+
/** 9) suspend () -> Message<R> */
134+
@Component
135+
class SupplierKotlinSuspendMessage : suspend () -> Message<String> {
136+
override suspend fun invoke(): Message<String> {
137+
return MessageBuilder.withPayload("Hello from Suspend Message")
138+
.setHeader("suspendMessageId", UUID.randomUUID().toString())
139+
.setHeader("wasSuspended", true)
140+
.build()
141+
}
142+
}
143+
144+
/** 10) () -> Flux<Message<R>> */
145+
@Component
146+
class SupplierKotlinFluxMessage : () -> Flux<Message<String>> {
147+
override fun invoke(): Flux<Message<String>> {
148+
return Flux.just("Msg1", "Msg2")
149+
.delayElements(Duration.ofMillis(30))
150+
.map { payload ->
151+
MessageBuilder.withPayload(payload)
152+
.setHeader("fluxMessageId", UUID.randomUUID().toString())
153+
.build()
154+
}
155+
}
156+
}
157+
158+
/** 11) () -> Flow<Message<R>> */
159+
@Component
160+
class SupplierKotlinFlowMessage : () -> Flow<Message<String>> {
161+
override fun invoke(): Flow<Message<String>> {
162+
return flow {
163+
listOf("FlowMsg1", "FlowMsg2").forEach { payload ->
164+
emit(
165+
MessageBuilder.withPayload(payload)
166+
.setHeader("flowMessageId", UUID.randomUUID().toString())
167+
.build()
168+
)
169+
}
170+
}
171+
}
172+
}
173+
174+
/** 12) suspend () -> Flow<Message<R>> */
175+
@Component
176+
class SupplierKotlinSuspendFlowMessage : suspend () -> Flow<Message<String>> {
177+
override suspend fun invoke(): Flow<Message<String>> {
178+
return flow {
179+
listOf("SuspendFlowMsg1", "SuspendFlowMsg2").forEach { payload ->
180+
emit(
181+
MessageBuilder.withPayload(payload)
182+
.setHeader("suspendFlowMessageId", UUID.randomUUID().toString())
183+
.build()
184+
)
185+
}
186+
}
187+
}
188+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.function.kotlin.arity
18+
19+
import kotlinx.coroutines.delay
20+
import kotlinx.coroutines.flow.Flow
21+
import org.springframework.context.annotation.Bean
22+
import org.springframework.context.annotation.Configuration
23+
import org.springframework.messaging.Message
24+
import reactor.core.publisher.Flux
25+
import reactor.core.publisher.Mono
26+
27+
/**
28+
* ## List of Combinations Tested (in requested order):
29+
* --- Coroutine ---
30+
* 1. (T) -> Unit -> consumerPlain
31+
* 2. (Flow<T>) -> Unit -> consumerFlow
32+
* 3. suspend (T) -> Unit -> consumerSuspendPlain
33+
* 4. suspend (Flow<T>) -> Unit -> consumerSuspendFlow
34+
* --- Reactor ---
35+
* 5. (T) -> Mono<Void> -> consumerMonoInput
36+
* 6. (Mono<T>) -> Mono<Void> -> consumerMono
37+
* 7. (Flux<T>) -> Mono<Void> -> consumerFlux
38+
* --- Message<T> ---
39+
* 8. (Message<T>) -> Unit -> consumerMessage
40+
* 9. (Mono<Message<T>>) -> Mono<Void> -> consumerMonoMessage
41+
* 10. suspend (Message<T>) -> Unit -> consumerSuspendMessage
42+
* 11. (Flux<Message<T>>) -> Unit -> consumerFluxMessage
43+
* 12. (Flow<Message<T>>) -> Unit -> consumerFlowMessage
44+
* 13. suspend (Flow<Message<T>>) -> Unit -> consumerSuspendFlowMessage
45+
*
46+
* @author Adrien Poupard
47+
*/
48+
@Configuration
49+
open class KotlinConsumerArityBean {
50+
51+
/** 1) (T) -> Unit */
52+
@Bean
53+
open fun consumerPlain(): (String) -> Unit = { input ->
54+
println("Consumed: $input")
55+
}
56+
57+
/** 2) (Flow<T>) -> Unit */
58+
@Bean
59+
open fun consumerFlow(): (Flow<String>) -> Unit = { flowInput ->
60+
println("Received flow: $flowInput (would collect in coroutine)")
61+
}
62+
63+
/** 3) suspend (T) -> Unit */
64+
@Bean
65+
open fun consumerSuspendPlain(): suspend (String) -> Unit = { input ->
66+
println("Suspend consumed: $input")
67+
}
68+
69+
/** 4) suspend (Flow<T>) -> Unit */
70+
@Bean
71+
open fun consumerSuspendFlow(): suspend (Flow<String>) -> Unit = { flowInput ->
72+
flowInput.collect { item ->
73+
println("Flow item consumed: $item")
74+
}
75+
}
76+
77+
/** 5) (T) -> Mono<Void> */
78+
@Bean
79+
open fun consumerMonoInput(): (String) -> Mono<Void> = { input ->
80+
Mono.fromRunnable<Void> {
81+
println("[Reactor] Consumed T: $input")
82+
}
83+
}
84+
85+
/** 6) (Mono<T>) -> Mono<Void> */
86+
@Bean
87+
open fun consumerMono(): (Mono<String>) -> Mono<Void> = { monoInput ->
88+
monoInput.doOnNext { item ->
89+
println("[Reactor] Consumed Mono item: $item")
90+
}.then()
91+
}
92+
93+
/** 7) (Flux<T>) -> Mono<Void> */
94+
@Bean
95+
open fun consumerFlux(): (Flux<String>) -> Mono<Void> = { fluxInput ->
96+
fluxInput.doOnNext { item ->
97+
println("[Reactor] Consumed Flux item: $item")
98+
}.then()
99+
}
100+
101+
/** 8) (Message<T>) -> Unit */
102+
@Bean
103+
open fun consumerMessage(): (Message<String>) -> Unit = { message ->
104+
println("[Message] Consumed payload: ${message.payload}, Headers: ${message.headers}")
105+
}
106+
107+
/** 9) (Mono<Message<T>>) -> Mono<Void> */
108+
@Bean
109+
open fun consumerMonoMessage(): (Mono<Message<String>>) -> Mono<Void> = { monoMsgInput ->
110+
monoMsgInput
111+
.doOnNext { message ->
112+
println("[Message][Mono] Consumed payload: ${message.payload}, Header id: ${message.headers.id}")
113+
}
114+
.then()
115+
}
116+
117+
/** 10) suspend (Message<T>) -> Unit */
118+
@Bean
119+
open fun consumerSuspendMessage(): suspend (Message<String>) -> Unit = { message ->
120+
println("[Message][Suspend] Consumed payload: ${message.payload}, Header count: ${message.headers.size}")
121+
}
122+
123+
/** 11) (Flux<Message<T>>) -> Unit */
124+
@Bean
125+
open fun consumerFluxMessage(): (Flux<Message<String>>) -> Unit = { fluxMsgInput ->
126+
// Explicit subscription needed here because the lambda itself returns Unit
127+
fluxMsgInput.subscribe { message ->
128+
println("[Message] Consumed Flux payload: ${message.payload}, Headers: ${message.headers}")
129+
}
130+
}
131+
132+
/** 12) (Flow<Message<T>>) -> Unit */
133+
@Bean
134+
open fun consumerFlowMessage(): (Flow<Message<String>>) -> Unit = { flowMsgInput ->
135+
// Similar to Flux consumer returning Unit, explicit collection might be needed depending on context.
136+
println("[Message] Received Flow: $flowMsgInput (would need explicit collection if signature returns Unit)")
137+
// Example:
138+
// CoroutineScope(Dispatchers.IO).launch {
139+
// flowMsgInput.collect { message -> println(...) }
140+
// }
141+
}
142+
143+
/** 13) suspend (Flow<Message<T>>) -> Unit */
144+
@Bean
145+
open fun consumerSuspendFlowMessage(): suspend (Flow<Message<String>>) -> Unit = { flowMsgInput ->
146+
flowMsgInput.collect { message ->
147+
println("[Message] Consumed Suspend Flow payload: ${message.payload}, Headers: ${message.headers}")
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)