From 074bb1fb62458af2ac0c9ef80322d86cc2714b5f Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Mon, 3 Feb 2025 15:46:43 +0530 Subject: [PATCH] added debug stack in details (debug info) of gRPC status Signed-off-by: adarsh0728 --- pkg/batchmapper/service.go | 10 +++++++--- pkg/mapper/service.go | 10 +++++++--- pkg/mapstreamer/service.go | 10 +++++++--- pkg/sideinput/service.go | 8 ++++++-- pkg/sinker/service.go | 10 +++++++--- pkg/sourcer/service.go | 10 +++++++--- pkg/sourcetransformer/service.go | 10 +++++++--- 7 files changed, 48 insertions(+), 20 deletions(-) diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 350fc8d0..dd187bab 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -23,7 +24,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR(batchmap)") +var errBatchMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(batchmap)") // Service implements the proto gen server interface and contains the map operation handler. type Service struct { @@ -66,7 +67,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { } log.Printf("Stopping the BatchMapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "%s", err.Error()) + return err } } } @@ -161,7 +162,10 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer defer func() { if r := recover(); r != nil { log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v %v", errBatchMapHandlerPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errBatchMapHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 4880f16f..ed3bb72d 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -23,7 +24,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -var errMapHandlerPanic = errors.New("USER_CODE_ERROR(map)") +var errMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(map)") // Service implements the proto gen server interface and contains the map operation // handler. @@ -122,7 +123,7 @@ outer: if err := g.Wait(); err != nil { log.Printf("Stopping the MapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "%s", err.Error()) + return err } // check if there was an error while reading from the stream @@ -158,7 +159,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res defer func() { if r := recover(); r != nil { log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v %v", errMapHandlerPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errMapHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 8bb64726..5cda6e32 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -9,6 +9,7 @@ import ( "runtime/debug" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -23,7 +24,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR(mapstream)") +var errMapStreamHandlerPanic = errors.New("UDF_EXECUTION_ERROR(mapstream)") // Service implements the proto gen server interface and contains the map // streaming function. @@ -113,7 +114,7 @@ outer: if err := g.Wait(); err != nil { log.Printf("Stopping the MapFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "%s", err.Error()) + return err } // check if there was an error while reading from the stream @@ -129,7 +130,10 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes defer func() { if r := recover(); r != nil { log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v %v", errMapStreamHandlerPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() return } }() diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 941ea5e7..e4b038b2 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -6,6 +6,7 @@ import ( "log" "runtime/debug" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -21,7 +22,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/sideinput-server-info" ) -var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR(side input)") +var errSideInputHandlerPanic = errors.New("UDF_EXECUTION_ERROR(side input)") // Service implements the proto gen server interface and contains the retrieve operation handler type Service struct { @@ -42,7 +43,10 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (res if r := recover(); r != nil { log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} - err = status.Errorf(codes.Internal, "%s: %v %v", errSideInputHandlerPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() messageSi := fs.Retriever.RetrieveSideInput(ctx) diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index 3011c9a1..b0052b9c 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -10,6 +10,7 @@ import ( "time" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -28,7 +29,7 @@ const ( UDContainerFallbackSink = "fb-udsink" ) -var errSinkHandlerPanic = errors.New("USER_CODE_ERROR(sink)") +var errSinkHandlerPanic = errors.New("UDF_EXECUTION_ERROR(sink)") // handlerDatum implements the Datum interface and is used in the sink functions. type handlerDatum struct { @@ -105,7 +106,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { } log.Printf("Stopping the SinkFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "%s", err.Error()) + return err } } } @@ -197,7 +198,10 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer defer func() { if r := recover(); r != nil { log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v %v", errSinkHandlerPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errSinkHandlerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }() responses := fs.Sinker.Sink(ctx, datumStreamCh) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index b9de7077..60e3668f 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -10,6 +10,7 @@ import ( "time" "golang.org/x/sync/errgroup" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -32,7 +33,7 @@ type Service struct { shutdownCh chan<- struct{} } -var errSourceReadPanic = errors.New("USER_CODE_ERROR(source)") +var errSourceReadPanic = errors.New("UDF_EXECUTION_ERROR(source)") // ReadFn reads the data from the source. func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { @@ -50,7 +51,7 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { } log.Printf("error processing requests: %v", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "%s", err.Error()) + return err } } } @@ -124,7 +125,10 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour defer func() { if r := recover(); r != nil { log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v %v", errSourceReadPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errSourceReadPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() return } close(messageCh) diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index c7cbc638..4244ac48 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + epb "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -38,7 +39,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, return &v1.ReadyResponse{Ready: true}, nil } -var errTransformerPanic = errors.New("USER_CODE_ERROR(transformer)") +var errTransformerPanic = errors.New("UDF_EXECUTION_ERROR(transformer)") // recvWithContext wraps stream.Recv() to respect context cancellation. func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) { @@ -123,7 +124,7 @@ outer: if err := grp.Wait(); err != nil { log.Printf("Stopping the SourceTransformFn with err, %s", err) fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Internal, "%s", err.Error()) + return err } // check if there was an error while reading from the stream @@ -159,7 +160,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq defer func() { if r := recover(); r != nil { log.Printf("panic inside handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v %v", errTransformerPanic, r, string(debug.Stack())) + st, _ := status.Newf(codes.Internal, "%s: %v", errTransformerPanic, r).WithDetails(&epb.DebugInfo{ + Detail: string(debug.Stack()), + }) + err = st.Err() } }()