diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 676e2349..dd187bab 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -9,6 +9,9 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" @@ -21,6 +24,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) +var errBatchMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(batchmap)") + // Service implements the proto gen server interface and contains the map operation handler. type Service struct { mappb.UnimplementedMapServer @@ -157,7 +162,10 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer defer func() { if r := recover(); r != nil { log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside batch map handler: %v", r) + st, _ := status.Newf(codes.Internal, "%s: %v", errBatchMapHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index bc1cb64d..ed3bb72d 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -23,6 +24,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) +var errMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(map)") + // Service implements the proto gen server interface and contains the map operation // handler. type Service struct { @@ -120,12 +123,12 @@ outer: if err := g.Wait(); err != nil { log.Printf("Stopping the MapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "error processing requests: %v", err) + return err } // check if there was an error while reading from the stream if readErr != nil { - return status.Errorf(codes.Internal, readErr.Error()) + return status.Errorf(codes.Internal, "%s", readErr.Error()) } return nil @@ -156,7 +159,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res defer func() { if r := recover(); r != nil { log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) - err = status.Errorf(codes.Internal, "panic inside map handler: %v", r) + st, _ := status.Newf(codes.Internal, "%s: %v", errMapHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() diff --git a/pkg/mapper/service_test.go b/pkg/mapper/service_test.go index 7ec939d1..9d074686 100644 --- a/pkg/mapper/service_test.go +++ b/pkg/mapper/service_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "strings" "testing" "time" @@ -254,9 +255,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) { } func TestService_MapFn_Panic(t *testing.T) { + panicMssg := "map failed" svc := &Service{ Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { - panic("map failed") + panic(panicMssg) }), // panic in the transformer causes the server to send a shutdown signal to shutdownCh channel. // The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else. @@ -288,6 +290,9 @@ func TestService_MapFn_Panic(t *testing.T) { _, err = stream.Recv() require.Error(t, err, "Expected error while receiving message from the stream") gotStatus, _ := status.FromError(err) - expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed")) - require.Equal(t, expectedStatus, gotStatus) + gotMessage := gotStatus.Message() + expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapHandlerPanic, panicMssg)) + expectedMessage := expectedStatus.Message() + require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal") + require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message") } diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 8795ff08..5cda6e32 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -23,6 +24,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) +var errMapStreamHandlerPanic = errors.New("UDF_EXECUTION_ERROR(mapstream)") + // Service implements the proto gen server interface and contains the map // streaming function. type Service struct { @@ -111,12 +114,12 @@ outer: if err := g.Wait(); err != nil { log.Printf("Stopping the MapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "error processing requests: %v", err) + return err } // check if there was an error while reading from the stream if readErr != nil { - return status.Errorf(codes.Internal, readErr.Error()) + return status.Errorf(codes.Internal, "%s", readErr.Error()) } return nil @@ -127,7 +130,10 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes defer func() { if r := recover(); r != nil { log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside mapStream handler: %v", r) + st, _ := status.Newf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() return } }() diff --git a/pkg/mapstreamer/service_test.go b/pkg/mapstreamer/service_test.go index fd5b35e9..5629f197 100644 --- a/pkg/mapstreamer/service_test.go +++ b/pkg/mapstreamer/service_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "net" + "strings" "testing" "time" @@ -326,9 +327,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) { } func TestService_MapFn_Panic(t *testing.T) { + panicMssg := "map failed" svc := &Service{ MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) { - panic("map failed") + panic(panicMssg) }), shutdownCh: make(chan<- struct{}, 1), } @@ -357,8 +359,11 @@ func TestService_MapFn_Panic(t *testing.T) { _, err = stream.Recv() require.Error(t, err, "Expected error while receiving message from the stream") gotStatus, _ := status.FromError(err) - expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed")) - require.Equal(t, expectedStatus, gotStatus) + gotMessage := gotStatus.Message() + expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, panicMssg)) + expectedMessage := expectedStatus.Message() + require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal") + require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message") } func TestService_MapFn_MultipleRequestsAndResponses(t *testing.T) { diff --git a/pkg/reducer/service.go b/pkg/reducer/service.go index 5786dcd1..32ed1c42 100644 --- a/pkg/reducer/service.go +++ b/pkg/reducer/service.go @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // create a new reduce task and start the reduce operation err = taskManager.CreateTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case reducepb.ReduceRequest_WindowOperation_APPEND: // append the datum to the reduce task err = taskManager.AppendToTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } } @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // wait for the go routine which reads from the output channel and sends to the stream to return err = g.Wait() if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } diff --git a/pkg/reducestreamer/service.go b/pkg/reducestreamer/service.go index ac2eaee1..9db0190c 100644 --- a/pkg/reducestreamer/service.go +++ b/pkg/reducestreamer/service.go @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // create a new reduce task and start the reduce operation err = taskManager.CreateTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case reducepb.ReduceRequest_WindowOperation_APPEND: // append the datum to the reduce task err = taskManager.AppendToTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } } @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { // wait for the go routine which reads from the output channel and sends to the stream to return err = g.Wait() if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } diff --git a/pkg/sessionreducer/service.go b/pkg/sessionreducer/service.go index 28959207..87f09bf7 100644 --- a/pkg/sessionreducer/service.go +++ b/pkg/sessionreducer/service.go @@ -59,7 +59,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR } if recvErr != nil { - statusErr := status.Errorf(codes.Internal, recvErr.Error()) + statusErr := status.Errorf(codes.Internal, "%s", recvErr.Error()) return statusErr } @@ -70,7 +70,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR // also append the datum to the task err := taskManager.CreateTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE: @@ -80,21 +80,21 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR // append the datum to the task err := taskManager.AppendToTask(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case sessionreducepb.SessionReduceRequest_WindowOperation_MERGE: // merge the tasks err := taskManager.MergeTasks(ctx, d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } case sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND: // expand the task err := taskManager.ExpandTask(d) if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } } @@ -107,7 +107,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR // wait for the go routine which reads from the output channel and sends to the stream to return err := g.Wait() if err != nil { - statusErr := status.Errorf(codes.Internal, err.Error()) + statusErr := status.Errorf(codes.Internal, "%s", err.Error()) return statusErr } diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 7628c00e..e4b038b2 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -2,9 +2,13 @@ package sideinput import ( "context" + "errors" "log" "runtime/debug" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1" @@ -18,6 +22,8 @@ const ( serverInfoFilePath = "/var/run/numaflow/sideinput-server-info" ) +var errSideInputHandlerPanic = errors.New("UDF_EXECUTION_ERROR(side input)") + // Service implements the proto gen server interface and contains the retrieve operation handler type Service struct { sideinputpb.UnimplementedSideInputServer @@ -31,19 +37,22 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR } // RetrieveSideInput applies the function for each side input retrieval request. -func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) { +func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (resp *sideinputpb.SideInputResponse, err error) { // handle panic defer func() { if r := recover(); r != nil { log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} + st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() messageSi := fs.Retriever.RetrieveSideInput(ctx) - var element *sideinputpb.SideInputResponse - element = &sideinputpb.SideInputResponse{ + resp = &sideinputpb.SideInputResponse{ Value: messageSi.value, NoBroadcast: messageSi.noBroadcast, } - return element, nil + return resp, nil } diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index 8c8c7f9a..b0052b9c 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -10,6 +10,9 @@ import ( "time" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" @@ -26,6 +29,8 @@ const ( UDContainerFallbackSink = "fb-udsink" ) +var errSinkHandlerPanic = errors.New("UDF_EXECUTION_ERROR(sink)") + // handlerDatum implements the Datum interface and is used in the sink functions. type handlerDatum struct { id string @@ -193,7 +198,10 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer defer func() { if r := recover(); r != nil { log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside sink handler: %v", r) + st, _ := status.Newf(codes.Internal, "%s: %v", errSinkHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() responses := fs.Sinker.Sink(ctx, datumStreamCh) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 9453c014..60e3668f 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -10,6 +10,9 @@ import ( "time" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -30,6 +33,8 @@ type Service struct { shutdownCh chan<- struct{} } +var errSourceReadPanic = errors.New("UDF_EXECUTION_ERROR(source)") + // ReadFn reads the data from the source. func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { ctx := stream.Context() @@ -120,7 +125,10 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour defer func() { if r := recover(); r != nil { log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside source handler: %v", r) + st, _ := status.Newf(codes.Internal, "%s: %v", errSourceReadPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() return } close(messageCh) diff --git a/pkg/sourcetransformer/messaget.go b/pkg/sourcetransformer/message.go similarity index 100% rename from pkg/sourcetransformer/messaget.go rename to pkg/sourcetransformer/message.go diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index e19690f3..4244ac48 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -38,7 +39,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, return &v1.ReadyResponse{Ready: true}, nil } -var errTransformerPanic = errors.New("transformer function panicked") +var errTransformerPanic = errors.New("UDF_EXECUTION_ERROR(transformer)") // recvWithContext wraps stream.Recv() to respect context cancellation. func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) { @@ -123,13 +124,14 @@ outer: if err := grp.Wait(); err != nil { log.Printf("Stopping the SourceTransformFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, err.Error()) + return err } // check if there was an error while reading from the stream if readErr != nil { - return status.Errorf(codes.Internal, readErr.Error()) + return status.Errorf(codes.Internal, "%s", readErr.Error()) } + return nil } @@ -158,7 +160,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq defer func() { if r := recover(); r != nil { log.Printf("panic inside handler: %v %v", r, string(debug.Stack())) - err = errTransformerPanic + st, _ := status.Newf(codes.Internal, "%s: %v", errTransformerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() diff --git a/pkg/sourcetransformer/service_test.go b/pkg/sourcetransformer/service_test.go index 79449c9a..e53c8529 100644 --- a/pkg/sourcetransformer/service_test.go +++ b/pkg/sourcetransformer/service_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net" + "strings" "testing" "time" @@ -259,9 +260,10 @@ func TestService_SourceTransformFn_Multiple_Messages(t *testing.T) { } func TestService_SourceTransformFn_Panic(t *testing.T) { + panicMssg := "transformer panicked" svc := &Service{ Transformer: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages { - panic("transformer panicked") + panic(panicMssg) }), // panic in the transformer causes the server to send a shutdown signal to shutdownCh channel. // The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else. @@ -293,6 +295,9 @@ func TestService_SourceTransformFn_Panic(t *testing.T) { _, err = stream.Recv() require.Error(t, err, "Expected error while receiving message from the stream") gotStatus, _ := status.FromError(err) - expectedStatus := status.Convert(status.Errorf(codes.Internal, errTransformerPanic.Error())) - require.Equal(t, expectedStatus, gotStatus) + gotMessage := gotStatus.Message() + expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errTransformerPanic, panicMssg)) + expectedMessage := expectedStatus.Message() + require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal") + require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message") }