Skip to content

Commit

Permalink
revert: handle panic cases only in this pr
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Jan 28, 2025
1 parent 43d37a5 commit f93025d
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
case resp := <-senderCh:
if err := stream.Send(resp); err != nil {
log.Printf("Failed to send response: %v", err)
return status.Errorf(codes.Unavailable, "failed to send response to client: %v", err)
return fmt.Errorf("failed to send response to client: %w", err)
}
}
}
Expand Down Expand Up @@ -119,29 +119,26 @@ outer:
})
}

// check if there was an error while reading from the stream first
// otherwise since cancel() is used, error group go routines will return with
// context.Canceled, so we will enter the grp.Wait() error block and return from there.
if readErr != nil {
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Unavailable, "failed to receive request: %s", readErr.Error())
}

// wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately.
if err := grp.Wait(); err != nil {
log.Printf("Stopping the SourceTransformFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, "%s", readErr.Error())
}

return nil
}

// performHandshake handles the handshake logic at the start of the stream.
func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnServer) error {
req, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Unavailable, "failed to receive handshake: %v", err)
return status.Errorf(codes.Internal, "failed to receive handshake: %v", err)
}
if req.GetHandshake() == nil || !req.GetHandshake().GetSot() {
return status.Errorf(codes.InvalidArgument, "invalid handshake")
Expand All @@ -152,7 +149,7 @@ func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnS
},
}
if err := stream.Send(handshakeResponse); err != nil {
return status.Errorf(codes.Unavailable, "failed to send handshake response to client over gRPC stream: %v", err)
return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err)
}
return nil
}
Expand Down

0 comments on commit f93025d

Please sign in to comment.