Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: container-type level version compatibility check #154

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions pkg/info/server_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,9 @@ func Read(opts ...Option) (*ServerInfo, error) {

// GetDefaultServerInfo returns a ServerInfo object with the default fields populated for Go-SDK
func GetDefaultServerInfo() *ServerInfo {
serverInfo := &ServerInfo{Protocol: UDS, Language: Go, MinimumNumaflowVersion: MinimumNumaflowVersion,
Version: getSDKVersion()}
return serverInfo

return &ServerInfo{
Protocol: UDS,
Language: Go,
Version: getSDKVersion(),
}
}
4 changes: 2 additions & 2 deletions pkg/info/server_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func Test_Read_Write(t *testing.T) {
defer os.Remove(filepath)
info := &ServerInfo{
Protocol: TCP,
Language: Java,
MinimumNumaflowVersion: MinimumNumaflowVersion,
Language: Go,
MinimumNumaflowVersion: "1.3.1-z",
Version: "11",
Metadata: map[string]string{"key1": "value1", "key2": "value2"},
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,23 @@ const (
type Language string

const (
Go Language = "go"
Python Language = "python"
Java Language = "java"
Rust Language = "rust"
Go Language = "go"
)

type ContainerType string

// the string content matches the corresponding server info file name.
// DO NOT change it unless the server info file name is changed.
const (
Sourcer ContainerType = "sourcer"
Sourcetransformer ContainerType = "sourcetransformer"
Sinker ContainerType = "sinker"
Mapper ContainerType = "mapper"
Reducer ContainerType = "reducer"
Reducestreamer ContainerType = "reducestreamer"
Sessionreducer ContainerType = "sessionreducer"
Sideinput ContainerType = "sideinput"
Fbsinker ContainerType = "fb-sinker"
)

type MapMode string
Expand All @@ -30,7 +43,17 @@ const MapModeKey = "MAP_MODE"
// MinimumNumaflowVersion is the minimum version of Numaflow required by the current SDK version
// To update this value, please follow the instructions for MINIMUM_NUMAFLOW_VERSION in
// https://github.com/numaproj/numaflow-rs/blob/main/src/shared.rs
const MinimumNumaflowVersion = "1.3.1-z"
var MinimumNumaflowVersion = map[ContainerType]string{
Sourcer: "1.3.1-z",
Sourcetransformer: "1.3.1-z",
Sinker: "1.3.1-z",
Mapper: "1.3.1-z",
Reducestreamer: "1.3.1-z",
Reducer: "1.3.1-z",
Sessionreducer: "1.3.1-z",
Sideinput: "1.3.1-z",
Fbsinker: "1.3.1-z",
}

// ServerInfo is the information about the server
type ServerInfo struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (m *server) Start(ctx context.Context) error {
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case unary map
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Mapper]
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.UnaryMap)}

// start listening on unix domain socket
Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (m *server) Start(ctx context.Context) error {
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case streaming map
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Mapper]
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.StreamMap)}

// start listening on unix domain socket
Expand Down
4 changes: 3 additions & 1 deletion pkg/reducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Reducer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/reducestreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Reducestreamer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sessionreducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ func (r *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sessionreducer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ func (s *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sideinput]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sinker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ func NewServer(h Sinker, inputOptions ...Option) numaflow.Server {
func (s *sinkServer) Start(ctx context.Context) error {

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sinker]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/sourcer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ func NewServer(

// Start starts the gRPC server via unix domain socket at shared.address and return error.
func (s *server) Start(ctx context.Context) error {

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/sourcetransformer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func (m *server) Start(ctx context.Context) error {
defer stop()

// write server info to the file
serverInfo := info.GetDefaultServerInfo()
serverInfo.MinimumNumaflowVersion = info.MinimumNumaflowVersion[info.Sourcetransformer]
// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, info.GetDefaultServerInfo())
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
Loading