13
13
//===----------------------------------------------------------------------===//
14
14
15
15
import NIOCore
16
+ import NIOConcurrencyHelpers
16
17
@testable import SwiftKafka
17
18
import XCTest
18
19
19
- /// `NIOAsyncSequenceProducerBackPressureStrategy` that always returns true.
20
- struct NoBackPressure : NIOAsyncSequenceProducerBackPressureStrategy {
21
- func didYield( bufferDepth: Int ) -> Bool { true }
22
- func didConsume( bufferDepth: Int ) -> Bool { true }
20
+ // MARK: - Helper Classes
21
+
22
+ /// A class that wraps a closure with a reference to that closure, allowing to change the underlying functionality
23
+ /// of `funcTofunc` after it has been passed.
24
+ class ClosureWrapper {
25
+ /// The wrapped closure.
26
+ var wrappedClosure : ( ( ) -> Void ) ?
27
+
28
+ /// Function that should be passed on.
29
+ /// By changing the `wrappedClosure`, the behaviour of `funcTofunc` can be changed.
30
+ func funcTofunc( ) {
31
+ self . wrappedClosure ? ( )
32
+ }
23
33
}
24
34
35
+ // MARK: - Tests
36
+
25
37
final class KafkaPollingSystemTests : XCTestCase {
26
38
typealias Message = String // Could be any type, this is just for testing
27
- typealias TestStateMachine = KafkaPollingSystem < Message , NoBackPressure > . StateMachine
39
+ typealias TestStateMachine = KafkaPollingSystem < Message > . StateMachine
28
40
29
41
func testBackPressure( ) async throws {
30
42
let pollInterval = Duration . milliseconds ( 100 )
31
43
32
- var expectation : XCTestExpectation ?
33
- let sut = KafkaPollingSystem < Message , NoBackPressure > ( )
34
- sut. pollClosure = {
35
- expectation? . fulfill ( )
36
- }
37
-
44
+ let sut = KafkaPollingSystem < Message > ( )
45
+ let closureWrapper = ClosureWrapper ( )
38
46
let runTask = Task {
39
- await sut. run ( pollInterval: pollInterval)
47
+ await sut. run (
48
+ pollInterval: pollInterval,
49
+ pollClosure: { closureWrapper. funcTofunc ( ) } ,
50
+ source: nil
51
+ )
40
52
}
41
53
42
- expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
54
+ let expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
55
+ closureWrapper. wrappedClosure = { expectation. fulfill ( ) }
43
56
sut. produceMore ( )
44
- XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
45
- XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
57
+ let result = await XCTWaiter . fulfillment ( of: [ expectation] , timeout: 1 )
58
+ XCTAssertEqual ( result, . completed)
59
+ if case . pollAndSleep = sut. nextPollLoopAction ( ) {
60
+ // Test passed
61
+ } else {
62
+ XCTFail ( )
63
+ }
46
64
47
65
sut. stopProducing ( )
48
- XCTAssertEqual ( TestStateMachine . PollLoopAction. suspendPollLoop, sut. nextPollLoopAction ( ) )
66
+ if case . suspendPollLoop = sut. nextPollLoopAction ( ) {
67
+ // Test passed
68
+ } else {
69
+ XCTFail ( )
70
+ }
49
71
50
- expectation = XCTestExpectation ( description: " Poll closure invoked after second produceMore() " )
72
+ let secondExpectation = XCTestExpectation ( description: " Poll closure invoked after second produceMore() " )
73
+ closureWrapper. wrappedClosure = {
74
+ secondExpectation. fulfill ( )
75
+ }
51
76
sut. produceMore ( )
52
- XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
53
- XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
77
+ let secondResult = await XCTWaiter . fulfillment ( of: [ secondExpectation] , timeout: 1 )
78
+ XCTAssertEqual ( secondResult, . completed)
79
+ if case . pollAndSleep = sut. nextPollLoopAction ( ) {
80
+ // Test passed
81
+ } else {
82
+ XCTFail ( )
83
+ }
54
84
55
85
sut. didTerminate ( )
56
- XCTAssertEqual ( TestStateMachine . PollLoopAction. shutdownPollLoop, sut. nextPollLoopAction ( ) )
86
+ if case . shutdownPollLoop = sut. nextPollLoopAction ( ) {
87
+ // Test passed
88
+ } else {
89
+ XCTFail ( )
90
+ }
57
91
58
92
runTask. cancel ( )
59
93
}
60
94
61
95
func testNoPollsAfterPollLoopSuspension( ) async throws {
62
96
let pollInterval = Duration . milliseconds ( 100 )
63
97
64
- var expectation : XCTestExpectation ?
65
- let sut = KafkaPollingSystem < Message , NoBackPressure > ( )
66
- sut. pollClosure = {
67
- expectation? . fulfill ( )
68
- }
69
-
98
+ let sut = KafkaPollingSystem < Message > ( )
99
+ let closureWrapper = ClosureWrapper ( )
70
100
let runTask = Task {
71
- await sut. run ( pollInterval: pollInterval)
101
+ await sut. run (
102
+ pollInterval: pollInterval,
103
+ pollClosure: { closureWrapper. funcTofunc ( ) } ,
104
+ source: nil
105
+ )
72
106
}
73
107
74
- expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
108
+ let expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
109
+ closureWrapper. wrappedClosure = { expectation. fulfill ( ) }
75
110
sut. produceMore ( )
76
- XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
77
- XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
111
+ let result = await XCTWaiter . fulfillment ( of: [ expectation] , timeout: 1 )
112
+ XCTAssertEqual ( result, . completed)
113
+ if case . pollAndSleep = sut. nextPollLoopAction ( ) {
114
+ // Test passed
115
+ } else {
116
+ XCTFail ( )
117
+ }
78
118
79
119
// We're definitely running now. Now suspend the poll loop.
80
120
sut. stopProducing ( )
81
- XCTAssertEqual ( TestStateMachine . PollLoopAction. suspendPollLoop, sut. nextPollLoopAction ( ) )
121
+ if case . suspendPollLoop = sut. nextPollLoopAction ( ) {
122
+ // Test passed
123
+ } else {
124
+ XCTFail ( )
125
+ }
126
+
82
127
// We change the poll closure so that our test fails when the poll closure is invoked.
83
- sut . pollClosure = {
128
+ closureWrapper . wrappedClosure = {
84
129
XCTFail ( " Poll loop still running after stopProducing() has been invoked " )
85
130
}
86
131
@@ -92,31 +137,45 @@ final class KafkaPollingSystemTests: XCTestCase {
92
137
func testRunTaskCancellationShutsDownStateMachine( ) async throws {
93
138
let pollInterval = Duration . milliseconds ( 100 )
94
139
95
- var expectation : XCTestExpectation ?
96
- let sut = KafkaPollingSystem < Message , NoBackPressure > ( )
97
- sut. pollClosure = {
98
- expectation? . fulfill ( )
99
- }
100
-
140
+ let sut = KafkaPollingSystem < Message > ( )
141
+ let closureWrapper = ClosureWrapper ( )
101
142
let runTask = Task {
102
- await sut. run ( pollInterval: pollInterval)
143
+ await sut. run (
144
+ pollInterval: pollInterval,
145
+ pollClosure: { closureWrapper. funcTofunc ( ) } ,
146
+ source: nil
147
+ )
103
148
}
104
149
105
- expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
150
+ let expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
151
+ closureWrapper. wrappedClosure = { expectation. fulfill ( ) }
106
152
sut. produceMore ( )
107
- XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
108
- XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
153
+ let result = await XCTWaiter . fulfillment ( of: [ expectation] , timeout: 1 )
154
+ XCTAssertEqual ( result, . completed)
155
+ if case . pollAndSleep = sut. nextPollLoopAction ( ) {
156
+ // Test passed
157
+ } else {
158
+ XCTFail ( )
159
+ }
109
160
110
161
// We're definitely running now. Now suspend the poll loop.
111
162
sut. stopProducing ( )
112
- XCTAssertEqual ( TestStateMachine . PollLoopAction. suspendPollLoop, sut. nextPollLoopAction ( ) )
163
+ if case . suspendPollLoop = sut. nextPollLoopAction ( ) {
164
+ // Test passed
165
+ } else {
166
+ XCTFail ( )
167
+ }
113
168
114
169
// Cancel the Task that runs the poll loop.
115
170
// This should result in the state machine shutting down.
116
171
runTask. cancel ( )
117
172
// Sleep for a second to make sure the poll loop's canncellationHandler gets invoked.
118
173
try await Task . sleep ( for: . seconds( 1 ) )
119
- XCTAssertEqual ( TestStateMachine . PollLoopAction. shutdownPollLoop, sut. nextPollLoopAction ( ) )
174
+ if case . shutdownPollLoop = sut. nextPollLoopAction ( ) {
175
+ // Test passed
176
+ } else {
177
+ XCTFail ( )
178
+ }
120
179
}
121
180
}
122
181
0 commit comments