From c6ae6bcd95ae6f091523bf829ac802765e929794 Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Tue, 28 Jan 2025 10:07:17 +0530 Subject: [PATCH] source and sink changes Signed-off-by: adarsh0728 --- pkg/sinker/service.go | 8 ++++++-- pkg/sourcer/service.go | 8 ++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index 8c8c7f9a..6067843d 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -10,6 +10,8 @@ import ( "time" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" @@ -26,6 +28,8 @@ const ( UDContainerFallbackSink = "fb-udsink" ) +var errSinkHandlerPanic = errors.New("USER_CODE_ERROR: sink handler panicked") + // handlerDatum implements the Datum interface and is used in the sink functions. type handlerDatum struct { id string @@ -101,7 +105,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { } log.Printf("Stopping the SinkFn with err, %s", err) fs.shutdownCh <- struct{}{} - return err + return status.Errorf(codes.Internal, "%s", err.Error()) } } } @@ -193,7 +197,7 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer defer func() { if r := recover(); r != nil { log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside sink handler: %v", r) + err = fmt.Errorf("%s: %v", errSinkHandlerPanic, r) } }() responses := fs.Sinker.Sink(ctx, datumStreamCh) diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 9453c014..fff21ffb 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -10,6 +10,8 @@ import ( "time" "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -30,6 +32,8 @@ type Service struct { shutdownCh chan<- struct{} } +var errSourceReadPanic = errors.New("USER_CODE_ERROR: source read function panicked") + // ReadFn reads the data from the source. func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { ctx := stream.Context() @@ -46,7 +50,7 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { } log.Printf("error processing requests: %v", err) fs.shutdownCh <- struct{}{} - return err + return status.Errorf(codes.Internal, "%s", err.Error()) } } } @@ -120,7 +124,7 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour defer func() { if r := recover(); r != nil { log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) - err = fmt.Errorf("panic inside source handler: %v", r) + err = fmt.Errorf("%s: %v", errSourceReadPanic, r) return } close(messageCh)