Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: write server info for sideinput sdk #97

Merged
merged 1 commit into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions pkg/shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const (

func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) {
// If infoFilePath is not empty, write the server info to the file.
// For Side input we don't write data to the info server, hence will pass path as empty here.
// Could be used later on for similar cases
if infoFilePath != "" {
serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, Version: info.GetSDKVersion()}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil {
Expand Down
19 changes: 15 additions & 4 deletions pkg/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package sideinput

import "github.com/numaproj/numaflow-go/pkg/info"

// options is the struct to hold the server options.
type options struct {
sockAddr string
maxMessageSize int
sockAddr string
maxMessageSize int
serverInfoFilePath string
}

// Option is the interface to apply options.
Expand All @@ -12,8 +15,9 @@ type Option func(*options)
// defaultOptions returns the default options.
func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
}
}

Expand All @@ -30,3 +34,10 @@ func WithSockAddr(addr string) Option {
opts.sockAddr = addr
}
}

// WithServerInfoFilePath sets the server info file path to the given path.
func WithServerInfoFilePath(f string) Option {
return func(opts *options) {
opts.serverInfoFilePath = f
}
}
3 changes: 1 addition & 2 deletions pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ func (s *server) Start(ctx context.Context) error {
defer stop()

// start listening on unix domain socket
// For Side input we don't write data to the info server, hence will pass path as empty here.
lis, err := shared.PrepareServer(s.opts.sockAddr, "")
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sideinput/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ func TestServer_Start(t *testing.T) {
_ = os.RemoveAll(socketFile.Name())
}()

serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info")
defer func() {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var retrieveHandler = RetrieveFunc(func(ctx context.Context) Message {
return BroadcastMessage([]byte("test"))
})
// note: using actual uds connection
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
defer cancel()
err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name())).Start(ctx)
err := NewSideInputServer(retrieveHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}
Loading