Skip to content

Commit 797e35a

Browse files
committed
Refactor KafkaPollingSystemTests
Modifications: * move common logic into `setUp()` method
1 parent 80776fd commit 797e35a

File tree

2 files changed

+50
-63
lines changed

2 files changed

+50
-63
lines changed

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ public actor KafkaProducer {
114114
logger: self.logger
115115
)
116116

117-
118117
// (NIOAsyncSequenceProducer.makeSequence Documentation Excerpt)
119118
// This method returns a struct containing a NIOAsyncSequenceProducer.Source and a NIOAsyncSequenceProducer.
120119
// The source MUST be held by the caller and used to signal new elements or finish.

Tests/SwiftKafkaTests/KafkaPollingSystemTests.swift

Lines changed: 50 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,135 +17,123 @@ import NIOCore
1717
@testable import SwiftKafka
1818
import XCTest
1919

20-
// TODO: stream logic to setup?
2120
final class KafkaPollingSystemTests: XCTestCase {
2221
typealias Message = String // Could be any type, this is just for testing
2322
typealias TestStateMachine = KafkaPollingSystem<Message>.StateMachine
2423

25-
func testBackPressure() async throws {
26-
let pollInterval = Duration.milliseconds(100)
24+
let pollInterval = Duration.milliseconds(100)
25+
var sut: KafkaPollingSystem<Message>!
26+
var expectationStream: AsyncStream<Void>!
27+
var pollIterator: AsyncStream<Void>.Iterator!
28+
var runTask: Task<Void, Error>!
29+
30+
override func setUp() {
31+
self.sut = KafkaPollingSystem<Message>()
2732

28-
let sut = KafkaPollingSystem<Message>()
29-
let expectationStream = AsyncStream { continuation in
30-
sut.pollClosure = {
33+
// Enables us to await the next call to pollClosure
34+
self.expectationStream = AsyncStream { continuation in
35+
self.sut.pollClosure = {
3136
continuation.yield()
3237
}
3338
}
34-
var pollIterator = expectationStream.makeAsyncIterator()
39+
self.pollIterator = self.expectationStream.makeAsyncIterator()
3540

36-
let _ = Task {
37-
try await sut.run(pollInterval: pollInterval)
41+
self.runTask = Task {
42+
try await self.sut.run(pollInterval: self.pollInterval)
3843
}
3944

40-
sut.produceMore()
41-
await pollIterator.next()
42-
if case .pollAndSleep = sut.nextPollLoopAction() {
45+
super.setUp()
46+
}
47+
48+
override func tearDown() {
49+
self.sut = nil
50+
self.expectationStream = nil
51+
self.pollIterator = nil
52+
self.runTask = nil
53+
54+
super.tearDown()
55+
}
56+
57+
func testBackPressure() async throws {
58+
self.sut.produceMore()
59+
await self.pollIterator.next()
60+
if case .pollAndSleep = self.sut.nextPollLoopAction() {
4361
// Test passed
4462
} else {
4563
XCTFail()
4664
}
4765

48-
sut.stopProducing()
49-
if case .suspendPollLoop = sut.nextPollLoopAction() {
66+
self.sut.stopProducing()
67+
if case .suspendPollLoop = self.sut.nextPollLoopAction() {
5068
// Test passed
5169
} else {
5270
XCTFail()
5371
}
5472

55-
sut.produceMore()
56-
await pollIterator.next()
57-
if case .pollAndSleep = sut.nextPollLoopAction() {
73+
self.sut.produceMore()
74+
await self.pollIterator.next()
75+
if case .pollAndSleep = self.sut.nextPollLoopAction() {
5876
// Test passed
5977
} else {
6078
XCTFail()
6179
}
6280

63-
sut.didTerminate()
64-
if case .shutdownPollLoop = sut.nextPollLoopAction() {
81+
self.sut.didTerminate()
82+
if case .shutdownPollLoop = self.sut.nextPollLoopAction() {
6583
// Test passed
6684
} else {
6785
XCTFail()
6886
}
6987
}
7088

7189
func testNoPollsAfterPollLoopSuspension() async throws {
72-
let pollInterval = Duration.milliseconds(100)
73-
74-
let sut = KafkaPollingSystem<Message>()
75-
76-
let expectationStream = AsyncStream { continuation in
77-
sut.pollClosure = {
78-
continuation.yield()
79-
}
80-
}
81-
var pollIterator = expectationStream.makeAsyncIterator()
82-
83-
let _ = Task {
84-
try await sut.run(pollInterval: pollInterval)
85-
}
86-
87-
sut.produceMore()
88-
await pollIterator.next()
89-
if case .pollAndSleep = sut.nextPollLoopAction() {
90+
self.sut.produceMore()
91+
await self.pollIterator.next()
92+
if case .pollAndSleep = self.sut.nextPollLoopAction() {
9093
// Test passed
9194
} else {
9295
XCTFail()
9396
}
9497

9598
// We're definitely running now. Now suspend the poll loop.
96-
sut.stopProducing()
97-
if case .suspendPollLoop = sut.nextPollLoopAction() {
99+
self.sut.stopProducing()
100+
if case .suspendPollLoop = self.sut.nextPollLoopAction() {
98101
// Test passed
99102
} else {
100103
XCTFail()
101104
}
102105

103106
// We change the poll closure so that our test fails when the poll closure is invoked.
104-
sut.pollClosure = {
107+
self.sut.pollClosure = {
105108
XCTFail("Poll loop still running after stopProducing() has been invoked")
106109
}
107110

108111
try await Task.sleep(for: .seconds(5))
109112
}
110113

111114
func testRunTaskCancellationShutsDownStateMachine() async throws {
112-
let pollInterval = Duration.milliseconds(100)
113-
114-
let sut = KafkaPollingSystem<Message>()
115-
116-
let expectationStream = AsyncStream { continuation in
117-
sut.pollClosure = {
118-
continuation.yield()
119-
}
120-
}
121-
var pollIterator = expectationStream.makeAsyncIterator()
122-
123-
let runTask = Task {
124-
try await sut.run(pollInterval: pollInterval)
125-
}
126-
127-
sut.produceMore()
128-
await pollIterator.next()
129-
if case .pollAndSleep = sut.nextPollLoopAction() {
115+
self.sut.produceMore()
116+
await self.pollIterator.next()
117+
if case .pollAndSleep = self.sut.nextPollLoopAction() {
130118
// Test passed
131119
} else {
132120
XCTFail()
133121
}
134122

135123
// We're definitely running now. Now suspend the poll loop.
136-
sut.stopProducing()
137-
if case .suspendPollLoop = sut.nextPollLoopAction() {
124+
self.sut.stopProducing()
125+
if case .suspendPollLoop = self.sut.nextPollLoopAction() {
138126
// Test passed
139127
} else {
140128
XCTFail()
141129
}
142130

143131
// Cancel the Task that runs the poll loop.
144132
// This should result in the state machine shutting down.
145-
runTask.cancel()
133+
self.runTask.cancel()
146134
// Sleep for a second to make sure the poll loop's canncellationHandler gets invoked.
147135
try await Task.sleep(for: .seconds(1))
148-
if case .shutdownPollLoop = sut.nextPollLoopAction() {
136+
if case .shutdownPollLoop = self.sut.nextPollLoopAction() {
149137
// Test passed
150138
} else {
151139
XCTFail()

0 commit comments

Comments
 (0)