From e93fdf3cb01e725bdd276a13d5766026a46d4a8c Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Tue, 28 Jan 2025 15:44:17 +0530 Subject: [PATCH] map and map stream error handling Signed-off-by: adarsh0728 --- pkg/mapper/service.go | 8 +++++--- pkg/mapstreamer/service.go | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index bc1cb64d..823f1c5b 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -23,6 +23,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) +var errMapHandlerPanic = errors.New("USER_CODE_ERROR: map handler panicked") + // Service implements the proto gen server interface and contains the map operation // handler. type Service struct { @@ -120,12 +122,12 @@ outer: if err := g.Wait(); err != nil { log.Printf("Stopping the MapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "error processing requests: %v", err) + return status.Errorf(codes.Internal, "%s", err.Error()) } // check if there was an error while reading from the stream if readErr != nil { - return status.Errorf(codes.Internal, readErr.Error()) + return status.Errorf(codes.Internal, "%s", readErr.Error()) } return nil @@ -156,7 +158,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res defer func() { if r := recover(); r != nil { log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) - err = status.Errorf(codes.Internal, "panic inside map handler: %v", r) + err = fmt.Errorf("%s: %v", errMapHandlerPanic, r) } }() diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 8795ff08..ce520252 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -23,6 +23,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) +var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR: map stream handler panicked") + // Service implements the proto gen server interface and contains the map // streaming function. type Service struct { @@ -111,12 +113,12 @@ outer: if err := g.Wait(); err != nil { log.Printf("Stopping the MapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "error processing requests: %v", err) + return status.Errorf(codes.Internal, "%s", err.Error()) } // check if there was an error while reading from the stream if readErr != nil { - return status.Errorf(codes.Internal, readErr.Error()) + return status.Errorf(codes.Internal, "%s", readErr.Error()) } return nil @@ -127,7 +129,7 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes defer func() { if r := recover(); r != nil { log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside mapStream handler: %v", r) + err = fmt.Errorf("%s: %v", errMapStreamHandlerPanic, r) return } }()