diff --git a/pkg/info/types.go b/pkg/info/types.go index 4caacb4c..2f7b606c 100644 --- a/pkg/info/types.go +++ b/pkg/info/types.go @@ -1,16 +1,7 @@ package info const ( - ServerInfoFilePath = "/var/run/numaflow/server-info" - MapperServerInfoFilePath = "/var/run/numaflow/mapper-server-info" - MapstreamerServerInfoFilePath = "/var/run/numaflow/mapstreamer-sever-info" - ReducerServerInfoFilePath = "/var/run/numaflow/reducer-server-info" - ReducestreamerServerInfoFilePath = "/var/run/numaflow/reducestreamer-server-info" - SessionreducerServerInfoFilePath = "/var/run/numaflow/sessionreducer-server-ifno" - SideinputServerInfoFilePath = "/var/run/numaflow/sideinput-server-info" - SinkerServerInfoFilePath = "/var/run/numaflow/sinker-server-info" - SourcerServerInfoFilePath = "/var/run/numaflow/sourcer-server-info" - SourcetransformerServerInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info" + ServerInfoFilePath = "/var/run/numaflow/server-info" ) type Protocol string diff --git a/pkg/mapper/options.go b/pkg/mapper/options.go index 92cbd6c3..5b1b89b5 100644 --- a/pkg/mapper/options.go +++ b/pkg/mapper/options.go @@ -1,9 +1,5 @@ package mapper -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.MapperServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 4e2ed223..16aab8a0 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -12,6 +12,7 @@ const ( uds = "unix" address = "/var/run/numaflow/map.sock" defaultMaxMessageSize = 1024 * 1024 * 64 + serverInfoFilePath = "/var/run/numaflow/mapper-server-info" ) // Service implements the proto gen server interface and contains the map operation diff --git a/pkg/mapstreamer/options.go b/pkg/mapstreamer/options.go index 2724dea6..42a9c4a1 100644 --- a/pkg/mapstreamer/options.go +++ b/pkg/mapstreamer/options.go @@ -1,9 +1,5 @@ package mapstreamer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.MapstreamerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index b3dc9c2b..b6eb36a3 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -12,6 +12,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/mapstream.sock" + serverInfoFilePath = "/var/run/numaflow/mapstreamer-sever-info" ) // Service implements the proto gen server interface and contains the map diff --git a/pkg/reducer/options.go b/pkg/reducer/options.go index 3ea5168f..a5b17605 100644 --- a/pkg/reducer/options.go +++ b/pkg/reducer/options.go @@ -1,9 +1,5 @@ package reducer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ReducerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/reducer/service.go b/pkg/reducer/service.go index 1a5148a8..e61e9b4d 100644 --- a/pkg/reducer/service.go +++ b/pkg/reducer/service.go @@ -19,6 +19,7 @@ const ( winStartTime = "x-numaflow-win-start-time" winEndTime = "x-numaflow-win-end-time" delimiter = ":" + serverInfoFilePath = "/var/run/numaflow/reducer-server-info" ) // Service implements the proto gen server interface and contains the reduce operation handler. diff --git a/pkg/reducestreamer/options.go b/pkg/reducestreamer/options.go index b7d05a5a..c4d51713 100644 --- a/pkg/reducestreamer/options.go +++ b/pkg/reducestreamer/options.go @@ -1,9 +1,5 @@ package reducestreamer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.ReducestreamerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/reducestreamer/service.go b/pkg/reducestreamer/service.go index 641ce86d..38182c23 100644 --- a/pkg/reducestreamer/service.go +++ b/pkg/reducestreamer/service.go @@ -19,6 +19,7 @@ const ( winStartTime = "x-numaflow-win-start-time" winEndTime = "x-numaflow-win-end-time" delimiter = ":" + serverInfoFilePath = "/var/run/numaflow/reducestreamer-server-info" ) // Service implements the proto gen server interface and contains the reduceStream operation handler. diff --git a/pkg/sessionreducer/options.go b/pkg/sessionreducer/options.go index 58a3ec76..4572f7d2 100644 --- a/pkg/sessionreducer/options.go +++ b/pkg/sessionreducer/options.go @@ -1,9 +1,5 @@ package sessionreducer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.SessionreducerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sessionreducer/service.go b/pkg/sessionreducer/service.go index b3ddd08b..cf033ffa 100644 --- a/pkg/sessionreducer/service.go +++ b/pkg/sessionreducer/service.go @@ -17,6 +17,7 @@ const ( defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/sessionreduce.sock" delimiter = ":" + serverInfoFilePath = "/var/run/numaflow/sessionreducer-server-info" ) // Service implements the proto gen server interface and contains the sesionreduce operation handler. diff --git a/pkg/sideinput/options.go b/pkg/sideinput/options.go index fd4a669d..7740be62 100644 --- a/pkg/sideinput/options.go +++ b/pkg/sideinput/options.go @@ -1,7 +1,5 @@ package sideinput -import "github.com/numaproj/numaflow-go/pkg/info" - // options is the struct to hold the server options. type options struct { sockAddr string @@ -17,7 +15,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.SideinputServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 6967e790..7a1e5d53 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -13,6 +13,7 @@ const ( address = "/var/run/numaflow/sideinput.sock" DirPath = "/var/numaflow/side-inputs" defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB + serverInfoFilePath = "/var/run/numaflow/sideinput-server-info" ) // Service implements the proto gen server interface and contains the retrieve operation handler diff --git a/pkg/sinker/options.go b/pkg/sinker/options.go index bed2f22a..a913ae8b 100644 --- a/pkg/sinker/options.go +++ b/pkg/sinker/options.go @@ -1,9 +1,5 @@ package sinker -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.SinkerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index ef424afc..d008c1a3 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -15,6 +15,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB address = "/var/run/numaflow/sink.sock" + serverInfoFilePath = "/var/run/numaflow/sinker-server-info" ) // handlerDatum implements the Datum interface and is used in the sink functions. diff --git a/pkg/sourcer/options.go b/pkg/sourcer/options.go index 6980fdd3..b91af7ff 100644 --- a/pkg/sourcer/options.go +++ b/pkg/sourcer/options.go @@ -1,9 +1,5 @@ package sourcer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.SourcerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 962691b1..33d82743 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -14,6 +14,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB address = "/var/run/numaflow/source.sock" + serverInfoFilePath = "/var/run/numaflow/sourcer-server-info" ) // Service implements the proto gen server interface diff --git a/pkg/sourcetransformer/options.go b/pkg/sourcetransformer/options.go index f962f6a0..410b685c 100644 --- a/pkg/sourcetransformer/options.go +++ b/pkg/sourcetransformer/options.go @@ -1,9 +1,5 @@ package sourcetransformer -import ( - "github.com/numaproj/numaflow-go/pkg/info" -) - type options struct { sockAddr string maxMessageSize int @@ -17,7 +13,7 @@ func defaultOptions() *options { return &options{ sockAddr: address, maxMessageSize: defaultMaxMessageSize, - serverInfoFilePath: info.SourcetransformerServerInfoFilePath, + serverInfoFilePath: serverInfoFilePath, } } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 77c7bfec..327c7fdd 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -13,6 +13,7 @@ const ( uds = "unix" defaultMaxMessageSize = 1024 * 1024 * 64 address = "/var/run/numaflow/sourcetransform.sock" + serverInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info" ) // Service implements the proto gen server interface and contains the transformer operation