From f93025decf06ca14569e0f3cfb062deea48f3bab Mon Sep 17 00:00:00 2001 From: adarsh0728 Date: Tue, 28 Jan 2025 09:23:01 +0530 Subject: [PATCH] revert: handle panic cases only in this pr Signed-off-by: adarsh0728 --- pkg/sourcetransformer/service.go | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 9d64b165..a6ffb24a 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -88,7 +88,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn case resp := <-senderCh: if err := stream.Send(resp); err != nil { log.Printf("Failed to send response: %v", err) - return status.Errorf(codes.Unavailable, "failed to send response to client: %v", err) + return fmt.Errorf("failed to send response to client: %w", err) } } } @@ -119,14 +119,6 @@ outer: }) } - // check if there was an error while reading from the stream first - // otherwise since cancel() is used, error group go routines will return with - // context.Canceled, so we will enter the grp.Wait() error block and return from there. - if readErr != nil { - fs.shutdownCh <- struct{}{} - return status.Errorf(codes.Unavailable, "failed to receive request: %s", readErr.Error()) - } - // wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately. if err := grp.Wait(); err != nil { log.Printf("Stopping the SourceTransformFn with err, %s", err) @@ -134,6 +126,11 @@ outer: return status.Errorf(codes.Internal, "%s", err.Error()) } + // check if there was an error while reading from the stream + if readErr != nil { + return status.Errorf(codes.Internal, "%s", readErr.Error()) + } + return nil } @@ -141,7 +138,7 @@ outer: func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnServer) error { req, err := stream.Recv() if err != nil { - return status.Errorf(codes.Unavailable, "failed to receive handshake: %v", err) + return status.Errorf(codes.Internal, "failed to receive handshake: %v", err) } if req.GetHandshake() == nil || !req.GetHandshake().GetSot() { return status.Errorf(codes.InvalidArgument, "invalid handshake") @@ -152,7 +149,7 @@ func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnS }, } if err := stream.Send(handshakeResponse); err != nil { - return status.Errorf(codes.Unavailable, "failed to send handshake response to client over gRPC stream: %v", err) + return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) } return nil }