diff --git a/pkg/batchmapper/examples/batchmap-flatmap/README.md b/pkg/batchmapper/examples/batchmap-flatmap/README.md index 135b5ecb..be754531 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/README.md +++ b/pkg/batchmapper/examples/batchmap-flatmap/README.md @@ -2,24 +2,7 @@ An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function. - -To start a vertex in batch map mode we need to add the annotations as following -```yaml - - name: go-split - metadata: - annotations: - numaflow.numaproj.io/batch-map: "true" - scale: - min: 1 - udf: - container: - # Split input message into an array with comma, see https://github.com/numaproj/numaflow-go/tree/main/pkg/batchmapper/examples/batchmap-flatmap - image: quay.io/numaio/numaflow-go/batch-map-flatmap:stable - imagePullPolicy: Always -``` - - -Some considerations for batch map are as follows +Some important considerations for batch map are as follows - The user will have to ensure that the BatchResponse is tagged with the correct request ID as this will be used by Numaflow for populating information required for system correctness like MessageID for the ISB deduplication. diff --git a/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml index 7bac7625..3df141e9 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml +++ b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml @@ -8,9 +8,6 @@ spec: source: http: {} - name: go-split - metadata: - annotations: - numaflow.numaproj.io/batch-map: "true" scale: min: 1 udf: diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 2d3fb639..2d46ef09 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -9,6 +9,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -36,9 +37,13 @@ func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - // write server info to the file + // create a server info to the file, we need to add metadata to ensure selection of the + // correct map mode, in this case batch map + serverInfo := info.GetDefaultServerInfo() + serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.BatchMap)} + // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index a8748b18..5923e4af 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -19,7 +19,7 @@ const ( uds = "unix" address = "/var/run/numaflow/batchmap.sock" defaultMaxMessageSize = 1024 * 1024 * 64 - serverInfoFilePath = "/var/run/numaflow/batchmapper-server-info" + serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) // Service implements the proto gen server interface and contains the map operation diff --git a/pkg/info/server_info.go b/pkg/info/server_info.go index 12abbb5c..a38423d0 100644 --- a/pkg/info/server_info.go +++ b/pkg/info/server_info.go @@ -13,7 +13,7 @@ import ( var END = fmt.Sprintf("%U__END__", '\\') // U+005C__END__ -func GetSDKVersion() string { +func getSDKVersion() string { version := "" info, ok := debug.ReadBuildInfo() if !ok { @@ -109,3 +109,11 @@ func Read(opts ...Option) (*ServerInfo, error) { } return info, nil } + +// GetDefaultServerInfo returns a ServerInfo object with the default fields populated for Go-SDK +func GetDefaultServerInfo() *ServerInfo { + serverInfo := &ServerInfo{Protocol: UDS, Language: Go, MinimumNumaflowVersion: MinimumNumaflowVersion, + Version: getSDKVersion()} + return serverInfo + +} diff --git a/pkg/info/server_info_test.go b/pkg/info/server_info_test.go index 75fdde1f..ab553a88 100644 --- a/pkg/info/server_info_test.go +++ b/pkg/info/server_info_test.go @@ -22,8 +22,8 @@ func Test_getSDKVersion(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := GetSDKVersion(); got != tt.want { - t.Errorf("GetSDKVersion() = %v, want %v", got, tt.want) + if got := getSDKVersion(); got != tt.want { + t.Errorf("getSDKVersion() = %v, want %v", got, tt.want) } }) } diff --git a/pkg/info/types.go b/pkg/info/types.go index 4bdf541c..c6d607fc 100644 --- a/pkg/info/types.go +++ b/pkg/info/types.go @@ -15,6 +15,17 @@ const ( Java Language = "java" ) +type MapMode string + +const ( + UnaryMap MapMode = "unary-map" + StreamMap MapMode = "stream-map" + BatchMap MapMode = "batch-map" +) + +// MapModeKey is the key used in the server info metadata map to indicate which map mode is enabled. +const MapModeKey = "MAP_MODE" + // MinimumNumaflowVersion specifies the minimum Numaflow version required by the current SDK version const MinimumNumaflowVersion = "1.2.0-rc4" diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 60399b24..17f3e331 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -9,6 +9,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -36,9 +37,13 @@ func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - // write server info to the file + // create a server info to the file, we need to add metadata to ensure selection of the + // correct map mode, in this case unary map + serverInfo := info.GetDefaultServerInfo() + serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.UnaryMap)} + // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index f080feaf..9bb39b20 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -8,6 +8,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -35,9 +36,13 @@ func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - // write server info to the file + // create a server info to the file, we need to add metadata to ensure selection of the + // correct map mode, in this case streaming map + serverInfo := info.GetDefaultServerInfo() + serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.StreamMap)} + // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index c272b7ce..2254f26f 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -12,7 +12,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/mapstream.sock" - serverInfoFilePath = "/var/run/numaflow/mapstreamer-server-info" + serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) // Service implements the proto gen server interface and contains the map diff --git a/pkg/reducer/server.go b/pkg/reducer/server.go index 53989ebf..6de6d60b 100644 --- a/pkg/reducer/server.go +++ b/pkg/reducer/server.go @@ -8,6 +8,7 @@ import ( numaflow "github.com/numaproj/numaflow-go/pkg" reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -37,7 +38,7 @@ func (r *server) Start(ctx context.Context) error { // write server info to the file // start listening on unix domain socket - lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/reducestreamer/server.go b/pkg/reducestreamer/server.go index b24eda57..0021b73c 100644 --- a/pkg/reducestreamer/server.go +++ b/pkg/reducestreamer/server.go @@ -8,6 +8,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -37,7 +38,7 @@ func (r *server) Start(ctx context.Context) error { // write server info to the file // start listening on unix domain socket - lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sessionreducer/server.go b/pkg/sessionreducer/server.go index 7bfaf664..ed5befb4 100644 --- a/pkg/sessionreducer/server.go +++ b/pkg/sessionreducer/server.go @@ -8,6 +8,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" sessionreducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/sessionreduce/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -37,7 +38,7 @@ func (r *server) Start(ctx context.Context) error { // write server info to the file // start listening on unix domain socket - lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/shared/util.go b/pkg/shared/util.go index ad4cb9e7..93fc7c4a 100644 --- a/pkg/shared/util.go +++ b/pkg/shared/util.go @@ -16,10 +16,13 @@ const ( uds = "unix" ) -func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) { +func PrepareServer(sockAddr string, infoFilePath string, serverInfo *info.ServerInfo) (net.Listener, error) { + // If serverInfo is not provided, then create a default server info instance. + if serverInfo == nil { + serverInfo = info.GetDefaultServerInfo() + } // If infoFilePath is not empty, write the server info to the file. if infoFilePath != "" { - serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, MinimumNumaflowVersion: info.MinimumNumaflowVersion, Version: info.GetSDKVersion()} if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil { return nil, err } diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index f1ad76c1..d6202d99 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -9,6 +9,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -37,7 +38,7 @@ func (s *server) Start(ctx context.Context) error { defer stop() // start listening on unix domain socket - lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sinker/server.go b/pkg/sinker/server.go index 4833961a..5ae8748a 100644 --- a/pkg/sinker/server.go +++ b/pkg/sinker/server.go @@ -9,6 +9,7 @@ import ( numaflow "github.com/numaproj/numaflow-go/pkg" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -36,7 +37,7 @@ func (s *sinkServer) Start(ctx context.Context) error { // write server info to the file // start listening on unix domain socket - lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sourcer/server.go b/pkg/sourcer/server.go index 58bd01b8..34708c91 100644 --- a/pkg/sourcer/server.go +++ b/pkg/sourcer/server.go @@ -9,6 +9,7 @@ import ( numaflow "github.com/numaproj/numaflow-go/pkg" sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -38,7 +39,7 @@ func (s *server) Start(ctx context.Context) error { // write server info to the file // start listening on unix domain socket - lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index 1df4f125..0797e1ec 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -9,6 +9,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -37,7 +38,7 @@ func (m *server) Start(ctx context.Context) error { // write server info to the file // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, info.GetDefaultServerInfo()) if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) }