Skip to content

Commit

Permalink
add debug stack to err log
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Jan 31, 2025
1 parent 3277ee2 commit f3fc30e
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR: batch map handler panicked")
var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR(batchmap)")

// Service implements the proto gen server interface and contains the map operation handler.
type Service struct {
Expand Down Expand Up @@ -161,7 +161,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v", errBatchMapHandlerPanic, r)
err = fmt.Errorf("%s: %v %v", errBatchMapHandlerPanic, r, string(debug.Stack()))
}
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapHandlerPanic = errors.New("USER_CODE_ERROR: map handler panicked")
var errMapHandlerPanic = errors.New("USER_CODE_ERROR(map)")

// Service implements the proto gen server interface and contains the map operation
// handler.
Expand Down Expand Up @@ -158,7 +158,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v", errMapHandlerPanic, r)
err = fmt.Errorf("%s: %v %v", errMapHandlerPanic, r, string(debug.Stack()))
}
}()

Expand Down
4 changes: 2 additions & 2 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR: map stream handler panicked")
var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR(mapstream)")

// Service implements the proto gen server interface and contains the map
// streaming function.
Expand Down Expand Up @@ -129,7 +129,7 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v", errMapStreamHandlerPanic, r)
err = fmt.Errorf("%s: %v %v", errMapStreamHandlerPanic, r, string(debug.Stack()))
return
}
}()
Expand Down
4 changes: 2 additions & 2 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR: side input handler panicked")
var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR(side input)")

// Service implements the proto gen server interface and contains the retrieve operation handler
type Service struct {
Expand All @@ -42,7 +42,7 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (res
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
err = status.Errorf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r)
err = status.Errorf(codes.Internal, "%s: %v %v", errSideInputHandlerPanic, r, string(debug.Stack()))
}
}()
messageSi := fs.Retriever.RetrieveSideInput(ctx)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const (
UDContainerFallbackSink = "fb-udsink"
)

var errSinkHandlerPanic = errors.New("USER_CODE_ERROR: sink handler panicked")
var errSinkHandlerPanic = errors.New("USER_CODE_ERROR(sink)")

// handlerDatum implements the Datum interface and is used in the sink functions.
type handlerDatum struct {
Expand Down Expand Up @@ -197,7 +197,7 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v", errSinkHandlerPanic, r)
err = fmt.Errorf("%s: %v %v", errSinkHandlerPanic, r, string(debug.Stack()))
}
}()
responses := fs.Sinker.Sink(ctx, datumStreamCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Service struct {
shutdownCh chan<- struct{}
}

var errSourceReadPanic = errors.New("USER_CODE_ERROR: source read function panicked")
var errSourceReadPanic = errors.New("USER_CODE_ERROR(source)")

// ReadFn reads the data from the source.
func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
Expand Down Expand Up @@ -124,7 +124,7 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v", errSourceReadPanic, r)
err = fmt.Errorf("%s: %v %v", errSourceReadPanic, r, string(debug.Stack()))
return
}
close(messageCh)
Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse,
return &v1.ReadyResponse{Ready: true}, nil
}

var errTransformerPanic = errors.New("USER_CODE_ERROR: transformer function panicked")
var errTransformerPanic = errors.New("USER_CODE_ERROR(transformer)")

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) {
Expand Down Expand Up @@ -159,7 +159,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v", errTransformerPanic, r)
err = fmt.Errorf("%s: %v %v", errTransformerPanic, r, string(debug.Stack()))
}
}()

Expand Down

0 comments on commit f3fc30e

Please sign in to comment.