Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update panic error handling in containers #173

Merged
merged 10 commits into from
Feb 4, 2025
10 changes: 9 additions & 1 deletion pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -21,6 +24,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errBatchMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(batchmap)")

// Service implements the proto gen server interface and contains the map operation handler.
type Service struct {
mappb.UnimplementedMapServer
Expand Down Expand Up @@ -157,7 +162,10 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside batch map handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside batch map handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errBatchMapHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down
12 changes: 9 additions & 3 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,6 +24,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapHandlerPanic = errors.New("UDF_EXECUTION_ERROR(map)")

// Service implements the proto gen server interface and contains the map operation
// handler.
type Service struct {
Expand Down Expand Up @@ -120,12 +123,12 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
return err
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
Expand Down Expand Up @@ -156,7 +159,10 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
err = status.Errorf(codes.Internal, "panic inside map handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errMapHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()

Expand Down
11 changes: 8 additions & 3 deletions pkg/mapper/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -254,9 +255,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages {
panic("map failed")
panic(panicMssg)
}),
// panic in the transformer causes the server to send a shutdown signal to shutdownCh channel.
// The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else.
Expand Down Expand Up @@ -288,6 +290,9 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed"))
require.Equal(t, expectedStatus, gotStatus)
gotMessage := gotStatus.Message()
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapHandlerPanic, panicMssg))
expectedMessage := expectedStatus.Message()
require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal")
require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message")
}
12 changes: 9 additions & 3 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"runtime/debug"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand All @@ -23,6 +24,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

var errMapStreamHandlerPanic = errors.New("UDF_EXECUTION_ERROR(mapstream)")

// Service implements the proto gen server interface and contains the map
// streaming function.
type Service struct {
Expand Down Expand Up @@ -111,12 +114,12 @@ outer:
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
return err
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, readErr.Error())
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
Expand All @@ -127,7 +130,10 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside mapStream handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
return
}
}()
Expand Down
11 changes: 8 additions & 3 deletions pkg/mapstreamer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -326,9 +327,10 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) {
}

func TestService_MapFn_Panic(t *testing.T) {
panicMssg := "map failed"
svc := &Service{
MapperStream: MapStreamerFunc(func(ctx context.Context, keys []string, datum Datum, messageCh chan<- Message) {
panic("map failed")
panic(panicMssg)
}),
shutdownCh: make(chan<- struct{}, 1),
}
Expand Down Expand Up @@ -357,8 +359,11 @@ func TestService_MapFn_Panic(t *testing.T) {
_, err = stream.Recv()
require.Error(t, err, "Expected error while receiving message from the stream")
gotStatus, _ := status.FromError(err)
expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: panic inside mapStream handler: map failed"))
require.Equal(t, expectedStatus, gotStatus)
gotMessage := gotStatus.Message()
expectedStatus := status.Convert(status.Errorf(codes.Internal, "%s: %v", errMapStreamHandlerPanic, panicMssg))
expectedMessage := expectedStatus.Message()
require.Equal(t, expectedStatus.Code(), gotStatus.Code(), "Expected error codes to be equal")
require.True(t, strings.HasPrefix(gotMessage, expectedMessage), "Expected error message to start with the expected message")
}

func TestService_MapFn_MultipleRequestsAndResponses(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the go routine which reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,14 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// create a new reduce task and start the reduce operation
err = taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case reducepb.ReduceRequest_WindowOperation_APPEND:
// append the datum to the reduce task
err = taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -95,7 +95,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
// wait for the go routine which reads from the output channel and sends to the stream to return
err = g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
}

if recvErr != nil {
statusErr := status.Errorf(codes.Internal, recvErr.Error())
statusErr := status.Errorf(codes.Internal, "%s", recvErr.Error())
return statusErr
}

Expand All @@ -70,7 +70,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// also append the datum to the task
err := taskManager.CreateTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_CLOSE:
Expand All @@ -80,21 +80,21 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// append the datum to the task
err := taskManager.AppendToTask(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_MERGE:
// merge the tasks
err := taskManager.MergeTasks(ctx, d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
case sessionreducepb.SessionReduceRequest_WindowOperation_EXPAND:
// expand the task
err := taskManager.ExpandTask(d)
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}
}
Expand All @@ -107,7 +107,7 @@ func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionR
// wait for the go routine which reads from the output channel and sends to the stream to return
err := g.Wait()
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
statusErr := status.Errorf(codes.Internal, "%s", err.Error())
return statusErr
}

Expand Down
17 changes: 13 additions & 4 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package sideinput

import (
"context"
"errors"
"log"
"runtime/debug"

epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
Expand All @@ -18,6 +22,8 @@ const (
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

var errSideInputHandlerPanic = errors.New("UDF_EXECUTION_ERROR(side input)")

// Service implements the proto gen server interface and contains the retrieve operation handler
type Service struct {
sideinputpb.UnimplementedSideInputServer
Expand All @@ -31,19 +37,22 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR
}

// RetrieveSideInput applies the function for each side input retrieval request.
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) {
func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (resp *sideinputpb.SideInputResponse, err error) {
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()
messageSi := fs.Retriever.RetrieveSideInput(ctx)
var element *sideinputpb.SideInputResponse
element = &sideinputpb.SideInputResponse{
resp = &sideinputpb.SideInputResponse{
Value: messageSi.value,
NoBroadcast: messageSi.noBroadcast,
}
return element, nil
return resp, nil
}
10 changes: 9 additions & 1 deletion pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"time"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
Expand All @@ -26,6 +29,8 @@ const (
UDContainerFallbackSink = "fb-udsink"
)

var errSinkHandlerPanic = errors.New("UDF_EXECUTION_ERROR(sink)")

// handlerDatum implements the Datum interface and is used in the sink functions.
type handlerDatum struct {
id string
Expand Down Expand Up @@ -193,7 +198,10 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside sink handler: %v", r)
st, _ := status.Newf(codes.Internal, "%s: %v", errSinkHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
err = st.Err()
}
}()
responses := fs.Sinker.Sink(ctx, datumStreamCh)
Expand Down
Loading