Skip to content

Commit

Permalink
impl
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
Sidhant Kohli committed Jul 17, 2024
1 parent 67cda42 commit abe8cb2
Show file tree
Hide file tree
Showing 13 changed files with 31 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
udf:
container:
# Split input message into an array with comma, see https://github.com/numaproj/numaflow-go/tree/main/pkg/batchmapper/examples/batchmap-flatmap
image: quay.io/numaio/numaflow-go/map-flatmap:stable
image: quay.io/numaio/numaflow-go/batch-map-flatmap:stable
imagePullPolicy: Always
- name: go-udsink
scale:
Expand Down
9 changes: 3 additions & 6 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,13 @@ func (m *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file, we need to add metadata to ensure selection of the
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case batch map
serverInfo := info.GetDefaultServerInfo()
serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.BatchMap)}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil {
return err
}
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.BatchMap)}

// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, "")
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ const (
BatchMap MapMode = "batch-map"
)

// MapModeMetadata field is used to indicate which map mode is enabled
const MapModeMetadata = "MAP_MODE"
// MapModeKey is the key used in the server info metadata map to indicate which map mode is enabled.
const MapModeKey = "MAP_MODE"

// MinimumNumaflowVersion specifies the minimum Numaflow version required by the current SDK version
const MinimumNumaflowVersion = "1.2.0-rc4"
Expand Down
9 changes: 3 additions & 6 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,13 @@ func (m *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file, we need to add metadata to ensure selection of the
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case unary map
serverInfo := info.GetDefaultServerInfo()
serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.UnaryMap)}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil {
return err
}
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.UnaryMap)}

// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, "")
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
9 changes: 3 additions & 6 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,13 @@ func (m *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()

// write server info to the file, we need to add metadata to ensure selection of the
// create a server info to the file, we need to add metadata to ensure selection of the
// correct map mode, in this case streaming map
serverInfo := info.GetDefaultServerInfo()
serverInfo.Metadata = map[string]string{info.MapModeMetadata: string(info.StreamMap)}
if err := info.Write(serverInfo, info.WithServerInfoFilePath(m.opts.serverInfoFilePath)); err != nil {
return err
}
serverInfo.Metadata = map[string]string{info.MapModeKey: string(info.StreamMap)}

// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, "")
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, serverInfo)
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/reducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

numaflow "github.com/numaproj/numaflow-go/pkg"
reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -37,7 +38,7 @@ func (r *server) Start(ctx context.Context) error {

// write server info to the file
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/reducestreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/numaproj/numaflow-go/pkg"
reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -37,7 +38,7 @@ func (r *server) Start(ctx context.Context) error {

// write server info to the file
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sessionreducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/numaproj/numaflow-go/pkg"
sessionreducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/sessionreduce/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -37,7 +38,7 @@ func (r *server) Start(ctx context.Context) error {

// write server info to the file
// start listening on unix domain socket
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(r.opts.sockAddr, r.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ const (
uds = "unix"
)

func PrepareServer(sockAddr string, infoFilePath string) (net.Listener, error) {
func PrepareServer(sockAddr string, infoFilePath string, serverInfo *info.ServerInfo) (net.Listener, error) {
// If serverInfo is not provided, then create a default server info instance.
if serverInfo != nil {
serverInfo = info.GetDefaultServerInfo()
}
// If infoFilePath is not empty, write the server info to the file.
if infoFilePath != "" {
serverInfo := info.GetDefaultServerInfo()
if err := info.Write(serverInfo, info.WithServerInfoFilePath(infoFilePath)); err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/numaproj/numaflow-go/pkg"
sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -37,7 +38,7 @@ func (s *server) Start(ctx context.Context) error {
defer stop()

// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sinker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

numaflow "github.com/numaproj/numaflow-go/pkg"
sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -36,7 +37,7 @@ func (s *sinkServer) Start(ctx context.Context) error {

// write server info to the file
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sourcer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

numaflow "github.com/numaproj/numaflow-go/pkg"
sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -38,7 +39,7 @@ func (s *server) Start(ctx context.Context) error {

// write server info to the file
// start listening on unix domain socket
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(s.opts.sockAddr, s.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sourcetransformer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/numaproj/numaflow-go/pkg"
v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1"
"github.com/numaproj/numaflow-go/pkg/info"
"github.com/numaproj/numaflow-go/pkg/shared"
)

Expand Down Expand Up @@ -37,7 +38,7 @@ func (m *server) Start(ctx context.Context) error {

// write server info to the file
// start listening on unix domain socket
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath)
lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath, info.GetDefaultServerInfo())
if err != nil {
return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err)
}
Expand Down

0 comments on commit abe8cb2

Please sign in to comment.