Skip to content

Commit

Permalink
chore: update panic error handling in containers (#173)
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 authored Feb 4, 2025
1 parent 2b9e9e9 commit 1d29855
Show file tree
Hide file tree
Showing 14 changed files with 103 additions and 38 deletions.
10 changes: 9 additions & 1 deletion pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -21,6 +24,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errBatchMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(batchmap)")

// Service implements the proto gen server interface and contains the map operation handler.
type Service struct {
mappb.UnimplementedMapServer
Expand Down Expand Up @@ -157,7 +162,10 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside batch map handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errBatchMapHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down
12 changes: 9 additions & 3 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,6 +24,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(map)")

// Service implements the proto gen server interface and contains the map operation
// handler.
type Service struct {
Expand Down Expand Up @@ -120,12 +123,12 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
return err
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
Expand Down Expand Up @@ -156,7 +159,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
err = status.Errorf(codes.Internal, "panic inside map handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errMapHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down
11 changes: 8 additions & 3 deletions pkg/mapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -254,9 +255,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
panic("map failed")
panic(panicMssg)
}),
// A panic in the mapper causes the server to send a shutdown signal to the shutdownCh channel.
// The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else.
Expand Down Expand Up @@ -288,6 +290,9 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed"))
require.Equal(t, expectedStatus, gotStatus)
gotMessage := gotStatus.Message()
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapHandlerPanic, panicMssg))
expectedMessage := expectedStatus.Message()
require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal")
require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message")
}
12 changes: 9 additions & 3 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,6 +24,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapStreamHandlerPanic = errors.New("UDF_EXECUTION_ERROR(mapstream)")

// Service implements the proto gen server interface and contains the map
// streaming function.
type Service struct {
Expand Down Expand Up @@ -111,12 +114,12 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
return err
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
Expand All @@ -127,7 +130,10 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside mapStream handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
return
}
}()
Expand Down
11 changes: 8 additions & 3 deletions pkg/mapstreamer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -326,9 +327,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
panic("map failed")
panic(panicMssg)
}),
shutdownCh: make(chan<- struct{}, 1),
}
Expand Down Expand Up @@ -357,8 +359,11 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed"))
require.Equal(t, expectedStatus, gotStatus)
gotMessage := gotStatus.Message()
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, panicMssg))
expectedMessage := expectedStatus.Message()
require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal")
require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message")
}

func TestService_MapFn_MultipleRequestsAndResponses(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the goroutine that reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the goroutine that reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
}

if recvErr != nil {
statusErr := status.Errorf(codes.Internal, recvErr.Error())
statusErr := status.Errorf(codes.Internal, "%s", recvErr.Error())
return statusErr
}

Expand All @@ -70,7 +70,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// also append the datum to the task
err := taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE:
Expand All @@ -80,21 +80,21 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// append the datum to the task
err := taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_MERGE:
// merge the tasks
err := taskManager.MergeTasks(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND:
// expand the task
err := taskManager.ExpandTask(d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -107,7 +107,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// wait for the goroutine that reads from the output channel and sends to the stream to return
err := g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package sideinput

import (
"context"
"errors"
"log"
"runtime/debug"

epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
Expand All @@ -18,6 +22,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

var errSideInputHandlerPanic = errors.New("UDF_EXECUTION_ERROR(side input)")

// Service implements the proto gen server interface and contains the retrieve operation handler.
type Service struct {
sideinputpb.UnimplementedSideInputServer
Expand All @@ -31,19 +37,22 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR
}

// RetrieveSideInput applies the function for each side input retrieval request.
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (resp *sideinputpb.SideInputResponse, err error) {
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()
messageSi := fs.Retriever.RetrieveSideInput(ctx)
var element *sideinputpb.SideInputResponse
element = &sideinputpb.SideInputResponse{
resp = &sideinputpb.SideInputResponse{
Value: messageSi.value,
NoBroadcast: messageSi.noBroadcast,
}
return element, nil
return resp, nil
}
10 changes: 9 additions & 1 deletion pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"time"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
Expand All @@ -26,6 +29,8 @@ const (
UDContainerFallbackSink = "fb-udsink"
)

var errSinkHandlerPanic = errors.New("UDF_EXECUTION_ERROR(sink)")

// handlerDatum implements the Datum interface and is used in the sink functions.
type handlerDatum struct {
id string
Expand Down Expand Up @@ -193,7 +198,10 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside sink handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errSinkHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()
responses := fs.Sinker.Sink(ctx, datumStreamCh)
Expand Down
Loading

0 comments on commit 1d29855

Please sign in to comment.