From d4e7e523f49a2d96295ab3ddf4227dd460ecfa7e Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 26 Jul 2024 07:28:10 +0530 Subject: [PATCH] add unit test Signed-off-by: Yashash H L --- pkg/mapper/server_test.go | 86 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/pkg/mapper/server_test.go b/pkg/mapper/server_test.go index c4736e85..8c661ff0 100644 --- a/pkg/mapper/server_test.go +++ b/pkg/mapper/server_test.go @@ -3,10 +3,15 @@ package mapper import ( "context" "os" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" ) func TestMapServer_Start(t *testing.T) { @@ -25,8 +30,87 @@ func TestMapServer_Start(t *testing.T) { return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) }) // note: using actual uds connection - ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) } + +// tests the case where the server is shutdown gracefully when a panic occurs in the map handler +func TestMapServer_GracefulShutdown(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + dir := t.TempDir() + socketFile, _ := os.Create(dir + "/test.sock") + defer func() { + _ = os.RemoveAll(socketFile.Name()) + }() + + serverInfoFile, _ := os.Create(dir + "/numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + + var mapHandler = MapperFunc(func(ctx context.Context, keys []string, d Datum) Messages { + msg := d.Value() + if keys[0] == "key2" { + time.Sleep(20 * time.Millisecond) + panic("panic test") + } + time.Sleep(50 * time.Millisecond) + return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) + }) + + done := make(chan struct{}) + go func() { + err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(socketFile.Name())).Start(ctx) + assert.NoError(t, err) + close(done) + }() + + // wait for the server to start + time.Sleep(10 * time.Millisecond) + + // create a client + conn, err := grpc.Dial( + "unix://"+socketFile.Name(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatalf("Failed to dial server: %v", err) + } + defer conn.Close() + + client := mappb.NewMapClient(conn) + // send two map requests with key1 and key2 as keys simultaneously + keys := []string{"key1", "key2"} + var wg sync.WaitGroup + for _, key := range keys { + wg.Add(1) + go func(key string) { + defer wg.Done() + req := &mappb.MapRequest{ + Keys: []string{key}, + } + + resp, err := client.MapFn(ctx, req) + // since there is a panic in the map handler for key2, we should get an error + // other requests should be successful + if key == "key2" { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, resp) + } + }(key) + } + + wg.Wait() + // wait for the server to shutdown gracefully because of the panic + select { + case <-ctx.Done(): + t.Fatal("server did not shutdown gracefully") + case <-done: + } +}