Skip to content

Commit af3da3f

Browse files
authored
Have fakeConsumer.close take the lock before closing (#268)
This sets the FakeKafkaConsumer#Close to grab the msg mutex lock and lock it before closing the channel. This prevents a race between a user closing the consumer and committing
1 parent 84e50ab commit af3da3f

File tree

1 file changed

+2
-0
lines changed

1 file changed

+2
-0
lines changed

kafka/kafkatest/utils.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,8 @@ func (f *FakeKafkaConsumer) Resume(p []kafkalib.TopicPartition) error {
169169
}
170170

171171
func (f *FakeKafkaConsumer) Close() error {
172+
f.msgMu.Lock()
173+
defer f.msgMu.Unlock()
172174
close(f.commits)
173175
return nil
174176
}

0 commit comments

Comments
 (0)