diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 2ab92a3f..350fc8d0 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -23,7 +23,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR: batch map handler panicked") +var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR(batchmap)") // Service implements the proto gen server interface and contains the map operation handler. type Service struct { @@ -161,7 +161,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer defer func() { if r := recover(); r != nil { log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v", errBatchMapHandlerPanic, r) + err = fmt.Errorf("%s: %v %v", errBatchMapHandlerPanic, r, string(debug.Stack())) } }() diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 823f1c5b..4880f16f 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -23,7 +23,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -var errMapHandlerPanic = errors.New("USER_CODE_ERROR: map handler panicked") +var errMapHandlerPanic = errors.New("USER_CODE_ERROR(map)") // Service implements the proto gen server interface and contains the map operation // handler. @@ -158,7 +158,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res defer func() { if r := recover(); r != nil { log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v", errMapHandlerPanic, r) + err = fmt.Errorf("%s: %v %v", errMapHandlerPanic, r, string(debug.Stack())) } }() diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index ce520252..8bb64726 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -23,7 +23,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) -var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR: map stream handler panicked") +var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR(mapstream)") // Service implements the proto gen server interface and contains the map // streaming function. @@ -129,7 +129,7 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes defer func() { if r := recover(); r != nil { log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v", errMapStreamHandlerPanic, r) + err = fmt.Errorf("%s: %v %v", errMapStreamHandlerPanic, r, string(debug.Stack())) return } }() diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 3738eb99..941ea5e7 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -21,7 +21,7 @@ const ( serverInfoFilePath = "/var/run/numaflow/sideinput-server-info" ) -var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR: side input handler panicked") +var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR(side input)") // Service implements the proto gen server interface and contains the retrieve operation handler type Service struct { @@ -42,7 +42,7 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (res if r := recover(); r != nil { log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack())) fs.shutdownCh <- struct{}{} - err = status.Errorf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r) + err = status.Errorf(codes.Internal, "%s: %v %v", errSideInputHandlerPanic, r, string(debug.Stack())) } }() messageSi := fs.Retriever.RetrieveSideInput(ctx) diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index 6067843d..3011c9a1 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -28,7 +28,7 @@ const ( UDContainerFallbackSink = "fb-udsink" ) -var errSinkHandlerPanic = errors.New("USER_CODE_ERROR: sink handler panicked") +var errSinkHandlerPanic = errors.New("USER_CODE_ERROR(sink)") // handlerDatum implements the Datum interface and is used in the sink functions. type handlerDatum struct { @@ -197,7 +197,7 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer defer func() { if r := recover(); r != nil { log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v", errSinkHandlerPanic, r) + err = fmt.Errorf("%s: %v %v", errSinkHandlerPanic, r, string(debug.Stack())) } }() responses := fs.Sinker.Sink(ctx, datumStreamCh) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index fff21ffb..b9de7077 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -32,7 +32,7 @@ type Service struct { shutdownCh chan<- struct{} } -var errSourceReadPanic = errors.New("USER_CODE_ERROR: source read function panicked") +var errSourceReadPanic = errors.New("USER_CODE_ERROR(source)") // ReadFn reads the data from the source. func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { @@ -124,7 +124,7 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour defer func() { if r := recover(); r != nil { log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v", errSourceReadPanic, r) + err = fmt.Errorf("%s: %v %v", errSourceReadPanic, r, string(debug.Stack())) return } close(messageCh) diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index a6ffb24a..c7cbc638 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -38,7 +38,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, return &v1.ReadyResponse{Ready: true}, nil } -var errTransformerPanic = errors.New("USER_CODE_ERROR: transformer function panicked") +var errTransformerPanic = errors.New("USER_CODE_ERROR(transformer)") // recvWithContext wraps stream.Recv() to respect context cancellation. func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) { @@ -159,7 +159,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq defer func() { if r := recover(); r != nil { log.Printf("panic inside handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("%s: %v", errTransformerPanic, r) + err = fmt.Errorf("%s: %v %v", errTransformerPanic, r, string(debug.Stack())) } }()