From 332006cadb557b82df200e6a55fb3d7f9655044a Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 6 Nov 2024 21:48:59 +0530 Subject: [PATCH] chore: fix sourcer to read all responses (#168) Signed-off-by: Yashash H L --- pkg/sourcer/service.go | 55 ++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 741cc28f..9453c014 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -146,32 +146,35 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour // processReadData processes the read data and sends it to the client. func (fs *Service) processReadData(ctx context.Context, stream sourcepb.Source_ReadFnServer, messageCh <-chan Message) error { - select { - case <-ctx.Done(): - return ctx.Err() - case msg, ok := <-messageCh: - if !ok { - break - } - offset := &sourcepb.Offset{ - Offset: msg.Offset().Value(), - PartitionId: msg.Offset().PartitionId(), - } - element := &sourcepb.ReadResponse{ - Result: &sourcepb.ReadResponse_Result{ - Payload: msg.Value(), - Offset: offset, - EventTime: timestamppb.New(msg.EventTime()), - Keys: msg.Keys(), - Headers: msg.Headers(), - }, - Status: &sourcepb.ReadResponse_Status{ - Eot: false, - Code: 0, - }, - } - if err := stream.Send(element); err != nil { - return err +readLoop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + case msg, ok := <-messageCh: + if !ok { + break readLoop + } + offset := &sourcepb.Offset{ + Offset: msg.Offset().Value(), + PartitionId: msg.Offset().PartitionId(), + } + element := &sourcepb.ReadResponse{ + Result: &sourcepb.ReadResponse_Result{ + Payload: msg.Value(), + Offset: offset, + EventTime: timestamppb.New(msg.EventTime()), + Keys: msg.Keys(), + Headers: msg.Headers(), + }, + Status: &sourcepb.ReadResponse_Status{ + Eot: false, + Code: 0, + }, + } + if err := stream.Send(element); err != nil { + return err + } } } err := stream.Send(&sourcepb.ReadResponse{