Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce handshake to client and gRPC server #42

Merged
merged 8 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/golang/mock v1.6.0
github.com/stretchr/testify v1.7.1
go.uber.org/automaxprocs v1.5.2
golang.org/x/net v0.8.0
golang.org/x/sync v0.1.0
google.golang.org/grpc v1.54.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
Expand All @@ -17,7 +18,6 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
Expand Down
32 changes: 22 additions & 10 deletions pkg/function/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,21 @@ package client
import (
"context"
"fmt"
"io"
"log"
"os"
"runtime"
"strconv"

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
"github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/info"
_ "go.uber.org/automaxprocs"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/protobuf/types/known/emptypb"
"io"
"log"
"os"
"runtime"
"strconv"
)

// client contains the grpc connection and the grpc client.
Expand All @@ -26,14 +28,14 @@ type client struct {

// New creates a new client object.
func New(inputOptions ...Option) (*client, error) {

var opts = &options{
maxMessageSize: function.DefaultMaxMessageSize,
maxMessageSize: function.DefaultMaxMessageSize,
sereverInfoFilePath: info.ServerInfoFilePath,
}

// Populate connection variables for client connection
// based on multiprocessing enabled/disabled
if function.IsMapMultiProcEnabled() == true {
if function.IsMapMultiProcEnabled() {
regMultProcResolver()
opts.sockAddr = function.TCP_ADDR
} else {
Expand All @@ -44,12 +46,22 @@ func New(inputOptions ...Option) (*client, error) {
inputOption(opts)
}

// TODO: WaitUntilReady() check unitl SIGTERM is received.
serverInfo, err := info.Read(info.WithServerInfoFilePath(opts.sereverInfoFilePath))
if err != nil {
// TODO: return nil, err
log.Println("Failed to execute info.Read(): ", err)
}
// TODO: Use serverInfo to check compatibility and start the right gRPC client.
if serverInfo != nil {
log.Printf("ServerInfo: %v\n", serverInfo)
}

c := new(client)
var conn *grpc.ClientConn
var err error
var sockAddr string
// Make a TCP connection client for multiprocessing grpc server
if function.IsMapMultiProcEnabled() == true {
if function.IsMapMultiProcEnabled() {
log.Println("Multiprocessing TCP Client ", function.TCP, opts.sockAddr)
sockAddr = fmt.Sprintf("%s%s", connAddr, opts.sockAddr)
conn, err = grpc.Dial(
Expand Down
12 changes: 10 additions & 2 deletions pkg/function/client/options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package client

type options struct {
sockAddr string
maxMessageSize int
sockAddr string
maxMessageSize int
sereverInfoFilePath string
}

// Option is the interface to apply options.
Expand All @@ -21,3 +22,10 @@ func WithMaxMessageSize(size int) Option {
opts.maxMessageSize = size
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(o *options) {
o.sereverInfoFilePath = f
}
}
12 changes: 10 additions & 2 deletions pkg/function/server/options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package server

type options struct {
sockAddr string
maxMessageSize int
sockAddr string
maxMessageSize int
sereverInfoFilePath string
}

// Option is the interface to apply options.
Expand All @@ -21,3 +22,10 @@ func WithSockAddr(addr string) Option {
opts.sockAddr = addr
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(opts *options) {
opts.sereverInfoFilePath = f
}
}
13 changes: 11 additions & 2 deletions pkg/function/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

functionpb "github.com/numaproj/numaflow-go/pkg/apis/proto/function/v1"
functionsdk "github.com/numaproj/numaflow-go/pkg/function"
"github.com/numaproj/numaflow-go/pkg/info"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -83,14 +84,21 @@ func (s *server) RegisterReducer(r functionsdk.ReduceHandler) *server {
// Start starts the gRPC server via unix domain socket at configs.Addr and return error.
func (s *server) Start(ctx context.Context, inputOptions ...Option) error {
var opts = &options{
sockAddr: functionsdk.UDS_ADDR,
maxMessageSize: functionsdk.DefaultMaxMessageSize,
sockAddr: functionsdk.UDS_ADDR,
maxMessageSize: functionsdk.DefaultMaxMessageSize,
sereverInfoFilePath: info.ServerInfoFilePath,
}

for _, inputOption := range inputOptions {
inputOption(opts)
}

// Write server info to the file
serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(opts.sereverInfoFilePath)); err != nil {
return err
}

cleanup := func() error {
// err if no opts.sockAddr should be ignored
if _, err := os.Stat(opts.sockAddr); err == nil {
Expand All @@ -109,6 +117,7 @@ func (s *server) Start(ctx context.Context, inputOptions ...Option) error {
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", functionsdk.UDS, functionsdk.UDS_ADDR, err)
}
defer func() { _ = lis.Close() }()
grpcServer := grpc.NewServer(
grpc.MaxRecvMsgSize(opts.maxMessageSize),
grpc.MaxSendMsgSize(opts.maxMessageSize),
Expand Down
53 changes: 39 additions & 14 deletions pkg/function/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package server
import (
"context"
"fmt"
"google.golang.org/protobuf/types/known/emptypb"
"os"
"strconv"
"sync"
"testing"
"time"

"google.golang.org/protobuf/types/known/emptypb"

"github.com/stretchr/testify/assert"
grpcmd "google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -41,12 +42,21 @@ type fields struct {
}

func Test_server_map(t *testing.T) {
file, err := os.CreateTemp("/tmp", "numaflow-test.sock")
socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(file.Name())
err = os.RemoveAll(socketFile.Name())
assert.NoError(t, err)
}()

serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info")
fmt.Println(serverInfoFile.Name())
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(serverInfoFile.Name())
assert.NoError(t, err)
}()

tests := []struct {
name string
fields fields
Expand All @@ -66,14 +76,15 @@ func Test_server_map(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(file.Name()))
c, err := client.New(client.WithSockAddr(file.Name()))
go New().RegisterMapper(tt.fields.mapHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name()))
c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
err = c.CloseConn(ctx)
assert.NoError(t, err)
}()

for i := 0; i < 10; i++ {
keys := []string{fmt.Sprintf("client_%d", i)}
list, err := c.MapFn(ctx, &functionpb.Datum{
Expand All @@ -95,12 +106,20 @@ func Test_server_map(t *testing.T) {
}

func Test_server_mapT(t *testing.T) {
file, err := os.CreateTemp("/tmp", "numaflow-test.sock")
socketFile, err := os.CreateTemp("/tmp", "numaflow-test.sock")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(file.Name())
err = os.RemoveAll(socketFile.Name())
assert.NoError(t, err)
}()

serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(serverInfoFile.Name())
assert.NoError(t, err)
}()

tests := []struct {
name string
fields fields
Expand All @@ -120,8 +139,8 @@ func Test_server_mapT(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(file.Name()))
c, err := client.New(client.WithSockAddr(file.Name()))
go New().RegisterMapperT(tt.fields.mapTHandler).Start(ctx, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name()))
c, err := client.New(client.WithSockAddr(socketFile.Name()), client.WithServerInfoFilePath(serverInfoFile.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -155,6 +174,14 @@ func Test_server_reduce(t *testing.T) {
err = os.RemoveAll(file.Name())
assert.NoError(t, err)
}()

serverInfoFile, err := os.CreateTemp("/tmp", "numaflow-test-info")
assert.NoError(t, err)
defer func() {
err = os.RemoveAll(serverInfoFile.Name())
assert.NoError(t, err)
}()

var testKeys = []string{"reduce_key"}
tests := []struct {
name string
Expand Down Expand Up @@ -197,9 +224,9 @@ func Test_server_reduce(t *testing.T) {
// note: using actual UDS connection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name()))
go New().RegisterReducer(tt.fields.reduceHandler).Start(ctx, WithSockAddr(file.Name()), WithServerInfoFilePath(serverInfoFile.Name()))

c, err := client.New(client.WithSockAddr(file.Name()))
c, err := client.New(client.WithSockAddr(file.Name()), client.WithServerInfoFilePath(serverInfoFile.Name()))
waitUntilReady(ctx, c, t)
assert.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -228,9 +255,7 @@ func Test_server_reduce(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
for _, d := range resultDatumList {
dList.Elements = append(dList.Elements, d)
}
dList.Elements = append(dList.Elements, resultDatumList...)
}()

wg.Wait()
Expand Down
19 changes: 19 additions & 0 deletions pkg/info/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Package info is used for the gRPC server to provide the information such as protocol, sdk version, language, etc, to the client.
//
// The server information can be used by the client to determine:
// - what is right protocol to use (UDS or TCP)
// - what is the numaflow sdk version used by the server
// - what is language used by the server
//
// The gRPC server (UDF, UDSink, etc) is supposed to have a shared file system with the client (numa container).
//
// Write()
// The gPRC server must use this function to write the correct ServerInfo when it starts.
//
// Read()
// The client is supposed to call the function to read the server information, before it starts to communicate with the gRPC server.
//
// WaitUntilReady()
// This function checks if the server info file is ready to read.
// The client (numa container) is supposed to call the function before it starts to Read() the server info file.
package info
20 changes: 20 additions & 0 deletions pkg/info/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package info

type options struct {
svrInfoFilePath string
}

func defaultOptions() *options {
return &options{
svrInfoFilePath: ServerInfoFilePath,
}
}

type Option func(*options)

// WithServerInfoFilePath sets the server info file path
func WithServerInfoFilePath(f string) Option {
return func(o *options) {
o.svrInfoFilePath = f
}
}
Loading