Skip to content

Commit

Permalink
Added the debug stack trace to the details (DebugInfo) of the gRPC status
Browse files (browse the repository at this point in the history)
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Feb 3, 2025
1 parent b6e0fe8 commit 074bb1f
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 20 deletions.
10 changes: 7 additions & 3 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,7 +24,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR(batchmap)")
var errBatchMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(batchmap)")

// Service implements the proto gen server interface and contains the map operation handler.
type Service struct {
Expand Down Expand Up @@ -66,7 +67,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
}
log.Printf("Stopping the BatchMapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
return err
}
}
}
Expand Down Expand Up @@ -161,7 +162,10 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v %v", errBatchMapHandlerPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errBatchMapHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down
10 changes: 7 additions & 3 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,7 +24,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapHandlerPanic = errors.New("USER_CODE_ERROR(map)")
var errMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(map)")

// Service implements the proto gen server interface and contains the map operation
// handler.
Expand Down Expand Up @@ -122,7 +123,7 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
return err
}

// check if there was an error while reading from the stream
Expand Down Expand Up @@ -158,7 +159,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v %v", errMapHandlerPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errMapHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down
10 changes: 7 additions & 3 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,7 +24,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR(mapstream)")
var errMapStreamHandlerPanic = errors.New("UDF_EXECUTION_ERROR(mapstream)")

// Service implements the proto gen server interface and contains the map
// streaming function.
Expand Down Expand Up @@ -113,7 +114,7 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
return err
}

// check if there was an error while reading from the stream
Expand All @@ -129,7 +130,10 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v %v", errMapStreamHandlerPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
return
}
}()
Expand Down
8 changes: 6 additions & 2 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"runtime/debug"

epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -21,7 +22,7 @@ const (
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR(side input)")
var errSideInputHandlerPanic = errors.New("UDF_EXECUTION_ERROR(side input)")

// Service implements the proto gen server interface and contains the retrieve operation handler
type Service struct {
Expand All @@ -42,7 +43,10 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (res
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
err = status.Errorf(codes.Internal, "%s: %v %v", errSideInputHandlerPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()
messageSi := fs.Retriever.RetrieveSideInput(ctx)
Expand Down
10 changes: 7 additions & 3 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -28,7 +29,7 @@ const (
UDContainerFallbackSink = "fb-udsink"
)

var errSinkHandlerPanic = errors.New("USER_CODE_ERROR(sink)")
var errSinkHandlerPanic = errors.New("UDF_EXECUTION_ERROR(sink)")

// handlerDatum implements the Datum interface and is used in the sink functions.
type handlerDatum struct {
Expand Down Expand Up @@ -105,7 +106,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
}
log.Printf("Stopping the SinkFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
return err
}
}
}
Expand Down Expand Up @@ -197,7 +198,10 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v %v", errSinkHandlerPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errSinkHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()
responses := fs.Sinker.Sink(ctx, datumStreamCh)
Expand Down
10 changes: 7 additions & 3 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -32,7 +33,7 @@ type Service struct {
shutdownCh chan<- struct{}
}

var errSourceReadPanic = errors.New("USER_CODE_ERROR(source)")
var errSourceReadPanic = errors.New("UDF_EXECUTION_ERROR(source)")

// ReadFn reads the data from the source.
func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
Expand All @@ -50,7 +51,7 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
}
log.Printf("error processing requests: %v", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
return err
}
}
}
Expand Down Expand Up @@ -124,7 +125,10 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v %v", errSourceReadPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errSourceReadPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
return
}
close(messageCh)
Expand Down
10 changes: 7 additions & 3 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -38,7 +39,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse,
return &v1.ReadyResponse{Ready: true}, nil
}

var errTransformerPanic = errors.New("USER_CODE_ERROR(transformer)")
var errTransformerPanic = errors.New("UDF_EXECUTION_ERROR(transformer)")

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) {
Expand Down Expand Up @@ -123,7 +124,7 @@ outer:
if err := grp.Wait(); err != nil {
log.Printf("Stopping the SourceTransformFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
return err
}

// check if there was an error while reading from the stream
Expand Down Expand Up @@ -159,7 +160,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("%s: %v %v", errTransformerPanic, r, string(debug.Stack()))
st, _ := status.Newf(codes.Internal, "%s: %v", errTransformerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down

0 comments on commit 074bb1f

Please sign in to comment.