From 568c60d3dd8cbff9a6e6e0ae6eaaa7a24d6d506c Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 11 Sep 2024 21:26:05 +0530 Subject: [PATCH] send empty header Signed-off-by: Yashash H L --- pkg/sourcer/service.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 2d58de8c..5ffaa7b0 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -34,6 +34,13 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { ctx := stream.Context() errCh := make(chan error, 1) + // Send an empty header so that the client can start sending read requests + // (rust client does not send read requests until it receives a header) + err := stream.SendHeader(map[string][]string{}) + if err != nil { + return err + } + var wg sync.WaitGroup wg.Add(1) @@ -133,6 +140,13 @@ func (a *ackRequest) Offset() Offset { func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error { ctx := stream.Context() + // Send an empty header so that the client can start sending ack requests + // (rust client does not send ack requests until it receives a header) + err := stream.SendHeader(map[string][]string{}) + if err != nil { + return err + } + // handle panic defer func() { if r := recover(); r != nil {