From cffcb82ec7265b8b4f54ccabdfaf1d8a00de4bda Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 15 Jul 2024 11:54:39 -0700 Subject: [PATCH] refactor Signed-off-by: Sidhant Kohli --- .../examples/batchmap-flatmap/pipeline.yaml | 3 --- pkg/batchmapper/server.go | 12 ++++++++++-- pkg/batchmapper/service.go | 2 +- pkg/info/server_info.go | 7 +++++++ pkg/info/types.go | 10 ++++++++++ pkg/mapstreamer/server.go | 12 ++++++++++-- pkg/mapstreamer/service.go | 2 +- pkg/shared/util.go | 2 +- 8 files changed, 40 insertions(+), 10 deletions(-) diff --git a/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml index 7bac7625..3df141e9 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml +++ b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml @@ -8,9 +8,6 @@ spec: source: http: {} - name: go-split - metadata: - annotations: - numaflow.numaproj.io/batch-map: "true" scale: min: 1 udf: diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 2d3fb639..fe59dbf1 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -9,6 +9,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -36,9 +37,16 @@ func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - // write server info to the file + // write server info to the file, we need to add metadata to ensure selection of the + // correct map mode, in this case batch map + serverInfo := info.GetDefaultServerInfo() + serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.BatchMap)} + if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil { + return err + } + // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(m.opts.sockAddr, "") if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index a8748b18..5923e4af 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -19,7 +19,7 @@ const ( uds = "unix" address = "/var/run/numaflow/batchmap.sock" defaultMaxMessageSize = 1024 * 1024 * 64 - serverInfoFilePath = "/var/run/numaflow/batchmapper-server-info" + serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) // Service implements the proto gen server interface and contains the map operation diff --git a/pkg/info/server_info.go b/pkg/info/server_info.go index 12abbb5c..843aa1e5 100644 --- a/pkg/info/server_info.go +++ b/pkg/info/server_info.go @@ -109,3 +109,10 @@ func Read(opts ...Option) (*ServerInfo, error) { } return info, nil } + +func GetDefaultServerInfo() *ServerInfo { + serverInfo := &ServerInfo{Protocol: UDS, Language: Go, MinimumNumaflowVersion: MinimumNumaflowVersion, + Version: GetSDKVersion()} + return serverInfo + +} diff --git a/pkg/info/types.go b/pkg/info/types.go index 4bdf541c..eb8f0a85 100644 --- a/pkg/info/types.go +++ b/pkg/info/types.go @@ -15,6 +15,16 @@ const ( Java Language = "java" ) +type MapMode string + +const ( + UnaryMap MapMode = "unary-map" + StreamMap MapMode = "stream-map" + BatchMap MapMode = "batch-map" +) + +const MapModeMetadata = "MAP_MODE" + // MinimumNumaflowVersion specifies the minimum Numaflow version required by the current SDK version const MinimumNumaflowVersion = "1.2.0-rc4" diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index f080feaf..3260ee5b 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -8,6 +8,7 @@ import ( "github.com/numaproj/numaflow-go/pkg" mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" + "github.com/numaproj/numaflow-go/pkg/info" "github.com/numaproj/numaflow-go/pkg/shared" ) @@ -35,9 +36,16 @@ func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() - // write server info to the file + // write server info to the file, we need to add metadata to ensure selection of the + // correct map mode, in this case streaming map + serverInfo := info.GetDefaultServerInfo() + serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.StreamMap)} + if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil { + return err + } + // start listening on unix domain socket - lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + lis, err := shared.PrepareServer(m.opts.sockAddr, "") if err != nil { return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) } diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index c272b7ce..2254f26f 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -12,7 +12,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/mapstream.sock" - serverInfoFilePath = "/var/run/numaflow/mapstreamer-server-info" + serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) // Service implements the proto gen server interface and contains the map diff --git a/pkg/shared/util.go b/pkg/shared/util.go index ad4cb9e7..7e4f99f3 100644 --- a/pkg/shared/util.go +++ b/pkg/shared/util.go @@ -19,7 +19,7 @@ const ( func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) { // If infoFilePath is not empty, write the server info to the file. if infoFilePath != "" { - serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, MinimumNumaflowVersion: info.MinimumNumaflowVersion, Version: info.GetSDKVersion()} + serverInfo := info.GetDefaultServerInfo() if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil { return nil, err }