From 3277ee2cd88f8a81c7d367060046379262ad3aa3 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Wed, 29 Jan 2025 11:58:17 +0530 Subject: [PATCH] handle panic errors and fix tests Signed-off-by: adarsh0728 --- pkg/batchmapper/service.go | 8 ++++++-- pkg/mapper/service_test.go | 5 +++-- pkg/mapstreamer/service_test.go | 5 +++-- pkg/reducer/service.go | 6 +++--- pkg/reducestreamer/service.go | 6 +++--- pkg/sessionreducer/service.go | 12 ++++++------ pkg/sideinput/service.go | 13 +++++++++---- 7 files changed, 33 insertions(+), 22 deletions(-) diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 676e2349..2ab92a3f 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -9,6 +9,8 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" @@ -21,6 +23,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) +var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR: batch map handler panicked") + // Service implements the proto gen server interface and contains the map operation handler. type Service struct { mappb.UnimplementedMapServer @@ -62,7 +66,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { } log.Printf("Stopping the BatchMapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return err + return status.Errorf(codes.Internal, "%s", err.Error()) } } } @@ -157,7 +161,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer defer func() { if r := recover(); r != nil { log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside batch map handler: %v", r) + err = fmt.Errorf("%s: %v", errBatchMapHandlerPanic, r) } }() diff --git a/pkg/mapper/service_test.go b/pkg/mapper/service_test.go index 7ec939d1..bbdc8c95 100644 --- a/pkg/mapper/service_test.go +++ b/pkg/mapper/service_test.go @@ -254,9 +254,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) { } func TestService_MapFn_Panic(t *testing.T) { + panicMssg := "map failed" svc := &Service{ Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { - panic("map failed") + panic(panicMssg) }), // panic in the transformer causes the server to send a shutdown signal to shutdownCh channel. // The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else. @@ -288,6 +289,6 @@ func TestService_MapFn_Panic(t *testing.T) { _, err = stream.Recv() require.Error(t, err, "Expected error while receiving message from the stream") gotStatus, _ := status.FromError(err) - expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed")) + expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapHandlerPanic, panicMssg)) require.Equal(t, expectedStatus, gotStatus) } diff --git a/pkg/mapstreamer/service_test.go b/pkg/mapstreamer/service_test.go index fd5b35e9..1373716e 100644 --- a/pkg/mapstreamer/service_test.go +++ b/pkg/mapstreamer/service_test.go @@ -326,9 +326,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) { } func TestService_MapFn_Panic(t *testing.T) { + panicMssg := "map failed" svc := &Service{ MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { - panic("map failed") + panic(panicMssg) }), shutdownCh: make(chan<- struct{}, 1), } @@ -357,7 +358,7 @@ func TestService_MapFn_Panic(t *testing.T) { _, err = stream.Recv() require.Error(t, err, "Expected error while receiving message from the stream") gotStatus, _ := status.FromError(err) - expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed")) + expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, panicMssg)) require.Equal(t, expectedStatus, gotStatus) } diff --git a/pkg/reducer/service.go b/pkg/reducer/service.go index 5786dcd1..32ed1c42 100644 --- a/pkg/reducer/service.go +++ b/pkg/reducer/service.go @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // create a new reduce task and start the reduce operation err = taskManager.CreateTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case reducepb.ReduceRequest_WindowOperation_APPEND: // append the datum to the reduce task err = taskManager.AppendToTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } } @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // wait for the go routine which reads from the output channel and sends to the stream to return err = g.Wait() if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } diff --git a/pkg/reducestreamer/service.go b/pkg/reducestreamer/service.go index ac2eaee1..9db0190c 100644 --- a/pkg/reducestreamer/service.go +++ b/pkg/reducestreamer/service.go @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // create a new reduce task and start the reduce operation err = taskManager.CreateTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case reducepb.ReduceRequest_WindowOperation_APPEND: // append the datum to the reduce task err = taskManager.AppendToTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } } @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // wait for the go routine which reads from the output channel and sends to the stream to return err = g.Wait() if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } diff --git a/pkg/sessionreducer/service.go b/pkg/sessionreducer/service.go index 28959207..87f09bf7 100644 --- a/pkg/sessionreducer/service.go +++ b/pkg/sessionreducer/service.go @@ -59,7 +59,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR } if recvErr != nil { - statusErr := status.Errorf(codes.Internal, recvErr.Error()) + statusErr := status.Errorf(codes.Internal, "%s", recvErr.Error()) return statusErr } @@ -70,7 +70,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR // also append the datum to the task err := taskManager.CreateTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE: @@ -80,21 +80,21 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR // append the datum to the task err := taskManager.AppendToTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case sessionreducepb.SessionReduceRequest_WindowOperation_MERGE: // merge the tasks err := taskManager.MergeTasks(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND: // expand the task err := taskManager.ExpandTask(d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } } @@ -107,7 +107,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR // wait for the go routine which reads from the output channel and sends to the stream to return err := g.Wait() if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 7628c00e..3738eb99 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -2,9 +2,12 @@ package sideinput import ( "context" + "errors" "log" "runtime/debug" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1" @@ -18,6 +21,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/sideinput-server-info" ) +var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR: side input handler panicked") + // Service implements the proto gen server interface and contains the retrieve operation handler type Service struct { sideinputpb.UnimplementedSideInputServer @@ -31,19 +36,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR } // RetrieveSideInput applies the function for each side input retrieval request. -func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) { +func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (resp *sideinputpb.SideInputResponse, err error) { // handle panic defer func() { if r := recover(); r != nil { log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} + err = status.Errorf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r) } }() messageSi := fs.Retriever.RetrieveSideInput(ctx) - var element *sideinputpb.SideInputResponse - element = &sideinputpb.SideInputResponse{ + resp = &sideinputpb.SideInputResponse{ Value: messageSi.value, NoBroadcast: messageSi.noBroadcast, } - return element, nil + return resp, nil }