Skip to content

Commit 1db836a

Browse files
committed
Add method to fetch messages in batch
Since FetchMessage is already reading messages from a fetched batch, this new method just hold the messages util the batchSize number of messages are read. Fixes #123
1 parent af1725f commit 1db836a

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/segmentio/kafka-go
22

3-
go 1.23
3+
go 1.23.0
44

55
require (
66
github.com/klauspost/compress v1.15.9

reader.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,68 @@ func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
912912
}
913913
}
914914

915+
// FetchMessageBatch fetches a batch of messages from the reader. It is similar to
916+
// FetchMessage, except it blocks until no. of messages read reaches batchSize.
917+
func (r *Reader) FetchMessageBatch(ctx context.Context, batchSize int) ([]Message, error) {
918+
r.activateReadLag()
919+
msgBatch := make([]Message, 0, batchSize)
920+
921+
var i int
922+
for i <= batchSize {
923+
r.mutex.Lock()
924+
925+
if !r.closed && r.version == 0 {
926+
r.start(r.getTopicPartitionOffset())
927+
}
928+
929+
version := r.version
930+
r.mutex.Unlock()
931+
932+
select {
933+
case <-ctx.Done():
934+
return []Message{}, ctx.Err()
935+
936+
case err := <-r.runError:
937+
return []Message{}, err
938+
939+
case m, ok := <-r.msgs:
940+
if !ok {
941+
return []Message{}, io.EOF
942+
}
943+
944+
if m.version < version {
945+
continue
946+
}
947+
948+
r.mutex.Lock()
949+
950+
switch {
951+
case m.error != nil:
952+
case version == r.version:
953+
r.offset = m.message.Offset + 1
954+
r.lag = m.watermark - r.offset
955+
}
956+
957+
r.mutex.Unlock()
958+
959+
if errors.Is(m.error, io.EOF) {
960+
// io.EOF is used as a marker to indicate that the stream
961+
// has been closed, in case it was received from the inner
962+
// reader we don't want to confuse the program and replace
963+
// the error with io.ErrUnexpectedEOF.
964+
m.error = io.ErrUnexpectedEOF
965+
}
966+
if m.error != nil {
967+
return nil, m.error
968+
}
969+
970+
msgBatch = append(msgBatch, m.message)
971+
}
972+
i++
973+
}
974+
return msgBatch, nil
975+
}
976+
915977
// ReadLag returns the current lag of the reader by fetching the last offset of
916978
// the topic and partition and computing the difference between that value and
917979
// the offset of the last message returned by ReadMessage.

0 commit comments

Comments
 (0)