Skip to content

Commit

Permalink
handle panic errors and fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Jan 29, 2025
1 parent e93fdf3 commit 3277ee2
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 22 deletions.
8 changes: 6 additions & 2 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -21,6 +23,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR: batch map handler panicked")

// Service implements the proto gen server interface and contains the map operation handler.
type Service struct {
mappb.UnimplementedMapServer
Expand Down Expand Up @@ -62,7 +66,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
}
log.Printf("Stopping the BatchMapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return err
return status.Errorf(codes.Internal, "%s", err.Error())
}
}
}
Expand Down Expand Up @@ -157,7 +161,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside batch map handler: %v", r)
err = fmt.Errorf("%s: %v", errBatchMapHandlerPanic, r)
}
}()

Expand Down
5 changes: 3 additions & 2 deletions pkg/mapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
panic("map failed")
panic(panicMssg)
}),
// panic in the transformer causes the server to send a shutdown signal to shutdownCh channel.
// The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else.
Expand Down Expand Up @@ -288,6 +289,6 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed"))
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapHandlerPanic, panicMssg))
require.Equal(t, expectedStatus, gotStatus)
}
5 changes: 3 additions & 2 deletions pkg/mapstreamer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
panic("map failed")
panic(panicMssg)
}),
shutdownCh: make(chan<- struct{}, 1),
}
Expand Down Expand Up @@ -357,7 +358,7 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed"))
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, panicMssg))
require.Equal(t, expectedStatus, gotStatus)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the go routine which reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the go routine which reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
}

if recvErr != nil {
statusErr := status.Errorf(codes.Internal, recvErr.Error())
statusErr := status.Errorf(codes.Internal, "%s", recvErr.Error())
return statusErr
}

Expand All @@ -70,7 +70,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// also append the datum to the task
err := taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE:
Expand All @@ -80,21 +80,21 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// append the datum to the task
err := taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_MERGE:
// merge the tasks
err := taskManager.MergeTasks(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND:
// expand the task
err := taskManager.ExpandTask(d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -107,7 +107,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// wait for the go routine which reads from the output channel and sends to the stream to return
err := g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package sideinput

import (
"context"
"errors"
"log"
"runtime/debug"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
Expand All @@ -18,6 +21,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR: side input handler panicked")

// Service implements the proto gen server interface and contains the retrieve operation handler
type Service struct {
sideinputpb.UnimplementedSideInputServer
Expand All @@ -31,19 +36,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR
}

// RetrieveSideInput applies the function for each side input retrieval request.
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (resp *sideinputpb.SideInputResponse, err error) {
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
err = status.Errorf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r)
}
}()
messageSi := fs.Retriever.RetrieveSideInput(ctx)
var element *sideinputpb.SideInputResponse
element = &sideinputpb.SideInputResponse{
resp = &sideinputpb.SideInputResponse{
Value: messageSi.value,
NoBroadcast: messageSi.noBroadcast,
}
return element, nil
return resp, nil
}

0 comments on commit 3277ee2

Please sign in to comment.