diff --git a/pkg/info/server_info.go b/pkg/info/server_info.go index a38423d0..15b15bfb 100644 --- a/pkg/info/server_info.go +++ b/pkg/info/server_info.go @@ -112,8 +112,9 @@ func Read(opts ...Option) (*ServerInfo, error) { // GetDefaultServerInfo returns a ServerInfo object with the default fields populated for Go-SDK func GetDefaultServerInfo() *ServerInfo { - serverInfo := &ServerInfo{Protocol: UDS, Language: Go, MinimumNumaflowVersion: MinimumNumaflowVersion, - Version: getSDKVersion()} - return serverInfo - + return &ServerInfo{ + Protocol: UDS, + Language: Go, + Version: getSDKVersion(), + } } diff --git a/pkg/info/server_info_test.go b/pkg/info/server_info_test.go index ab553a88..683311ea 100644 --- a/pkg/info/server_info_test.go +++ b/pkg/info/server_info_test.go @@ -56,8 +56,8 @@ func Test_Read_Write(t *testing.T) { defer os.Remove(filepath) info := &ServerInfo{ Protocol: TCP, - Language: Java, - MinimumNumaflowVersion: MinimumNumaflowVersion, + Language: Go, + MinimumNumaflowVersion: "1.3.1-z", Version: "11", Metadata: map[string]string{"key1": "value1", "key2": "value2"}, } diff --git a/pkg/info/types.go b/pkg/info/types.go index e53448d3..92fe546b 100644 --- a/pkg/info/types.go +++ b/pkg/info/types.go @@ -10,10 +10,23 @@ const ( type Language string const ( - Go Language = "go" - Python Language = "python" - Java Language = "java" - Rust Language = "rust" + Go Language = "go" +) + +type ContainerType string + +// the string content matches the corresponding server info file name. +// DO NOT change it unless the server info file name is changed. +const ( + Sourcer ContainerType = "sourcer" + Sourcetransformer ContainerType = "sourcetransformer" + Sinker ContainerType = "sinker" + Mapper ContainerType = "mapper" + Reducer ContainerType = "reducer" + Reducestreamer ContainerType = "reducestreamer" + Sessionreducer ContainerType = "sessionreducer" + Sideinput ContainerType = "sideinput" + Fbsinker ContainerType = "fb-sinker" ) type MapMode string @@ -30,7 +43,17 @@ const MapModeKey = "MAP_MODE" // MinimumNumaflowVersion is the minimum version of Numaflow required by the current SDK version // To update this value, please follow the instructions for MINIMUM_NUMAFLOW_VERSION in // https://github.com/numaproj/numaflow-rs/blob/main/src/shared.rs -const MinimumNumaflowVersion = "1.3.1-z" +var MinimumNumaflowVersion = map[ContainerType]string{ + Sourcer: "1.3.1-z", + Sourcetransformer: "1.3.1-z", + Sinker: "1.3.1-z", + Mapper: "1.3.1-z", + Reducestreamer: "1.3.1-z", + Reducer: "1.3.1-z", + Sessionreducer: "1.3.1-z", + Sideinput: "1.3.1-z", + Fbsinker: "1.3.1-z", +} // ServerInfo is the information about the server type ServerInfo struct { diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 729026fe..53f7dfb3 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -53,6 +53,7 @@ func (m *server) Start(ctx context.Context) error { // create a server info to the file, we need to add metadata to ensure selection of the // correct map mode, in this case unary map serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Mapper] serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.UnaryMap)} // start listening on unix domain socket diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index 78130071..5426c6cc 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -53,6 +53,7 @@ func (m *server) Start(ctx context.Context) error { // create a server info to the file, we need to add metadata to ensure selection of the // correct map mode, in this case streaming map serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Mapper] serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.StreamMap)} // start listening on unix domain socket diff --git a/pkg/reducer/server.go b/pkg/reducer/server.go index 180126cd..a51ff56c 100644 --- a/pkg/reducer/server.go +++ b/pkg/reducer/server.go @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error { defer stop() // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Reducer] // start listening on unix domain socket - lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/reducestreamer/server.go b/pkg/reducestreamer/server.go index 51b12222..31322933 100644 --- a/pkg/reducestreamer/server.go +++ b/pkg/reducestreamer/server.go @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error { defer stop() // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Reducestreamer] // start listening on unix domain socket - lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sessionreducer/server.go b/pkg/sessionreducer/server.go index ed3fdff5..f20169c3 100644 --- a/pkg/sessionreducer/server.go +++ b/pkg/sessionreducer/server.go @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error { defer stop() // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sessionreducer] // start listening on unix domain socket - lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index 3d394c58..f1c0862b 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -50,8 +50,11 @@ func (s *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() + // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sideinput] // start listening on unix domain socket - lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sinker/server.go b/pkg/sinker/server.go index 9ac5f4cf..82987bb3 100644 --- a/pkg/sinker/server.go +++ b/pkg/sinker/server.go @@ -49,8 +49,10 @@ func NewServer(h Sinker, inputOptions ...Option) numaflow.Server { func (s *sinkServer) Start(ctx context.Context) error { // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sinker] // start listening on unix domain socket - lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sourcer/server.go b/pkg/sourcer/server.go index 12076e9e..0f759a79 100644 --- a/pkg/sourcer/server.go +++ b/pkg/sourcer/server.go @@ -49,10 +49,11 @@ func NewServer( // Start starts the gRPC server via unix domain socket at shared.address and return error. func (s *server) Start(ctx context.Context) error { - // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcer] // start listening on unix domain socket - lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index c42da01a..b0aab9f5 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -50,8 +50,10 @@ func (m *server) Start(ctx context.Context) error { defer stop() // write server info to the file + serverInfo := info.GetDefaultServerInfo() + serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcetransformer] // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, info.GetDefaultServerInfo()) + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) }