@@ -19,13 +19,12 @@ import XCTest
19
19
final class KafkaBackPressurePollingSystemTests : XCTestCase {
20
20
typealias TestStateMachine = KafkaBackPressurePollingSystem . StateMachine
21
21
22
- func testBackPressure( ) {
22
+ func testBackPressure( ) async throws {
23
23
let pollInterval = Duration . milliseconds ( 100 )
24
24
25
25
var expectation : XCTestExpectation ?
26
26
let sut = KafkaBackPressurePollingSystem ( logger: . kafkaTest)
27
27
sut. pollClosure = {
28
- XCTAssertNotNil ( expectation, " Unexpected invocation of poll closure " )
29
28
expectation? . fulfill ( )
30
29
}
31
30
@@ -38,7 +37,6 @@ final class KafkaBackPressurePollingSystemTests: XCTestCase {
38
37
XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
39
38
XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
40
39
41
- expectation = nil
42
40
sut. stopProducing ( )
43
41
XCTAssertEqual ( TestStateMachine . PollLoopAction. suspendPollLoop, sut. nextPollLoopAction ( ) )
44
42
@@ -47,20 +45,49 @@ final class KafkaBackPressurePollingSystemTests: XCTestCase {
47
45
XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
48
46
XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
49
47
50
- expectation = nil
51
48
sut. shutDown ( )
52
49
XCTAssertEqual ( TestStateMachine . PollLoopAction. shutdownPollLoop, sut. nextPollLoopAction ( ) )
53
50
54
51
runTask. cancel ( )
55
52
}
56
53
54
+ func testNoPollsAfterPollLoopSuspension( ) async throws {
55
+ let pollInterval = Duration . milliseconds ( 100 )
56
+
57
+ var expectation : XCTestExpectation ?
58
+ let sut = KafkaBackPressurePollingSystem ( logger: . kafkaTest)
59
+ sut. pollClosure = {
60
+ expectation? . fulfill ( )
61
+ }
62
+
63
+ let runTask = Task {
64
+ await sut. run ( pollInterval: pollInterval)
65
+ }
66
+
67
+ expectation = XCTestExpectation ( description: " Poll closure invoked after initial produceMore() " )
68
+ sut. produceMore ( )
69
+ XCTAssertEqual ( XCTWaiter ( ) . wait ( for: [ expectation!] , timeout: 1 ) , . completed)
70
+ XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
71
+
72
+ // We're definitely running now. Now suspend the poll loop.
73
+ sut. stopProducing ( )
74
+ XCTAssertEqual ( TestStateMachine . PollLoopAction. suspendPollLoop, sut. nextPollLoopAction ( ) )
75
+ // We change the poll closure so that our test fails when the poll closure is invoked.
76
+ sut. pollClosure = {
77
+ XCTFail ( " Poll loop still running after stopProducing() has been invoked " )
78
+ }
79
+
80
+ try await Task . sleep ( for: . seconds( 5 ) )
81
+
82
+ runTask. cancel ( )
83
+ }
84
+
57
85
func testRunTaskCancellationShutsDownStateMachine( ) async throws {
58
86
let pollInterval = Duration . milliseconds ( 100 )
59
87
60
88
var expectation : XCTestExpectation ?
61
89
let sut = KafkaBackPressurePollingSystem ( logger: . kafkaTest)
62
90
sut. pollClosure = {
63
- XCTAssertNotNil ( expectation, " Unexpected invocation of poll closure " )
64
91
expectation? . fulfill ( )
65
92
}
66
93
@@ -74,7 +101,6 @@ final class KafkaBackPressurePollingSystemTests: XCTestCase {
74
101
XCTAssertEqual ( TestStateMachine . PollLoopAction. pollAndSleep, sut. nextPollLoopAction ( ) )
75
102
76
103
// We're definitely running now. Now suspend the poll loop.
77
- expectation = nil
78
104
sut. stopProducing ( )
79
105
XCTAssertEqual ( TestStateMachine . PollLoopAction. suspendPollLoop, sut. nextPollLoopAction ( ) )
80
106
0 commit comments