Skip to content

Commit

Permalink
source and sink changes
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Jan 28, 2025
1 parent f93025d commit c6ae6bc
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
8 changes: 6 additions & 2 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
Expand All @@ -26,6 +28,8 @@ const (
UDContainerFallbackSink = "fb-udsink"
)

var errSinkHandlerPanic = errors.New("USER_CODE_ERROR: sink handler panicked")

// handlerDatum implements the Datum interface and is used in the sink functions.
type handlerDatum struct {
id string
Expand Down Expand Up @@ -101,7 +105,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
}
log.Printf("Stopping the SinkFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return err
return status.Errorf(codes.Internal, "%s", err.Error())
}
}
}
Expand Down Expand Up @@ -193,7 +197,7 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside sink handler: %v", r)
err = fmt.Errorf("%s: %v", errSinkHandlerPanic, r)
}
}()
responses := fs.Sinker.Sink(ctx, datumStreamCh)
Expand Down
8 changes: 6 additions & 2 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -30,6 +32,8 @@ type Service struct {
shutdownCh chan<- struct{}
}

var errSourceReadPanic = errors.New("USER_CODE_ERROR: source read function panicked")

// ReadFn reads the data from the source.
func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
ctx := stream.Context()
Expand All @@ -46,7 +50,7 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
}
log.Printf("error processing requests: %v", err)
fs.shutdownCh <- struct{}{}
return err
return status.Errorf(codes.Internal, "%s", err.Error())
}
}
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (fs *Service) receiveReadRequests(ctx context.Context, stream sourcepb.Sour
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
err = fmt.Errorf("panic inside source handler: %v", r)
err = fmt.Errorf("%s: %v", errSourceReadPanic, r)
return
}
close(messageCh)
Expand Down

0 comments on commit c6ae6bc

Please sign in to comment.