diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 741cc28f..9453c014 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -146,32 +146,35 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour // processReadData processes the read data and sends it to the client. func (fs *Service) processReadData(ctx context.Context, stream sourcepb.Source_ReadFnServer, messageCh <-chan Message) error { - select { - case <-ctx.Done(): - return ctx.Err() - case msg, ok := <-messageCh: - if !ok { - break - } - offset := &sourcepb.Offset{ - Offset: msg.Offset().Value(), - PartitionId: msg.Offset().PartitionId(), - } - element := &sourcepb.ReadResponse{ - Result: &sourcepb.ReadResponse_Result{ - Payload: msg.Value(), - Offset: offset, - EventTime: timestamppb.New(msg.EventTime()), - Keys: msg.Keys(), - Headers: msg.Headers(), - }, - Status: &sourcepb.ReadResponse_Status{ - Eot: false, - Code: 0, - }, - } - if err := stream.Send(element); err != nil { - return err +readLoop: + for { + select { + case <-ctx.Done(): + return ctx.Err() + case msg, ok := <-messageCh: + if !ok { + break readLoop + } + offset := &sourcepb.Offset{ + Offset: msg.Offset().Value(), + PartitionId: msg.Offset().PartitionId(), + } + element := &sourcepb.ReadResponse{ + Result: &sourcepb.ReadResponse_Result{ + Payload: msg.Value(), + Offset: offset, + EventTime: timestamppb.New(msg.EventTime()), + Keys: msg.Keys(), + Headers: msg.Headers(), + }, + Status: &sourcepb.ReadResponse_Status{ + Eot: false, + Code: 0, + }, + } + if err := stream.Send(element); err != nil { + return err + } } } err := stream.Send(&sourcepb.ReadResponse{