From 3b453964211ca5f53f00aeb2724bdd792b75a490 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 10 Apr 2023 16:21:53 -0700 Subject: [PATCH] test: fix server conn in test (#47) Signed-off-by: Sidhant Kohli --- pkg/function/server/server.go | 5 ++--- pkg/function/server/server_test.go | 27 ++++++++++++++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/function/server/server.go b/pkg/function/server/server.go index b94ab923..7dd73f20 100644 --- a/pkg/function/server/server.go +++ b/pkg/function/server/server.go @@ -105,8 +105,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - - lis, err := net.Listen(functionsdk.UDS, functionsdk.UDS_ADDR) + lis, err := net.Listen(functionsdk.UDS, opts.sockAddr) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", functionsdk.UDS, functionsdk.UDS_ADDR, err) } @@ -122,7 +121,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error { defer close(errCh) // start the grpc server go func(ch chan<- error) { - log.Println("starting the gRPC server with unix domain socket...") + log.Println("starting the gRPC server with unix domain socket...", lis.Addr()) err = grpcServer.Serve(lis) if err != nil { ch <- fmt.Errorf("failed to start the gRPC server: %v", err) diff --git a/pkg/function/server/server_test.go b/pkg/function/server/server_test.go index 7a1cc990..b07c3363 100644 --- a/pkg/function/server/server_test.go +++ b/pkg/function/server/server_test.go @@ -3,6 +3,7 @@ package server import ( "context" "fmt" + "google.golang.org/protobuf/types/known/emptypb" "os" "strconv" "sync" @@ -18,6 +19,21 @@ import ( "github.com/numaproj/numaflow-go/pkg/function/client" ) +func waitUntilReady(ctx context.Context, c functionsdk.Client, t *testing.T) { + var in = &emptypb.Empty{} + isReady, _ := c.IsReady(ctx, in) + for !isReady { + select { + case <-ctx.Done(): + assert.Fail(t, ctx.Err().Error()) + return + default: + time.Sleep(100 * time.Millisecond) + isReady, _ = c.IsReady(ctx, in) + } + } +} + type fields struct { mapHandler functionsdk.MapHandler mapTHandler functionsdk.MapTHandler @@ -48,11 +64,11 @@ func Test_server_map(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // note: using actual UDS connection - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(file.Name())) - c, err := client.New(client.WithSockAddr(file.Name())) + waitUntilReady(ctx, c, t) assert.NoError(t, err) defer func() { err = c.CloseConn(ctx) @@ -102,11 +118,11 @@ func Test_server_mapT(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // note: using actual UDS connection - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(file.Name())) - c, err := client.New(client.WithSockAddr(file.Name())) + waitUntilReady(ctx, c, t) assert.NoError(t, err) defer func() { err = c.CloseConn(ctx) @@ -179,11 +195,12 @@ func Test_server_reduce(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { // note: using actual UDS connection - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name())) c, err := client.New(client.WithSockAddr(file.Name())) + waitUntilReady(ctx, c, t) assert.NoError(t, err) defer func() { err = c.CloseConn(ctx)