From d82fc27d714d7057bc77f78f2a0d2a754c1b44e0 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 30 Aug 2024 03:14:30 +0530 Subject: [PATCH] chore: log panics inside udf (#145) Signed-off-by: Yashash H L --- pkg/batchmapper/server.go | 2 ++ pkg/batchmapper/service.go | 2 ++ pkg/mapper/server.go | 2 ++ pkg/mapper/service.go | 3 +++ pkg/mapstreamer/server.go | 2 ++ pkg/mapstreamer/service.go | 3 +++ pkg/reducer/server.go | 3 ++- pkg/reducer/task_manager.go | 3 +++ pkg/reducestreamer/server.go | 2 ++ pkg/reducestreamer/task_manager.go | 3 +++ pkg/sessionreducer/server.go | 2 ++ pkg/sessionreducer/task_manager.go | 4 ++++ pkg/shared/util.go | 4 ++++ pkg/sideinput/server.go | 2 ++ pkg/sideinput/service.go | 3 +++ pkg/sinker/server.go | 2 ++ pkg/sinker/service.go | 3 +++ pkg/sourcer/server.go | 2 ++ pkg/sourcer/service.go | 6 ++++++ pkg/sourcetransformer/server.go | 2 ++ pkg/sourcetransformer/service.go | 3 +++ 21 files changed, 57 insertions(+), 1 deletion(-) diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 61736427..c6fe51e4 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -3,6 +3,7 @@ package batchmapper import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -76,6 +77,7 @@ func (m *server) Start(ctx context.Context) error { defer wg.Done() select { case <-m.shutdownCh: + log.Printf("received shutdown signal") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(m.grpcServer) diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index ba4df99d..06b4a3cf 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "log" + "runtime/debug" "go.uber.org/atomic" "golang.org/x/sync/errgroup" @@ -55,6 +56,7 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 456f33c6..729026fe 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -3,6 +3,7 @@ package mapper import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -76,6 +77,7 @@ func (m *server) Start(ctx context.Context) error { defer wg.Done() select { case <-m.shutdownCh: + log.Printf("shutdown signal received") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(m.grpcServer) diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 7d5c2dc7..235e372b 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -2,6 +2,8 @@ package mapper import ( "context" + "log" + "runtime/debug" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -38,6 +40,7 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.Map // Use defer and recover to handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} // Send shutdown signal err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r) } diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index 5b810fc9..78130071 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -3,6 +3,7 @@ package mapstreamer import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -76,6 +77,7 @@ func (m *server) Start(ctx context.Context) error { defer wg.Done() select { case <-m.shutdownCh: + log.Printf("shutdown signal received") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(m.grpcServer) diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 3a865869..2b883316 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -2,6 +2,8 @@ package mapstreamer import ( "context" + "log" + "runtime/debug" "google.golang.org/protobuf/types/known/emptypb" @@ -39,6 +41,7 @@ func (fs *Service) MapStreamFn(d *mapstreampb.MapStreamRequest, stream mapstream // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() diff --git a/pkg/reducer/server.go b/pkg/reducer/server.go index e82f8039..180126cd 100644 --- a/pkg/reducer/server.go +++ b/pkg/reducer/server.go @@ -3,6 +3,7 @@ package reducer import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -60,7 +61,6 @@ func (r *server) Start(ctx context.Context) error { // create a grpc server r.grpcServer = shared.CreateGRPCServer(r.opts.maxMessageSize) - defer r.grpcServer.GracefulStop() // register the reduce service reducepb.RegisterReduceServer(r.grpcServer, r.svc) @@ -72,6 +72,7 @@ func (r *server) Start(ctx context.Context) error { defer wg.Done() select { case <-r.shutdownCh: + log.Printf("received shutdown signal") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(r.grpcServer) diff --git a/pkg/reducer/task_manager.go b/pkg/reducer/task_manager.go index d2f03730..719d26f3 100644 --- a/pkg/reducer/task_manager.go +++ b/pkg/reducer/task_manager.go @@ -3,6 +3,8 @@ package reducer import ( "context" "fmt" + "log" + "runtime/debug" "strings" v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" @@ -89,6 +91,7 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack())) rtm.shutdownCh <- struct{}{} } }() diff --git a/pkg/reducestreamer/server.go b/pkg/reducestreamer/server.go index 0d460b6f..51b12222 100644 --- a/pkg/reducestreamer/server.go +++ b/pkg/reducestreamer/server.go @@ -3,6 +3,7 @@ package reducestreamer import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -73,6 +74,7 @@ func (r *server) Start(ctx context.Context) error { defer wg.Done() select { case <-r.shutdownCh: + log.Printf("received shutdown signal") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(r.grpcServer) diff --git a/pkg/reducestreamer/task_manager.go b/pkg/reducestreamer/task_manager.go index 2a6db6e6..633d3ad3 100644 --- a/pkg/reducestreamer/task_manager.go +++ b/pkg/reducestreamer/task_manager.go @@ -3,6 +3,8 @@ package reducestreamer import ( "context" "fmt" + "log" + "runtime/debug" "strings" "sync" @@ -102,6 +104,7 @@ func (rtm *reduceStreamTaskManager) CreateTask(ctx context.Context, request *v1. // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack())) rtm.shutdownCh <- struct{}{} } }() diff --git a/pkg/sessionreducer/server.go b/pkg/sessionreducer/server.go index 4249e76b..ed3fdff5 100644 --- a/pkg/sessionreducer/server.go +++ b/pkg/sessionreducer/server.go @@ -3,6 +3,7 @@ package sessionreducer import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -73,6 +74,7 @@ func (r *server) Start(ctx context.Context) error { defer wg.Done() select { case <-r.shutdownCh: + log.Printf("received shutdown signal") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(r.grpcServer) diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index 12e912f1..44d8d880 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -3,6 +3,8 @@ package sessionreducer import ( "context" "fmt" + "log" + "runtime/debug" "strings" "sync" @@ -134,6 +136,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside session reduce handler: %v %v", r, string(debug.Stack())) rtm.shutdownCh <- struct{}{} } }() @@ -207,6 +210,7 @@ func (rtm *sessionReduceTaskManager) MergeTasks(ctx context.Context, request *v1 // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside session reduce handler: %v %v", r, string(debug.Stack())) rtm.shutdownCh <- struct{}{} } }() diff --git a/pkg/shared/util.go b/pkg/shared/util.go index 347ed1a7..1755bd8b 100644 --- a/pkg/shared/util.go +++ b/pkg/shared/util.go @@ -2,6 +2,7 @@ package shared import ( "fmt" + "log" "net" "os" "time" @@ -54,6 +55,7 @@ func StopGRPCServer(grpcServer *grpc.Server) { // if it is not stopped, stop it forcefully stopped := make(chan struct{}) go func() { + log.Printf("gracefully stopping grpc server") grpcServer.GracefulStop() close(stopped) }() @@ -61,8 +63,10 @@ func StopGRPCServer(grpcServer *grpc.Server) { t := time.NewTimer(30 * time.Second) select { case <-t.C: + log.Printf("forcefully stopping grpc server") grpcServer.Stop() case <-stopped: t.Stop() } + log.Printf("grpc server stopped") } diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index e32ab7a1..3d394c58 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -3,6 +3,7 @@ package sideinput import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -71,6 +72,7 @@ func (s *server) Start(ctx context.Context) error { defer wg.Done() select { case <-s.shutdownCh: + log.Printf("shutdown signal received") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(s.grpcServer) diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index b10bdff7..7628c00e 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -2,6 +2,8 @@ package sideinput import ( "context" + "log" + "runtime/debug" "google.golang.org/protobuf/types/known/emptypb" @@ -33,6 +35,7 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*si // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() diff --git a/pkg/sinker/server.go b/pkg/sinker/server.go index 3e298f66..9ac5f4cf 100644 --- a/pkg/sinker/server.go +++ b/pkg/sinker/server.go @@ -3,6 +3,7 @@ package sinker import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -74,6 +75,7 @@ func (s *sinkServer) Start(ctx context.Context) error { defer wg.Done() select { case <-s.shutdownCh: + log.Printf("shutdown signal received") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(s.grpcServer) diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index d775cbdd..bb99dab7 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -3,6 +3,8 @@ package sinker import ( "context" "io" + "log" + "runtime/debug" "sync" "time" @@ -83,6 +85,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() diff --git a/pkg/sourcer/server.go b/pkg/sourcer/server.go index 247fa37a..12076e9e 100644 --- a/pkg/sourcer/server.go +++ b/pkg/sourcer/server.go @@ -3,6 +3,7 @@ package sourcer import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -76,6 +77,7 @@ func (s *server) Start(ctx context.Context) error { defer wg.Done() select { case <-s.shutdownCh: + log.Printf("shutdown signal received") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(s.grpcServer) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index e4e5426d..2667b776 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -2,6 +2,8 @@ package sourcer import ( "context" + "log" + "runtime/debug" "time" "google.golang.org/protobuf/types/known/emptypb" @@ -34,6 +36,7 @@ func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.P // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside sourcer handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() @@ -72,6 +75,7 @@ func (fs *Service) ReadFn(d *sourcepb.ReadRequest, stream sourcepb.Source_ReadFn // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() @@ -117,6 +121,7 @@ func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() @@ -139,6 +144,7 @@ func (fs *Service) PartitionsFn(ctx context.Context, _ *emptypb.Empty) (*sourcep // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }() diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index 7f22989a..c42da01a 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -3,6 +3,7 @@ package sourcetransformer import ( "context" "fmt" + "log" "os/signal" "sync" "syscall" @@ -71,6 +72,7 @@ func (m *server) Start(ctx context.Context) error { defer wg.Done() select { case <-m.shutdownCh: + log.Printf("shutdown signal received") case <-ctxWithSignal.Done(): } shared.StopGRPCServer(m.grpcServer) diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 1602a50a..1651f825 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -2,6 +2,8 @@ package sourcetransformer import ( "context" + "log" + "runtime/debug" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -36,6 +38,7 @@ func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformR // handle panic defer func() { if r := recover(); r != nil { + log.Printf("panic inside sourcetransform handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} } }()