Skip to content

Commit

Permalink
test: fix server conn in test (#47)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Apr 10, 2023
1 parent 2ecaa6c commit 3b45396
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
5 changes: 2 additions & 3 deletions pkg/function/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error {

ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

lis, err := net.Listen(functionsdk.UDS, functionsdk.UDS_ADDR)
lis, err := net.Listen(functionsdk.UDS, opts.sockAddr)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", functionsdk.UDS, functionsdk.UDS_ADDR, err)
}
Expand All @@ -122,7 +121,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error {
defer close(errCh)
// start the grpc server
go func(ch chan<- error) {
log.Println("starting the gRPC server with unix domain socket...")
log.Println("starting the gRPC server with unix domain socket...", lis.Addr())
err = grpcServer.Serve(lis)
if err != nil {
ch <- fmt.Errorf("failed to start the gRPC server: %v", err)
Expand Down
27 changes: 22 additions & 5 deletions pkg/function/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/emptypb"
"os"
"strconv"
"sync"
Expand All @@ -18,6 +19,21 @@ import (
"github.com/numaproj/numaflow-go/pkg/function/client"
)

func waitUntilReady(ctx context.Context, c functionsdk.Client, t *testing.T) {
var in = &emptypb.Empty{}
isReady, _ := c.IsReady(ctx, in)
for !isReady {
select {
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
return
default:
time.Sleep(100 * time.Millisecond)
isReady, _ = c.IsReady(ctx, in)
}
}
}

type fields struct {
mapHandler functionsdk.MapHandler
mapTHandler functionsdk.MapTHandler
Expand Down Expand Up @@ -48,11 +64,11 @@ func Test_server_map(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(file.Name()))

c, err := client.New(client.WithSockAddr(file.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
err = c.CloseConn(ctx)
Expand Down Expand Up @@ -102,11 +118,11 @@ func Test_server_mapT(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(file.Name()))

c, err := client.New(client.WithSockAddr(file.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
err = c.CloseConn(ctx)
Expand Down Expand Up @@ -179,11 +195,12 @@ func Test_server_reduce(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name()))

c, err := client.New(client.WithSockAddr(file.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
err = c.CloseConn(ctx)
Expand Down

0 comments on commit 3b45396

Please sign in to comment.