From 9179ac584a4a1d1934c7f977e45a02613fe96a44 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 3 Oct 2024 11:15:26 +0530 Subject: [PATCH] fix unit test Signed-off-by: Yashash H L --- pkg/mapper/service.go | 7 ++++--- pkg/mapper/service_test.go | 4 ++-- pkg/sourcetransformer/service.go | 6 +++--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 9635e2d2..87c32534 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -63,7 +63,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { return err } case <-grpCtx.Done(): - return nil + return grpCtx.Err() } } }) @@ -129,10 +129,11 @@ func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error { } // handleRequest processes each request and sends the response to the response channel. -func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, responseCh chan<- *mappb.MapResponse) error { +func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, responseCh chan<- *mappb.MapResponse) (err error) { defer func() { if r := recover(); r != nil { log.Printf("panic inside map handler: %v %v", r, string(debug.Stack())) + err = status.Errorf(codes.Internal, "panic inside map handler: %v", r) } }() @@ -154,7 +155,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *mappb.MapRequest, res select { case responseCh <- resp: case <-ctx.Done(): - return nil + return ctx.Err() } return nil } diff --git a/pkg/mapper/service_test.go b/pkg/mapper/service_test.go index 8719112a..a6cda1f5 100644 --- a/pkg/mapper/service_test.go +++ b/pkg/mapper/service_test.go @@ -256,7 +256,7 @@ func TestService_MapFn_Multiple_Messages(t *testing.T) { func TestService_MapFn_Panic(t *testing.T) { svc := &Service{ Mapper: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { - panic("transformer panicked") + panic("map failed") }), // panic in the transformer causes the server to send a shutdown signal to shutdownCh channel. // The function that errgroup runs in a goroutine will be blocked until this shutdown signal is received somewhere else. @@ -288,6 +288,6 @@ func TestService_MapFn_Panic(t *testing.T) { _, err = stream.Recv() require.Error(t, err, "Expected error while receiving message from the stream") gotStatus, _ := status.FromError(err) - expectedStatus := status.Convert(status.Errorf(codes.Internal, "panic")) + expectedStatus := status.Convert(status.Errorf(codes.Internal, "error processing requests: rpc error: code = Internal desc = panic inside map handler: map failed")) require.Equal(t, expectedStatus, gotStatus) } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 30f5d0ca..6dd2577d 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -63,7 +63,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn for { select { case <-grpCtx.Done(): - return nil + return grpCtx.Err() case resp := <-senderCh: if err := stream.Send(resp); err != nil { return fmt.Errorf("failed to send response to client: %w", err) @@ -87,7 +87,7 @@ outer: if err != nil { log.Printf("failed to receive request: %v", err) readErr = err - // read loop is not part of the errgroup, so we need to cancel the context + // read loop is not part of the error group, so we need to cancel the context // to signal the other goroutines to stop processing. cancel() break outer @@ -160,7 +160,7 @@ func (fs *Service) handleRequest(ctx context.Context, req *v1.SourceTransformReq select { case responseCh <- resp: case <-ctx.Done(): - return nil + return ctx.Err() } return nil }