From 43d37a5704e77ce37e8ebe4ba9f743f8536a40bd Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Mon, 27 Jan 2025 21:27:41 +0530 Subject: [PATCH] gRPC error code changes & added prefix in caseof handleRequest panics Signed-off-by: adarsh0728 --- .../{messaget.go => message.go} | 0 pkg/sourcetransformer/service.go | 20 +++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) rename pkg/sourcetransformer/{messaget.go => message.go} (100%) diff --git a/pkg/sourcetransformer/messaget.go b/pkg/sourcetransformer/message.go similarity index 100% rename from pkg/sourcetransformer/messaget.go rename to pkg/sourcetransformer/message.go diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index c41c65c5..9d64b165 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -38,7 +38,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, return &v1.ReadyResponse{Ready: true}, nil } -var errTransformerPanic = errors.New("transformer function panicked") +var errTransformerPanic = errors.New("USER_CODE_ERROR: transformer function panicked") // recvWithContext wraps stream.Recv() to respect context cancellation. func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) { @@ -88,7 +88,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn case resp := <-senderCh: if err := stream.Send(resp); err != nil { log.Printf("Failed to send response: %v", err) - return fmt.Errorf("failed to send response to client: %w", err) + return status.Errorf(codes.Unavailable, "failed to send response to client: %v", err) } } } @@ -119,6 +119,14 @@ outer: }) } + // check if there was an error while reading from the stream first + // otherwise since cancel() is used, error group go routines will return with + // context.Canceled, so we will enter the grp.Wait() error block and return from there. + if readErr != nil { + fs.shutdownCh <- struct{}{} + return status.Errorf(codes.Unavailable, "failed to receive request: %s", readErr.Error()) + } + // wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately. if err := grp.Wait(); err != nil { log.Printf("Stopping the SourceTransformFn with err, %s", err) @@ -126,10 +134,6 @@ outer: return status.Errorf(codes.Internal, "%s", err.Error()) } - // check if there was an error while reading from the stream - if readErr != nil { - return status.Errorf(codes.Internal, "%s", readErr.Error()) - } return nil } @@ -137,7 +141,7 @@ outer: func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnServer) error { req, err := stream.Recv() if err != nil { - return status.Errorf(codes.Internal, "failed to receive handshake: %v", err) + return status.Errorf(codes.Unavailable, "failed to receive handshake: %v", err) } if req.GetHandshake() == nil || !req.GetHandshake().GetSot() { return status.Errorf(codes.InvalidArgument, "invalid handshake") @@ -148,7 +152,7 @@ func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnS }, } if err := stream.Send(handshakeResponse); err != nil { - return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) + return status.Errorf(codes.Unavailable, "failed to send handshake response to client over gRPC stream: %v", err) } return nil }