From 7775e6816ab14a769c380cb3f35c9a58806dc40b Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Tue, 11 Apr 2023 09:47:29 -0700 Subject: [PATCH] feat: introduce handshake to client and gRPC server (#42) Signed-off-by: Derek Wang --- go.mod | 2 +- pkg/function/client/client.go | 32 ++++++--- pkg/function/client/options.go | 12 +++- pkg/function/server/options.go | 12 +++- pkg/function/server/server.go | 13 +++- pkg/function/server/server_test.go | 53 ++++++++++---- pkg/info/doc.go | 19 +++++ pkg/info/options.go | 20 ++++++ pkg/info/server_info.go | 111 +++++++++++++++++++++++++++++ pkg/info/server_info_test.go | 67 +++++++++++++++++ pkg/info/types.go | 28 ++++++++ pkg/sink/client/client.go | 17 ++++- pkg/sink/client/options.go | 10 ++- pkg/sink/server/options.go | 12 +++- pkg/sink/server/server.go | 13 +++- pkg/sink/server/server_test.go | 16 +++-- 16 files changed, 395 insertions(+), 42 deletions(-) create mode 100644 pkg/info/doc.go create mode 100644 pkg/info/options.go create mode 100644 pkg/info/server_info.go create mode 100644 pkg/info/server_info_test.go create mode 100644 pkg/info/types.go diff --git a/go.mod b/go.mod index e8188c3a..0982ddd6 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.7.1 go.uber.org/automaxprocs v1.5.2 + golang.org/x/net v0.8.0 golang.org/x/sync v0.1.0 google.golang.org/grpc v1.54.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0 @@ -17,7 +18,6 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/net v0.8.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect diff --git a/pkg/function/client/client.go b/pkg/function/client/client.go index 1dbc0f41..53cc87c5 100644 --- a/pkg/function/client/client.go +++ b/pkg/function/client/client.go @@ -3,19 +3,21 @@ package client import ( "context" "fmt" + "io" + "log" + "os" + "runtime" + "strconv" + functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1" "github.com/numaproj/numaflow-go/pkg/function" + "github.com/numaproj/numaflow-go/pkg/info" _ "go.uber.org/automaxprocs" "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/resolver" "google.golang.org/protobuf/types/known/emptypb" - "io" - "log" - "os" - "runtime" - "strconv" ) // client contains the grpc connection and the grpc client. @@ -26,14 +28,14 @@ type client struct { // New creates a new client object. func New(inputOptions ...Option) (*client, error) { - var opts = &options{ - maxMessageSize: function.DefaultMaxMessageSize, + maxMessageSize: function.DefaultMaxMessageSize, + sereverInfoFilePath: info.ServerInfoFilePath, } // Populate connection variables for client connection // based on multiprocessing enabled/disabled - if function.IsMapMultiProcEnabled() == true { + if function.IsMapMultiProcEnabled() { regMultProcResolver() opts.sockAddr = function.TCP_ADDR } else { @@ -44,12 +46,22 @@ func New(inputOptions ...Option) (*client, error) { inputOption(opts) } + // TODO: WaitUntilReady() check unitl SIGTERM is received. + serverInfo, err := info.Read(info.WithServerInfoFilePath(opts.sereverInfoFilePath)) + if err != nil { + // TODO: return nil, err + log.Println("Failed to execute info.Read(): ", err) + } + // TODO: Use serverInfo to check compatibility and start the right gRPC client. + if serverInfo != nil { + log.Printf("ServerInfo: %v\n", serverInfo) + } + c := new(client) var conn *grpc.ClientConn - var err error var sockAddr string // Make a TCP connection client for multiprocessing grpc server - if function.IsMapMultiProcEnabled() == true { + if function.IsMapMultiProcEnabled() { log.Println("Multiprocessing TCP Client ", function.TCP, opts.sockAddr) sockAddr = fmt.Sprintf("%s%s", connAddr, opts.sockAddr) conn, err = grpc.Dial( diff --git a/pkg/function/client/options.go b/pkg/function/client/options.go index 32efa804..062e1a4f 100644 --- a/pkg/function/client/options.go +++ b/pkg/function/client/options.go @@ -1,8 +1,9 @@ package client type options struct { - sockAddr string - maxMessageSize int + sockAddr string + maxMessageSize int + sereverInfoFilePath string } // Option is the interface to apply options. @@ -21,3 +22,10 @@ func WithMaxMessageSize(size int) Option { opts.maxMessageSize = size } } + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(o *options) { + o.sereverInfoFilePath = f + } +} diff --git a/pkg/function/server/options.go b/pkg/function/server/options.go index 25efa694..1f817c8c 100644 --- a/pkg/function/server/options.go +++ b/pkg/function/server/options.go @@ -1,8 +1,9 @@ package server type options struct { - sockAddr string - maxMessageSize int + sockAddr string + maxMessageSize int + sereverInfoFilePath string } // Option is the interface to apply options. @@ -21,3 +22,10 @@ func WithSockAddr(addr string) Option { opts.sockAddr = addr } } + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(opts *options) { + opts.sereverInfoFilePath = f + } +} diff --git a/pkg/function/server/server.go b/pkg/function/server/server.go index 7dd73f20..adf44444 100644 --- a/pkg/function/server/server.go +++ b/pkg/function/server/server.go @@ -11,6 +11,7 @@ import ( functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1" functionsdk "github.com/numaproj/numaflow-go/pkg/function" + "github.com/numaproj/numaflow-go/pkg/info" "google.golang.org/grpc" ) @@ -83,14 +84,21 @@ func (s *server) RegisterReducer(r functionsdk.ReduceHandler) *server { // Start starts the gRPC server via unix domain socket at configs.Addr and return error. func (s *server) Start(ctx context.Context, inputOptions ...Option) error { var opts = &options{ - sockAddr: functionsdk.UDS_ADDR, - maxMessageSize: functionsdk.DefaultMaxMessageSize, + sockAddr: functionsdk.UDS_ADDR, + maxMessageSize: functionsdk.DefaultMaxMessageSize, + sereverInfoFilePath: info.ServerInfoFilePath, } for _, inputOption := range inputOptions { inputOption(opts) } + // Write server info to the file + serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()} + if err := info.Write(serverInfo, info.WithServerInfoFilePath(opts.sereverInfoFilePath)); err != nil { + return err + } + cleanup := func() error { // err if no opts.sockAddr should be ignored if _, err := os.Stat(opts.sockAddr); err == nil { @@ -109,6 +117,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error { if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", functionsdk.UDS, functionsdk.UDS_ADDR, err) } + defer func() { _ = lis.Close() }() grpcServer := grpc.NewServer( grpc.MaxRecvMsgSize(opts.maxMessageSize), grpc.MaxSendMsgSize(opts.maxMessageSize), diff --git a/pkg/function/server/server_test.go b/pkg/function/server/server_test.go index b07c3363..49763b96 100644 --- a/pkg/function/server/server_test.go +++ b/pkg/function/server/server_test.go @@ -3,13 +3,14 @@ package server import ( "context" "fmt" - "google.golang.org/protobuf/types/known/emptypb" "os" "strconv" "sync" "testing" "time" + "google.golang.org/protobuf/types/known/emptypb" + "github.com/stretchr/testify/assert" grpcmd "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/timestamppb" @@ -41,12 +42,21 @@ type fields struct { } func Test_server_map(t *testing.T) { - file, err := os.CreateTemp("/tmp", "numaflow-test.sock") + socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock") assert.NoError(t, err) defer func() { - err = os.RemoveAll(file.Name()) + err = os.RemoveAll(socketFile.Name()) + assert.NoError(t, err) + }() + + serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info") + fmt.Println(serverInfoFile.Name()) + assert.NoError(t, err) + defer func() { + err = os.RemoveAll(serverInfoFile.Name()) assert.NoError(t, err) }() + tests := []struct { name string fields fields @@ -66,14 +76,15 @@ func Test_server_map(t *testing.T) { // note: using actual UDS connection ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(file.Name())) - c, err := client.New(client.WithSockAddr(file.Name())) + go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())) + c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name())) waitUntilReady(ctx, c, t) assert.NoError(t, err) defer func() { err = c.CloseConn(ctx) assert.NoError(t, err) }() + for i := 0; i < 10; i++ { keys := []string{fmt.Sprintf("client_%d", i)} list, err := c.MapFn(ctx, &functionpb.Datum{ @@ -95,12 +106,20 @@ func Test_server_map(t *testing.T) { } func Test_server_mapT(t *testing.T) { - file, err := os.CreateTemp("/tmp", "numaflow-test.sock") + socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock") assert.NoError(t, err) defer func() { - err = os.RemoveAll(file.Name()) + err = os.RemoveAll(socketFile.Name()) assert.NoError(t, err) }() + + serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info") + assert.NoError(t, err) + defer func() { + err = os.RemoveAll(serverInfoFile.Name()) + assert.NoError(t, err) + }() + tests := []struct { name string fields fields @@ -120,8 +139,8 @@ func Test_server_mapT(t *testing.T) { // note: using actual UDS connection ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(file.Name())) - c, err := client.New(client.WithSockAddr(file.Name())) + go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())) + c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name())) waitUntilReady(ctx, c, t) assert.NoError(t, err) defer func() { @@ -155,6 +174,14 @@ func Test_server_reduce(t *testing.T) { err = os.RemoveAll(file.Name()) assert.NoError(t, err) }() + + serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info") + assert.NoError(t, err) + defer func() { + err = os.RemoveAll(serverInfoFile.Name()) + assert.NoError(t, err) + }() + var testKeys = []string{"reduce_key"} tests := []struct { name string @@ -197,9 +224,9 @@ func Test_server_reduce(t *testing.T) { // note: using actual UDS connection ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name())) + go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name()), WithServerInfoFilePath(serverInfoFile.Name())) - c, err := client.New(client.WithSockAddr(file.Name())) + c, err := client.New(client.WithSockAddr(file.Name()), client.WithServerInfoFilePath(serverInfoFile.Name())) waitUntilReady(ctx, c, t) assert.NoError(t, err) defer func() { @@ -228,9 +255,7 @@ func Test_server_reduce(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - for _, d := range resultDatumList { - dList.Elements = append(dList.Elements, d) - } + dList.Elements = append(dList.Elements, resultDatumList...) }() wg.Wait() diff --git a/pkg/info/doc.go b/pkg/info/doc.go new file mode 100644 index 00000000..3150d76b --- /dev/null +++ b/pkg/info/doc.go @@ -0,0 +1,19 @@ +// Package info is used for the gRPC server to provide the information such as protocol, sdk version, language, etc, to the client. +// +// The server information can be used by the client to determine: +// - what is right protocol to use (UDS or TCP) +// - what is the numaflow sdk version used by the server +// - what is language used by the server +// +// The gRPC server (UDF, UDSink, etc) is supposed to have a shared file system with the client (numa container). +// +// Write() +// The gPRC server must use this function to write the correct ServerInfo when it starts. +// +// Read() +// The client is supposed to call the function to read the server information, before it starts to communicate with the gRPC server. +// +// WaitUntilReady() +// This function checks if the server info file is ready to read. +// The client (numa container) is supposed to call the function before it starts to Read() the server info file. +package info diff --git a/pkg/info/options.go b/pkg/info/options.go new file mode 100644 index 00000000..58fd56c3 --- /dev/null +++ b/pkg/info/options.go @@ -0,0 +1,20 @@ +package info + +type options struct { + svrInfoFilePath string +} + +func defaultOptions() *options { + return &options{ + svrInfoFilePath: ServerInfoFilePath, + } +} + +type Option func(*options) + +// WithServerInfoFilePath sets the server info file path +func WithServerInfoFilePath(f string) Option { + return func(o *options) { + o.svrInfoFilePath = f + } +} diff --git a/pkg/info/server_info.go b/pkg/info/server_info.go new file mode 100644 index 00000000..12abbb5c --- /dev/null +++ b/pkg/info/server_info.go @@ -0,0 +1,111 @@ +package info + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "runtime/debug" + "strings" + "time" +) + +var END = fmt.Sprintf("%U__END__", '\\') // U+005C__END__ + +func GetSDKVersion() string { + version := "" + info, ok := debug.ReadBuildInfo() + if !ok { + return version + } + for _, d := range info.Deps { + if strings.HasSuffix(d.Path, "/numaflow-go") { + version = d.Version + break + } + } + return version +} + +// Write writes the server info to a file +func Write(svrInfo *ServerInfo, opts ...Option) error { + b, err := json.Marshal(svrInfo) + if err != nil { + return fmt.Errorf("failed to marshal server info: %w", err) + } + options := defaultOptions() + for _, opt := range opts { + opt(options) + } + if err := os.Remove(options.svrInfoFilePath); !os.IsNotExist(err) && err != nil { + return fmt.Errorf("failed to remove server-info file: %w", err) + } + f, err := os.Create(options.svrInfoFilePath) + if err != nil { + return fmt.Errorf("failed to create server-info file: %w", err) + } + defer f.Close() + _, err = f.Write(b) + if err != nil { + return fmt.Errorf("failed to write server-info file: %w", err) + } + _, err = f.WriteString(END) + if err != nil { + return fmt.Errorf("failed to write END server-info file: %w", err) + } + return nil +} + +// WaitUntilReady waits until the server info is ready +func WaitUntilReady(ctx context.Context, opts ...Option) error { + options := defaultOptions() + for _, opt := range opts { + opt(options) + } + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + if fileInfo, err := os.Stat(options.svrInfoFilePath); err != nil { + log.Printf("Server info file %s is not ready...", options.svrInfoFilePath) + time.Sleep(1 * time.Second) + continue + } else { + if fileInfo.Size() > 0 { + return nil + } + } + } + } +} + +// Read reads the server info from a file +func Read(opts ...Option) (*ServerInfo, error) { + options := defaultOptions() + for _, opt := range opts { + opt(options) + } + // It takes some time for the server to write the server info file + // TODO: use a better way to wait for the file to be ready + retry := 0 + b, err := os.ReadFile(options.svrInfoFilePath) + for !strings.HasSuffix(string(b), END) && err == nil && retry < 10 { + time.Sleep(100 * time.Millisecond) + b, err = os.ReadFile(options.svrInfoFilePath) + retry++ + } + if err != nil { + return nil, err + } + if !strings.HasSuffix(string(b), END) { + return nil, fmt.Errorf("server info file is not ready") + } + b = b[:len(b)-len([]byte(END))] + info := &ServerInfo{} + if err := json.Unmarshal(b, info); err != nil { + return nil, fmt.Errorf("failed to unmarshal server info: %w", err) + } + return info, nil +} diff --git a/pkg/info/server_info_test.go b/pkg/info/server_info_test.go new file mode 100644 index 00000000..dbb016b8 --- /dev/null +++ b/pkg/info/server_info_test.go @@ -0,0 +1,67 @@ +package info + +import ( + "errors" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" +) + +func Test_getSDKVersion(t *testing.T) { + tests := []struct { + name string + want string + }{ + { + name: "getSDK", + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetSDKVersion(); got != tt.want { + t.Errorf("GetSDKVersion() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_WaitUntilReady(t *testing.T) { + serverInfoFile, err := os.CreateTemp("/tmp", "server-info") + assert.NoError(t, err) + defer os.Remove(serverInfoFile.Name()) + err = os.WriteFile(serverInfoFile.Name(), []byte("test"), 0644) + assert.NoError(t, err) + + t.Run("test timeout", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) + defer cancel() + err := WaitUntilReady(ctx, WithServerInfoFilePath("/tmp/not-exist")) + assert.True(t, errors.Is(err, context.DeadlineExceeded)) + }) + + t.Run("test success", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) + defer cancel() + err = WaitUntilReady(ctx, WithServerInfoFilePath(serverInfoFile.Name())) + assert.NoError(t, err) + }) +} + +func Test_Read_Write(t *testing.T) { + filepath := os.TempDir() + "/server-info" + defer os.Remove(filepath) + info := &ServerInfo{Protocol: UDS, Language: Go, Version: ""} + err := Write(info, WithServerInfoFilePath(filepath)) + assert.NoError(t, err) + got, err := Read(WithServerInfoFilePath("/tmp/not-exist")) + assert.Error(t, err) + assert.True(t, os.IsNotExist(err)) + assert.Nil(t, got) + got, err = Read(WithServerInfoFilePath(filepath)) + assert.NoError(t, err) + assert.Equal(t, info, got) +} diff --git a/pkg/info/types.go b/pkg/info/types.go new file mode 100644 index 00000000..ed3c468e --- /dev/null +++ b/pkg/info/types.go @@ -0,0 +1,28 @@ +package info + +const ( + ServerInfoFilePath = "/var/run/numaflow/server-info" +) + +type Protocol string + +const ( + UDS Protocol = "uds" + TCP Protocol = "tcp" +) + +type Language string + +const ( + Go Language = "go" + Python Language = "python" + Java Language = "java" +) + +// ServerInfo is the information about the server +type ServerInfo struct { + Protocol Protocol `json:"protocol"` + Language Language `json:"language"` + Version string `json:"version"` + Metaddata map[string]string `json:"metadata"` +} diff --git a/pkg/sink/client/client.go b/pkg/sink/client/client.go index d2a723f6..5484697e 100644 --- a/pkg/sink/client/client.go +++ b/pkg/sink/client/client.go @@ -3,8 +3,10 @@ package client import ( "context" "fmt" + "log" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/sink" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -21,15 +23,26 @@ var _ sink.Client = (*client)(nil) // New creates a new client object. func New(inputOptions ...Option) (*client, error) { - var opts = &options{ - sockAddr: sink.Addr, + sockAddr: sink.Addr, + sereverInfoFilePath: info.ServerInfoFilePath, } for _, inputOption := range inputOptions { inputOption(opts) } + // TODO: WaitUntilReady() check unitl SIGTERM is received. + serverInfo, err := info.Read(info.WithServerInfoFilePath(opts.sereverInfoFilePath)) + if err != nil { + // TODO: return nil, err + log.Println("Failed to execute info.Read(): ", err) + } + // TODO: Use serverInfo to check compatibility and start the right gRPC client. + if serverInfo != nil { + log.Printf("ServerInfo: %v\n", serverInfo) + } + c := new(client) sockAddr := fmt.Sprintf("%s:%s", sink.Protocol, opts.sockAddr) conn, err := grpc.Dial(sockAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/pkg/sink/client/options.go b/pkg/sink/client/options.go index e7b9d451..6a6fbe0a 100644 --- a/pkg/sink/client/options.go +++ b/pkg/sink/client/options.go @@ -1,7 +1,8 @@ package client type options struct { - sockAddr string + sockAddr string + sereverInfoFilePath string } // Option is the interface to apply options. @@ -13,3 +14,10 @@ func WithSockAddr(addr string) Option { opts.sockAddr = addr } } + +// WithServerInfoFilePath start the client with the given server info file path. This is mainly used for testing purpose. +func WithServerInfoFilePath(f string) Option { + return func(o *options) { + o.sereverInfoFilePath = f + } +} diff --git a/pkg/sink/server/options.go b/pkg/sink/server/options.go index 25efa694..3f954570 100644 --- a/pkg/sink/server/options.go +++ b/pkg/sink/server/options.go @@ -1,8 +1,9 @@ package server type options struct { - sockAddr string - maxMessageSize int + sockAddr string + maxMessageSize int + sereverInfoFilePath string } // Option is the interface to apply options. @@ -21,3 +22,10 @@ func WithSockAddr(addr string) Option { opts.sockAddr = addr } } + +// WithServerInfoFilePath sets the server info file path. +func WithServerInfoFilePath(path string) Option { + return func(opts *options) { + opts.sereverInfoFilePath = path + } +} diff --git a/pkg/sink/server/server.go b/pkg/sink/server/server.go index 97b8ba29..3007184d 100644 --- a/pkg/sink/server/server.go +++ b/pkg/sink/server/server.go @@ -10,6 +10,7 @@ import ( "syscall" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" + "github.com/numaproj/numaflow-go/pkg/info" sinksdk "github.com/numaproj/numaflow-go/pkg/sink" "google.golang.org/grpc" ) @@ -48,14 +49,21 @@ func (s *server) RegisterSinker(h sinksdk.SinkHandler) *server { // Start starts the gRPC server via unix domain socket at configs.Addr and return error. func (s *server) Start(ctx context.Context, inputOptions ...Option) error { var opts = &options{ - sockAddr: sinksdk.Addr, - maxMessageSize: sinksdk.DefaultMaxMessageSize, + sockAddr: sinksdk.Addr, + maxMessageSize: sinksdk.DefaultMaxMessageSize, + sereverInfoFilePath: info.ServerInfoFilePath, } for _, inputOption := range inputOptions { inputOption(opts) } + // Write server info to the file + serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()} + if err := info.Write(serverInfo, info.WithServerInfoFilePath(opts.sereverInfoFilePath)); err != nil { + return err + } + cleanup := func() error { // err if no opts.sockAddr should be ignored if _, err := os.Stat(opts.sockAddr); err == nil { @@ -75,6 +83,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error { if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", sinksdk.Protocol, sinksdk.Addr, err) } + defer func() { _ = lis.Close() }() grpcServer := grpc.NewServer( grpc.MaxRecvMsgSize(opts.maxMessageSize), grpc.MaxSendMsgSize(opts.maxMessageSize), diff --git a/pkg/sink/server/server_test.go b/pkg/sink/server/server_test.go index 24498e75..4b91ef66 100644 --- a/pkg/sink/server/server_test.go +++ b/pkg/sink/server/server_test.go @@ -15,10 +15,18 @@ import ( ) func Test_server_sink(t *testing.T) { - file, err := os.CreateTemp("/tmp", "numaflow-test.sock") + + socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock") + assert.NoError(t, err) + defer func() { + err = os.RemoveAll(socketFile.Name()) + assert.NoError(t, err) + }() + + serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info") assert.NoError(t, err) defer func() { - err = os.RemoveAll(file.Name()) + err = os.RemoveAll(serverInfoFile.Name()) assert.NoError(t, err) }() @@ -53,9 +61,9 @@ func Test_server_sink(t *testing.T) { // note: using actual UDS connection ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go New().RegisterSinker(tt.fields.sinkHandler).Start(ctx, WithSockAddr(file.Name())) + go New().RegisterSinker(tt.fields.sinkHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())) - c, err := client.New(client.WithSockAddr(file.Name())) + c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name())) assert.NoError(t, err) defer func() { err = c.CloseConn(ctx)