Skip to content

Commit

Permalink
feat: introduce handshake to client and gRPC server (#42)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Apr 11, 2023
1 parent 3b45396 commit 7775e68
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 42 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.7.1
go.uber.org/automaxprocs v1.5.2
golang.org/x/net v0.8.0
golang.org/x/sync v0.1.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
Expand All @@ -17,7 +18,6 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
Expand Down
32 changes: 22 additions & 10 deletions pkg/function/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ package client
import (
"context"
"fmt"
"io"
"log"
"os"
"runtime"
"strconv"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/info"
_ "go.uber.org/automaxprocs"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"log"
"os"
"runtime"
"strconv"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -26,14 +28,14 @@ type client struct {

// New creates a new client object.
func New(inputOptions ...Option) (*client, error) {

var opts = &options{
maxMessageSize: function.DefaultMaxMessageSize,
maxMessageSize: function.DefaultMaxMessageSize,
sereverInfoFilePath: info.ServerInfoFilePath,
}

// Populate connection variables for client connection
// based on multiprocessing enabled/disabled
if function.IsMapMultiProcEnabled() == true {
if function.IsMapMultiProcEnabled() {
regMultProcResolver()
opts.sockAddr = function.TCP_ADDR
} else {
Expand All @@ -44,12 +46,22 @@ func New(inputOptions ...Option) (*client, error) {
inputOption(opts)
}

// TODO: WaitUntilReady() check unitl SIGTERM is received.
serverInfo, err := info.Read(info.WithServerInfoFilePath(opts.sereverInfoFilePath))
if err != nil {
// TODO: return nil, err
log.Println("Failed to execute info.Read(): ", err)
}
// TODO: Use serverInfo to check compatibility and start the right gRPC client.
if serverInfo != nil {
log.Printf("ServerInfo: %v\n", serverInfo)
}

c := new(client)
var conn *grpc.ClientConn
var err error
var sockAddr string
// Make a TCP connection client for multiprocessing grpc server
if function.IsMapMultiProcEnabled() == true {
if function.IsMapMultiProcEnabled() {
log.Println("Multiprocessing TCP Client ", function.TCP, opts.sockAddr)
sockAddr = fmt.Sprintf("%s%s", connAddr, opts.sockAddr)
conn, err = grpc.Dial(
Expand Down
12 changes: 10 additions & 2 deletions pkg/function/client/options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package client

type options struct {
sockAddr string
maxMessageSize int
sockAddr string
maxMessageSize int
sereverInfoFilePath string
}

// Option is the interface to apply options.
Expand All @@ -21,3 +22,10 @@ func WithMaxMessageSize(size int) Option {
opts.maxMessageSize = size
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(o *options) {
o.sereverInfoFilePath = f
}
}
12 changes: 10 additions & 2 deletions pkg/function/server/options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package server

type options struct {
sockAddr string
maxMessageSize int
sockAddr string
maxMessageSize int
sereverInfoFilePath string
}

// Option is the interface to apply options.
Expand All @@ -21,3 +22,10 @@ func WithSockAddr(addr string) Option {
opts.sockAddr = addr
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(opts *options) {
opts.sereverInfoFilePath = f
}
}
13 changes: 11 additions & 2 deletions pkg/function/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -83,14 +84,21 @@ func (s *server) RegisterReducer(r functionsdk.ReduceHandler) *server {
// Start starts the gRPC server via unix domain socket at configs.Addr and return error.
func (s *server) Start(ctx context.Context, inputOptions ...Option) error {
var opts = &options{
sockAddr: functionsdk.UDS_ADDR,
maxMessageSize: functionsdk.DefaultMaxMessageSize,
sockAddr: functionsdk.UDS_ADDR,
maxMessageSize: functionsdk.DefaultMaxMessageSize,
sereverInfoFilePath: info.ServerInfoFilePath,
}

for _, inputOption := range inputOptions {
inputOption(opts)
}

// Write server info to the file
serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(opts.sereverInfoFilePath)); err != nil {
return err
}

cleanup := func() error {
// err if no opts.sockAddr should be ignored
if _, err := os.Stat(opts.sockAddr); err == nil {
Expand All @@ -109,6 +117,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error {
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", functionsdk.UDS, functionsdk.UDS_ADDR, err)
}
defer func() { _ = lis.Close() }()
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(opts.maxMessageSize),
grpc.MaxSendMsgSize(opts.maxMessageSize),
Expand Down
53 changes: 39 additions & 14 deletions pkg/function/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package server
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/emptypb"
"os"
"strconv"
"sync"
"testing"
"time"

"google.golang.org/protobuf/types/known/emptypb"

"github.com/stretchr/testify/assert"
grpcmd "google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -41,12 +42,21 @@ type fields struct {
}

func Test_server_map(t *testing.T) {
file, err := os.CreateTemp("/tmp", "numaflow-test.sock")
socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(file.Name())
err = os.RemoveAll(socketFile.Name())
assert.NoError(t, err)
}()

serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info")
fmt.Println(serverInfoFile.Name())
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(serverInfoFile.Name())
assert.NoError(t, err)
}()

tests := []struct {
name string
fields fields
Expand All @@ -66,14 +76,15 @@ func Test_server_map(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(file.Name()))
c, err := client.New(client.WithSockAddr(file.Name()))
go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name()))
c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
err = c.CloseConn(ctx)
assert.NoError(t, err)
}()

for i := 0; i < 10; i++ {
keys := []string{fmt.Sprintf("client_%d", i)}
list, err := c.MapFn(ctx, &functionpb.Datum{
Expand All @@ -95,12 +106,20 @@ func Test_server_map(t *testing.T) {
}

func Test_server_mapT(t *testing.T) {
file, err := os.CreateTemp("/tmp", "numaflow-test.sock")
socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(file.Name())
err = os.RemoveAll(socketFile.Name())
assert.NoError(t, err)
}()

serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(serverInfoFile.Name())
assert.NoError(t, err)
}()

tests := []struct {
name string
fields fields
Expand All @@ -120,8 +139,8 @@ func Test_server_mapT(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(file.Name()))
c, err := client.New(client.WithSockAddr(file.Name()))
go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name()))
c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -155,6 +174,14 @@ func Test_server_reduce(t *testing.T) {
err = os.RemoveAll(file.Name())
assert.NoError(t, err)
}()

serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(serverInfoFile.Name())
assert.NoError(t, err)
}()

var testKeys = []string{"reduce_key"}
tests := []struct {
name string
Expand Down Expand Up @@ -197,9 +224,9 @@ func Test_server_reduce(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name()))
go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name()), WithServerInfoFilePath(serverInfoFile.Name()))

c, err := client.New(client.WithSockAddr(file.Name()))
c, err := client.New(client.WithSockAddr(file.Name()), client.WithServerInfoFilePath(serverInfoFile.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -228,9 +255,7 @@ func Test_server_reduce(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
for _, d := range resultDatumList {
dList.Elements = append(dList.Elements, d)
}
dList.Elements = append(dList.Elements, resultDatumList...)
}()

wg.Wait()
Expand Down
19 changes: 19 additions & 0 deletions pkg/info/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package info is used for the gRPC server to provide the information such as protocol, sdk version, language, etc, to the client.
//
// The server information can be used by the client to determine:
// - what is right protocol to use (UDS or TCP)
// - what is the numaflow sdk version used by the server
// - what is language used by the server
//
// The gRPC server (UDF, UDSink, etc) is supposed to have a shared file system with the client (numa container).
//
// Write()
// The gPRC server must use this function to write the correct ServerInfo when it starts.
//
// Read()
// The client is supposed to call the function to read the server information, before it starts to communicate with the gRPC server.
//
// WaitUntilReady()
// This function checks if the server info file is ready to read.
// The client (numa container) is supposed to call the function before it starts to Read() the server info file.
package info
20 changes: 20 additions & 0 deletions pkg/info/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package info

type options struct {
svrInfoFilePath string
}

func defaultOptions() *options {
return &options{
svrInfoFilePath: ServerInfoFilePath,
}
}

type Option func(*options)

// WithServerInfoFilePath sets the server info file path
func WithServerInfoFilePath(f string) Option {
return func(o *options) {
o.svrInfoFilePath = f
}
}
Loading

0 comments on commit 7775e68

Please sign in to comment.