Skip to content

Commit 2654c68

Browse files
author
Kelsey
authored
Fix FakeKafkaConsumer to advance through messages (#221)
1 parent a8a24e7 commit 2654c68

File tree

2 files changed

+41
-24
lines changed

2 files changed

+41
-24
lines changed

kafka/kafkatest/utils.go

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,13 @@ func KafkaPipe(log logrus.FieldLogger) (*FakeKafkaConsumer, *FakeKafkaProducer)
3333
}
3434

3535
type FakeKafkaConsumer struct {
36-
messages []*kafkalib.Message
37-
msgMu sync.Mutex
38-
offset int64
39-
notify chan struct{}
40-
commits chan *kafkalib.Message
41-
log logrus.FieldLogger
36+
messages []*kafkalib.Message
37+
msgMu sync.Mutex
38+
offset int64
39+
readOffset int64
40+
notify chan struct{}
41+
commits chan *kafkalib.Message
42+
log logrus.FieldLogger
4243
}
4344

4445
type FakeKafkaProducer struct {
@@ -63,11 +64,12 @@ func (f *FakeKafkaProducer) Close() error {
6364

6465
func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Message) *FakeKafkaConsumer {
6566
r := &FakeKafkaConsumer{
66-
messages: make([]*kafkalib.Message, 0),
67-
offset: 0,
68-
notify: make(chan struct{}),
69-
log: log,
70-
commits: make(chan *kafkalib.Message, 1000),
67+
messages: make([]*kafkalib.Message, 0),
68+
offset: 0,
69+
readOffset: 0,
70+
notify: make(chan struct{}),
71+
log: log,
72+
commits: make(chan *kafkalib.Message, 1000),
7173
}
7274

7375
go func() {
@@ -86,11 +88,12 @@ func NewFakeKafkaConsumer(log logrus.FieldLogger, distri <-chan *kafkalib.Messag
8688
func (f *FakeKafkaConsumer) FetchMessage(ctx context.Context) (*kafkalib.Message, error) {
8789
for {
8890
f.msgMu.Lock()
89-
if int64(len(f.messages)) > f.offset {
90-
f.log.WithField("offset", f.offset).Trace("offering message")
91-
msg := f.messages[f.offset]
91+
if int64(len(f.messages)) > f.readOffset {
92+
f.log.WithField("offset", f.readOffset).Trace("offering message")
93+
msg := f.messages[f.readOffset]
9294
f.msgMu.Unlock()
9395

96+
f.readOffset = f.readOffset + 1
9497
return msg, nil
9598
}
9699
f.msgMu.Unlock()
@@ -121,13 +124,14 @@ func (f *FakeKafkaConsumer) CommitMessage(msg *kafkalib.Message) error {
121124
func (f *FakeKafkaConsumer) SetInitialOffset(offset int64) error {
122125
f.msgMu.Lock()
123126
f.offset = offset
127+
f.readOffset = offset
124128
f.msgMu.Unlock()
125129
return nil
126130
}
127131

128132
func (f *FakeKafkaConsumer) Seek(offset int64) error {
129133
f.msgMu.Lock()
130-
f.offset = offset
134+
f.readOffset = offset
131135
f.msgMu.Unlock()
132136
return nil
133137
}

kafka/kafkatest/utils_test.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,30 @@ func TestFakeKafkaProducer_WaitForKey(t *testing.T) {
1515
defer p.Close()
1616
defer c.Close()
1717

18-
key := []byte(`key`)
19-
val := []byte(`foobar`)
20-
err := p.Produce(context.Background(), &kafkalib.Message{
21-
Value: val,
22-
Key: key,
18+
ctx := context.Background()
19+
20+
err := p.Produce(ctx, &kafkalib.Message{
21+
Key: []byte(`key1`),
22+
Value: []byte(`val1`),
23+
})
24+
require.NoError(t, err)
25+
26+
err = p.Produce(ctx, &kafkalib.Message{
27+
Key: []byte(`key2`),
28+
Value: []byte(`val2`),
2329
})
24-
msg, err := c.FetchMessage(context.Background())
2530
require.NoError(t, err)
26-
require.Equal(t, key, msg.Key)
27-
require.Equal(t, val, msg.Value)
31+
32+
msg, err := c.FetchMessage(ctx)
33+
require.NoError(t, err)
34+
require.Equal(t, "key1", string(msg.Key))
35+
require.Equal(t, "val1", string(msg.Value))
36+
37+
msg, err = c.FetchMessage(ctx)
38+
require.NoError(t, err)
39+
require.Equal(t, "key2", string(msg.Key))
40+
require.Equal(t, "val2", string(msg.Value))
2841

2942
require.NoError(t, c.CommitMessage(msg))
30-
require.True(t, p.WaitForKey(key))
43+
require.True(t, p.WaitForKey([]byte(`key2`)))
3144
}

0 commit comments

Comments
 (0)