Skip to content

Commit

Permalink
move server info paths into packages
Browse files Browse the repository at this point in the history
Signed-off-by: a3hadi <[email protected]>
  • Loading branch information
ayildirim21 committed Feb 6, 2024
1 parent a17fed7 commit 50d710b
Show file tree
Hide file tree
Showing 19 changed files with 19 additions and 53 deletions.
11 changes: 1 addition & 10 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
package info

const (
ServerInfoFilePath = "/var/run/numaflow/server-info"
MapperServerInfoFilePath = "/var/run/numaflow/mapper-server-info"
MapstreamerServerInfoFilePath = "/var/run/numaflow/mapstreamer-sever-info"
ReducerServerInfoFilePath = "/var/run/numaflow/reducer-server-info"
ReducestreamerServerInfoFilePath = "/var/run/numaflow/reducestreamer-server-info"
SessionreducerServerInfoFilePath = "/var/run/numaflow/sessionreducer-server-ifno"
SideinputServerInfoFilePath = "/var/run/numaflow/sideinput-server-info"
SinkerServerInfoFilePath = "/var/run/numaflow/sinker-server-info"
SourcerServerInfoFilePath = "/var/run/numaflow/sourcer-server-info"
SourcetransformerServerInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info"
ServerInfoFilePath = "/var/run/numaflow/server-info"
)

type Protocol string
Expand Down
6 changes: 1 addition & 5 deletions pkg/mapper/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package mapper

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.MapperServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
uds = "unix"
address = "/var/run/numaflow/map.sock"
defaultMaxMessageSize = 1024 * 1024 * 64
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

// Service implements the proto gen server interface and contains the map operation
Expand Down
6 changes: 1 addition & 5 deletions pkg/mapstreamer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package mapstreamer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.MapstreamerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/mapstream.sock"
serverInfoFilePath = "/var/run/numaflow/mapstreamer-sever-info"
)

// Service implements the proto gen server interface and contains the map
Expand Down
6 changes: 1 addition & 5 deletions pkg/reducer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package reducer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ReducerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
winStartTime = "x-numaflow-win-start-time"
winEndTime = "x-numaflow-win-end-time"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/reducer-server-info"
)

// Service implements the proto gen server interface and contains the reduce operation handler.
Expand Down
6 changes: 1 addition & 5 deletions pkg/reducestreamer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package reducestreamer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ReducestreamerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
winStartTime = "x-numaflow-win-start-time"
winEndTime = "x-numaflow-win-end-time"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/reducestreamer-server-info"
)

// Service implements the proto gen server interface and contains the reduceStream operation handler.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sessionreducer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sessionreducer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.SessionreducerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/sessionreduce.sock"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/sessionreducer-server-info"
)

// Service implements the proto gen server interface and contains the sesionreduce operation handler.
Expand Down
4 changes: 1 addition & 3 deletions pkg/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package sideinput

import "github.com/numaproj/numaflow-go/pkg/info"

// options is the struct to hold the server options.
type options struct {
sockAddr string
Expand All @@ -17,7 +15,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.SideinputServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
address = "/var/run/numaflow/sideinput.sock"
DirPath = "/var/numaflow/side-inputs"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

// Service implements the proto gen server interface and contains the retrieve operation handler
Expand Down
6 changes: 1 addition & 5 deletions pkg/sinker/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sinker

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.SinkerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
)

// handlerDatum implements the Datum interface and is used in the sink functions.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sourcer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sourcer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.SourcerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/source.sock"
serverInfoFilePath = "/var/run/numaflow/sourcer-server-info"
)

// Service implements the proto gen server interface
Expand Down
6 changes: 1 addition & 5 deletions pkg/sourcetransformer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sourcetransformer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.SourcetransformerServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/sourcetransform.sock"
serverInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info"
)

// Service implements the proto gen server interface and contains the transformer operation
Expand Down

0 comments on commit 50d710b

Please sign in to comment.