Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update panic error handling in containers #173

Merged
merged 10 commits into from
Feb 4, 2025
8 changes: 6 additions & 2 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -21,6 +23,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errBatchMapHandlerPanic = errors.New("USER_CODE_ERROR: batch map handler panicked")

// Service implements the proto gen server interface and contains the map operation handler.
type Service struct {
mappb.UnimplementedMapServer
Expand Down Expand Up @@ -62,7 +66,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
}
log.Printf("Stopping the BatchMapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return err
return status.Errorf(codes.Internal, "%s", err.Error())
}
}
}
Expand Down Expand Up @@ -157,7 +161,7 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside batch map handler: %v", r)
err = fmt.Errorf("%s: %v", errBatchMapHandlerPanic, r)
}
}()

Expand Down
8 changes: 5 additions & 3 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapHandlerPanic = errors.New("USER_CODE_ERROR: map handler panicked")

// Service implements the proto gen server interface and contains the map operation
// handler.
type Service struct {
Expand Down Expand Up @@ -120,12 +122,12 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
return status.Errorf(codes.Internal, "%s", err.Error())
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
Expand Down Expand Up @@ -156,7 +158,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
err = status.Errorf(codes.Internal, "panic inside map handler: %v", r)
err = fmt.Errorf("%s: %v", errMapHandlerPanic, r)
}
}()

Expand Down
5 changes: 3 additions & 2 deletions pkg/mapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
panic("map failed")
panic(panicMssg)
}),
// panic in the transformer causes the server to send a shutdown signal to shutdownCh channel.
// The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else.
Expand Down Expand Up @@ -288,6 +289,6 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed"))
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapHandlerPanic, panicMssg))
require.Equal(t, expectedStatus, gotStatus)
}
8 changes: 5 additions & 3 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapStreamHandlerPanic = errors.New("USER_CODE_ERROR: map stream handler panicked")

// Service implements the proto gen server interface and contains the map
// streaming function.
type Service struct {
Expand Down Expand Up @@ -111,12 +113,12 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
return status.Errorf(codes.Internal, "%s", err.Error())
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
Expand All @@ -127,7 +129,7 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside mapStream handler: %v", r)
err = fmt.Errorf("%s: %v", errMapStreamHandlerPanic, r)
return
}
}()
Expand Down
5 changes: 3 additions & 2 deletions pkg/mapstreamer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
panic("map failed")
panic(panicMssg)
}),
shutdownCh: make(chan<- struct{}, 1),
}
Expand Down Expand Up @@ -357,7 +358,7 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed"))
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, panicMssg))
require.Equal(t, expectedStatus, gotStatus)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the go routine which reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the go routine which reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
}

if recvErr != nil {
statusErr := status.Errorf(codes.Internal, recvErr.Error())
statusErr := status.Errorf(codes.Internal, "%s", recvErr.Error())
return statusErr
}

Expand All @@ -70,7 +70,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// also append the datum to the task
err := taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE:
Expand All @@ -80,21 +80,21 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// append the datum to the task
err := taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_MERGE:
// merge the tasks
err := taskManager.MergeTasks(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND:
// expand the task
err := taskManager.ExpandTask(d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -107,7 +107,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// wait for the go routine which reads from the output channel and sends to the stream to return
err := g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package sideinput

import (
"context"
"errors"
"log"
"runtime/debug"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
Expand All @@ -18,6 +21,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

var errSideInputHandlerPanic = errors.New("USER_CODE_ERROR: side input handler panicked")

// Service implements the proto gen server interface and contains the retrieve operation handler
type Service struct {
sideinputpb.UnimplementedSideInputServer
Expand All @@ -31,19 +36,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR
}

// RetrieveSideInput applies the function for each side input retrieval request.
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (resp *sideinputpb.SideInputResponse, err error) {
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
err = status.Errorf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r)
}
}()
messageSi := fs.Retriever.RetrieveSideInput(ctx)
var element *sideinputpb.SideInputResponse
element = &sideinputpb.SideInputResponse{
resp = &sideinputpb.SideInputResponse{
Value: messageSi.value,
NoBroadcast: messageSi.noBroadcast,
}
return element, nil
return resp, nil
}
8 changes: 6 additions & 2 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
Expand All @@ -26,6 +28,8 @@ const (
UDContainerFallbackSink = "fb-udsink"
)

var errSinkHandlerPanic = errors.New("USER_CODE_ERROR: sink handler panicked")

// handlerDatum implements the Datum interface and is used in the sink functions.
type handlerDatum struct {
id string
Expand Down Expand Up @@ -101,7 +105,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
}
log.Printf("Stopping the SinkFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return err
return status.Errorf(codes.Internal, "%s", err.Error())
}
}
}
Expand Down Expand Up @@ -193,7 +197,7 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside sink handler: %v", r)
err = fmt.Errorf("%s: %v", errSinkHandlerPanic, r)
}
}()
responses := fs.Sinker.Sink(ctx, datumStreamCh)
Expand Down
8 changes: 6 additions & 2 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -30,6 +32,8 @@ type Service struct {
shutdownCh chan<- struct{}
}

var errSourceReadPanic = errors.New("USER_CODE_ERROR: source read function panicked")

// ReadFn reads the data from the source.
func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
ctx := stream.Context()
Expand All @@ -46,7 +50,7 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
}
log.Printf("error processing requests: %v", err)
fs.shutdownCh <- struct{}{}
return err
return status.Errorf(codes.Internal, "%s", err.Error())
}
}
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside source handler: %v", r)
err = fmt.Errorf("%s: %v", errSourceReadPanic, r)
return
}
close(messageCh)
Expand Down
File renamed without changes.
9 changes: 5 additions & 4 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse,
return &v1.ReadyResponse{Ready: true}, nil
}

var errTransformerPanic = errors.New("transformer function panicked")
var errTransformerPanic = errors.New("USER_CODE_ERROR: transformer function panicked")

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) {
Expand Down Expand Up @@ -123,13 +123,14 @@ outer:
if err := grp.Wait(); err != nil {
log.Printf("Stopping the SourceTransformFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, err.Error())
return status.Errorf(codes.Internal, "%s", err.Error())
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
}

Expand Down Expand Up @@ -158,7 +159,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside handler: %v %v", r, string(debug.Stack()))
err = errTransformerPanic
err = fmt.Errorf("%s: %v", errTransformerPanic, r)
}
}()

Expand Down
Loading