diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 2d58de8c..5ffaa7b0 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -34,6 +34,13 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { ctx := stream.Context() errCh := make(chan error, 1) + // Send an empty header so that the client can start sending read requests + // (rust client does not send read requests until it receives a header) + err := stream.SendHeader(map[string][]string{}) + if err != nil { + return err + } + var wg sync.WaitGroup wg.Add(1) @@ -133,6 +140,13 @@ func (a *ackRequest) Offset() Offset { func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error { ctx := stream.Context() + // Send an empty header so that the client can start sending ack requests + // (rust client does not send ack requests until it receives a header) + err := stream.SendHeader(map[string][]string{}) + if err != nil { + return err + } + // handle panic defer func() { if r := recover(); r != nil {