Skip to content

Commit

Permalink
gRPC error code changes & added prefix in caseof handleRequest panics
Browse files Browse the repository at this point in the history
Signed-off-by: adarsh0728 <[email protected]>
  • Loading branch information
adarsh0728 committed Jan 27, 2025
1 parent da42e80 commit 43d37a5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
File renamed without changes.
20 changes: 12 additions & 8 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse,
return &v1.ReadyResponse{Ready: true}, nil
}

var errTransformerPanic = errors.New("transformer function panicked")
var errTransformerPanic = errors.New("USER_CODE_ERROR: transformer function panicked")

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream v1.SourceTransform_SourceTransformFnServer) (*v1.SourceTransformRequest, error) {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
case resp := <-senderCh:
if err := stream.Send(resp); err != nil {
log.Printf("Failed to send response: %v", err)
return fmt.Errorf("failed to send response to client: %w", err)
return status.Errorf(codes.Unavailable, "failed to send response to client: %v", err)
}
}
}
Expand Down Expand Up @@ -119,25 +119,29 @@ outer:
})
}

// check if there was an error while reading from the stream first
// otherwise since cancel() is used, error group go routines will return with
// context.Canceled, so we will enter the grp.Wait() error block and return from there.
if readErr != nil {
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Unavailable, "failed to receive request: %s", readErr.Error())
}

// wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately.
if err := grp.Wait(); err != nil {
log.Printf("Stopping the SourceTransformFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "%s", err.Error())
}

// check if there was an error while reading from the stream
if readErr != nil {
return status.Errorf(codes.Internal, "%s", readErr.Error())
}
return nil
}

// performHandshake handles the handshake logic at the start of the stream.
func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnServer) error {
req, err := stream.Recv()
if err != nil {
return status.Errorf(codes.Internal, "failed to receive handshake: %v", err)
return status.Errorf(codes.Unavailable, "failed to receive handshake: %v", err)
}
if req.GetHandshake() == nil || !req.GetHandshake().GetSot() {
return status.Errorf(codes.InvalidArgument, "invalid handshake")
Expand All @@ -148,7 +152,7 @@ func (fs *Service) performHandshake(stream v1.SourceTransform_SourceTransformFnS
},
}
if err := stream.Send(handshakeResponse); err != nil {
return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err)
return status.Errorf(codes.Unavailable, "failed to send handshake response to client over gRPC stream: %v", err)
}
return nil
}
Expand Down

0 comments on commit 43d37a5

Please sign in to comment.