From 0400b0377280af3d546d946e7f885cdf07168000 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 23 Feb 2025 03:29:33 +0530 Subject: [PATCH] fix: server stuck during shutdown in multi-partitioned pipeline (#175) Signed-off-by: Yashash H L --- pkg/batchmapper/server.go | 1 + pkg/batchmapper/service.go | 8 ++++++-- pkg/mapper/server.go | 1 + pkg/mapper/service.go | 8 ++++++-- pkg/mapstreamer/server.go | 1 + pkg/mapstreamer/service.go | 8 ++++++-- pkg/sideinput/server.go | 1 + pkg/sideinput/service.go | 6 +++++- pkg/sinker/server.go | 1 + pkg/sinker/service.go | 8 ++++++-- pkg/sourcer/server.go | 1 + pkg/sourcer/service.go | 16 ++++++++++++---- pkg/sourcetransformer/server.go | 1 + pkg/sourcetransformer/service.go | 8 ++++++-- 14 files changed, 54 insertions(+), 15 deletions(-) diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 701aff63..396b24d8 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -36,6 +36,7 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server { svc := &Service{ BatchMapper: m, shutdownCh: shutdownCh, + once: sync.Once{}, } return &server{ diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index dd187bab..dbdf2a3b 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "runtime/debug" + "sync" "golang.org/x/sync/errgroup" epb "google.golang.org/genproto/googleapis/rpc/errdetails" @@ -31,6 +32,7 @@ type Service struct { mappb.UnimplementedMapServer BatchMapper BatchMapper shutdownCh chan<- struct{} + once sync.Once } // IsReady returns true to indicate the gRPC connection is ready. @@ -65,8 +67,10 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { log.Printf("Stopping the BatchMapFn") return nil } - log.Printf("Stopping the BatchMapFn with err, %s", err) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + log.Printf("Stopping the BatchMapFn with err, %s", err) + fs.shutdownCh <- struct{}{} + }) return err } } diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 53f7dfb3..f3339f67 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -36,6 +36,7 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server { svc := &Service{ Mapper: m, shutdownCh: shutdownCh, + once: sync.Once{}, } return &server{ diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index ed3bb72d..b877cf0b 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "runtime/debug" + "sync" "golang.org/x/sync/errgroup" epb "google.golang.org/genproto/googleapis/rpc/errdetails" @@ -32,6 +33,7 @@ type Service struct { mappb.UnimplementedMapServer Mapper Mapper shutdownCh chan<- struct{} + once sync.Once } // IsReady returns true to indicate the gRPC connection is ready. @@ -121,8 +123,10 @@ outer: // wait for all goroutines to finish if err := g.Wait(); err != nil { - log.Printf("Stopping the MapFn with err, %s", err) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + log.Printf("Stopping the MapFn with err, %s", err) + fs.shutdownCh <- struct{}{} + }) return err } diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index 42bdcd82..6c62d51b 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -36,6 +36,7 @@ func NewServer(ms MapStreamer, inputOptions ...Option) numaflow.Server { svc := &Service{ MapperStream: ms, shutdownCh: shutdownCh, + once: sync.Once{}, } return &server{ diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 5cda6e32..7034b0d5 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "runtime/debug" + "sync" "golang.org/x/sync/errgroup" epb "google.golang.org/genproto/googleapis/rpc/errdetails" @@ -32,6 +33,7 @@ type Service struct { mappb.UnimplementedMapServer shutdownCh chan<- struct{} MapperStream MapStreamer + once sync.Once } // IsReady returns true to indicate the gRPC connection is ready. @@ -112,8 +114,10 @@ outer: // wait for all goroutines to finish if err := g.Wait(); err != nil { - log.Printf("Stopping the MapFn with err, %s", err) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + log.Printf("Stopping the MapFn with err, %s", err) + fs.shutdownCh <- struct{}{} + }) return err } diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index f1c0862b..81822a19 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -36,6 +36,7 @@ func NewSideInputServer(r SideInputRetriever, inputOptions ...Option) numaflow.S svc := &Service{ Retriever: r, shutdownCh: shutdownCh, + once: sync.Once{}, } return &server{ diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index e4b038b2..ffcb2daf 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -5,6 +5,7 @@ import ( "errors" "log" "runtime/debug" + "sync" epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" @@ -29,6 +30,7 @@ type Service struct { sideinputpb.UnimplementedSideInputServer Retriever SideInputRetriever shutdownCh chan<- struct{} + once sync.Once } // IsReady returns true to indicate the gRPC connection is ready. @@ -42,7 +44,9 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (res defer func() { if r := recover(); r != nil { log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + fs.shutdownCh <- struct{}{} + }) st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{ Detail: string(debug.Stack()), }) diff --git a/pkg/sinker/server.go b/pkg/sinker/server.go index 82987bb3..548a0840 100644 --- a/pkg/sinker/server.go +++ b/pkg/sinker/server.go @@ -36,6 +36,7 @@ func NewServer(h Sinker, inputOptions ...Option) numaflow.Server { svc := &Service{ Sinker: h, shutdownCh: shutdownCh, + once: sync.Once{}, } return &sinkServer{ diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index b0052b9c..df689ba4 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "runtime/debug" + "sync" "time" "golang.org/x/sync/errgroup" @@ -70,6 +71,7 @@ type Service struct { sinkpb.UnimplementedSinkServer shutdownCh chan<- struct{} Sinker Sinker + once sync.Once } // IsReady returns true to indicate the gRPC connection is ready. @@ -104,8 +106,10 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { log.Printf("Stopping the SinkFn") return nil } - log.Printf("Stopping the SinkFn with err, %s", err) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + log.Printf("Stopping the SinkFn with err, %s", err) + fs.shutdownCh <- struct{}{} + }) return err } } diff --git a/pkg/sourcer/server.go b/pkg/sourcer/server.go index 0f759a79..7b78a252 100644 --- a/pkg/sourcer/server.go +++ b/pkg/sourcer/server.go @@ -38,6 +38,7 @@ func NewServer( svc := &Service{ Source: source, shutdownCh: shutdownCh, + once: sync.Once{}, } return &server{ diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 60e3668f..7dcf6310 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "runtime/debug" + "sync" "time" "golang.org/x/sync/errgroup" @@ -31,6 +32,7 @@ type Service struct { sourcepb.UnimplementedSourceServer Source Sourcer shutdownCh chan<- struct{} + once sync.Once } var errSourceReadPanic = errors.New("UDF_EXECUTION_ERROR(source)") @@ -49,8 +51,10 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { if errors.Is(err, io.EOF) { return nil } - log.Printf("error processing requests: %v", err) - fs.shutdownCh <- struct{}{} + log.Printf("error processing read requests: %v", err) + fs.once.Do(func() { + fs.shutdownCh <- struct{}{} + }) return err } } @@ -280,7 +284,9 @@ func (fs *Service) receiveAckRequests(ctx context.Context, stream sourcepb.Sourc defer func() { if r := recover(); r != nil { log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + fs.shutdownCh <- struct{}{} + }) err = fmt.Errorf("panic inside source handler: %v", r) } }() @@ -328,7 +334,9 @@ func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.P defer func() { if r := recover(); r != nil { log.Printf("panic inside sourcer handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + fs.shutdownCh <- struct{}{} + }) } }() diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index b0aab9f5..76093107 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -35,6 +35,7 @@ func NewServer(m SourceTransformer, inputOptions ...Option) numaflow.Server { svc := &Service{ Transformer: m, shutdownCh: shutdownCh, + once: sync.Once{}, } return &server{ diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 4244ac48..01d735ea 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -7,6 +7,7 @@ import ( "io" "log" "runtime/debug" + "sync" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -32,6 +33,7 @@ type Service struct { v1.UnimplementedSourceTransformServer Transformer SourceTransformer shutdownCh chan<- struct{} + once sync.Once } // IsReady returns true to indicate the gRPC connection is ready. @@ -122,8 +124,10 @@ outer: // wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately. if err := grp.Wait(); err != nil { - log.Printf("Stopping the SourceTransformFn with err, %s", err) - fs.shutdownCh <- struct{}{} + fs.once.Do(func() { + log.Printf("Stopping the SourceTransformFn with err, %s", err) + fs.shutdownCh <- struct{}{} + }) return err }