Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
Sidhant Kohli committed Jul 15, 2024
1 parent 15e4521 commit cffcb82
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 10 deletions.
3 changes: 0 additions & 3 deletions pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ spec:
source:
http: {}
- name: go-split
metadata:
annotations:
numaflow.numaproj.io/batch-map: "true"
scale:
min: 1
udf:
Expand Down
12 changes: 10 additions & 2 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/numaproj/numaflow-go/pkg"
batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -36,9 +37,16 @@ func (m *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file
// write server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case batch map
serverInfo := info.GetDefaultServerInfo()
serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.BatchMap)}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil {
return err
}

// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(m.opts.sockAddr, "")
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
uds = "unix"
address = "/var/run/numaflow/batchmap.sock"
defaultMaxMessageSize = 1024 * 1024 * 64
serverInfoFilePath = "/var/run/numaflow/batchmapper-server-info"
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

// Service implements the proto gen server interface and contains the map operation
Expand Down
7 changes: 7 additions & 0 deletions pkg/info/server_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,10 @@ func Read(opts ...Option) (*ServerInfo, error) {
}
return info, nil
}

func GetDefaultServerInfo() *ServerInfo {
serverInfo := &ServerInfo{Protocol: UDS, Language: Go, MinimumNumaflowVersion: MinimumNumaflowVersion,
Version: GetSDKVersion()}
return serverInfo

}
10 changes: 10 additions & 0 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ const (
Java Language = "java"
)

type MapMode string

const (
UnaryMap MapMode = "unary-map"
StreamMap MapMode = "stream-map"
BatchMap MapMode = "batch-map"
)

const MapModeMetadata = "MAP_MODE"

// MinimumNumaflowVersion specifies the minimum Numaflow version required by the current SDK version
const MinimumNumaflowVersion = "1.2.0-rc4"

Expand Down
12 changes: 10 additions & 2 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/numaproj/numaflow-go/pkg"
mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -35,9 +36,16 @@ func (m *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file
// write server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case streaming map
serverInfo := info.GetDefaultServerInfo()
serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.StreamMap)}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil {
return err
}

// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(m.opts.sockAddr, "")
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/mapstream.sock"
serverInfoFilePath = "/var/run/numaflow/mapstreamer-server-info"
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

// Service implements the proto gen server interface and contains the map
Expand Down
2 changes: 1 addition & 1 deletion pkg/shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) {
// If infoFilePath is not empty, write the server info to the file.
if infoFilePath != "" {
serverInfo := &info.ServerInfo{Protocol: info.UDS, Language: info.Go, MinimumNumaflowVersion: info.MinimumNumaflowVersion, Version: info.GetSDKVersion()}
serverInfo := info.GetDefaultServerInfo()
if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil {
return nil, err
}
Expand Down

0 comments on commit cffcb82

Please sign in to comment.