Skip to content

Commit

Permalink
feat: container-type level version compatibility check
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Sep 25, 2024
1 parent 39fbfd4 commit 98709ca
Show file tree
Hide file tree
Showing 12 changed files with 57 additions and 19 deletions.
9 changes: 5 additions & 4 deletions pkg/info/server_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ func Read(opts ...Option) (*ServerInfo, error) {

// GetDefaultServerInfo returns a ServerInfo object with the default fields populated for Go-SDK
func GetDefaultServerInfo() *ServerInfo {
serverInfo := &ServerInfo{Protocol: UDS, Language: Go, MinimumNumaflowVersion: MinimumNumaflowVersion,
Version: getSDKVersion()}
return serverInfo

return &ServerInfo{
Protocol: UDS,
Language: Go,
Version: getSDKVersion(),
}
}
4 changes: 2 additions & 2 deletions pkg/info/server_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func Test_Read_Write(t *testing.T) {
defer os.Remove(filepath)
info := &ServerInfo{
Protocol: TCP,
Language: Java,
MinimumNumaflowVersion: MinimumNumaflowVersion,
Language: Go,
MinimumNumaflowVersion: "1.3.1-z",
Version: "11",
Metadata: map[string]string{"key1": "value1", "key2": "value2"},
}
Expand Down
31 changes: 26 additions & 5 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@ const (
type Language string

const (
Go Language = "go"
Python Language = "python"
Java Language = "java"
Rust Language = "rust"
Go Language = "go"
)

type ContainerType string

const (
Sourcer ContainerType = "sourcer"
Sourcetransformer ContainerType = "sourcetransformer"
Sinker ContainerType = "sinker"
Mapper ContainerType = "mapper"
Reducer ContainerType = "reducer"
Reducestreamer ContainerType = "reducestreamer"
Sessionreducer ContainerType = "sessionreducer"
Sideinput ContainerType = "sideinput"
Fbsinker ContainerType = "fb-sinker"
)

type MapMode string
Expand All @@ -30,7 +41,17 @@ const MapModeKey = "MAP_MODE"
// MinimumNumaflowVersion is the minimum version of Numaflow required by the current SDK version
// To update this value, please follow the instructions for MINIMUM_NUMAFLOW_VERSION in
// https://github.com/numaproj/numaflow-rs/blob/main/src/shared.rs
const MinimumNumaflowVersion = "1.3.1-z"
var MinimumNumaflowVersion = map[ContainerType]string{
Sourcer: "1.3.1-z",
Sourcetransformer: "1.3.1-z",
Sinker: "1.3.1-z",
Mapper: "1.3.1-z",
Reducestreamer: "1.3.1-z",
Reducer: "1.3.1-z",
Sessionreducer: "1.3.1-z",
Sideinput: "1.3.1-z",
Fbsinker: "1.3.1-z",
}

// ServerInfo is the information about the server
type ServerInfo struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (m *server) Start(ctx context.Context) error {
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case unary map
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Mapper]
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.UnaryMap)}

// start listening on unix domain socket
Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (m *server) Start(ctx context.Context) error {
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case streaming map
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Mapper]
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.StreamMap)}

// start listening on unix domain socket
Expand Down
4 changes: 3 additions & 1 deletion pkg/reducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Reducer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/reducestreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Reducestreamer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sessionreducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sessionreducer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ func (s *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sideinput]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sinker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func NewServer(h Sinker, inputOptions ...Option) numaflow.Server {
func (s *sinkServer) Start(ctx context.Context) error {

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sinker]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sourcer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ func NewServer(

// Start starts the gRPC server via unix domain socket at shared.address and return error.
func (s *server) Start(ctx context.Context) error {

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sourcetransformer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func (m *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcetransformer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down

0 comments on commit 98709ca

Please sign in to comment.