Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: create new server info file paths for all services #98

Merged
merged 8 commits into from
Feb 12, 2024
Merged
4 changes: 1 addition & 3 deletions pkg/info/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ type options struct {
}

func defaultOptions() *options {
return &options{
svrInfoFilePath: ServerInfoFilePath,
}
return &options{}
}

type Option func(*options)
Expand Down
4 changes: 0 additions & 4 deletions pkg/info/types.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package info

const (
ServerInfoFilePath = "/var/run/numaflow/server-info"
)

type Protocol string

const (
Expand Down
6 changes: 1 addition & 5 deletions pkg/mapper/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package mapper

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
uds = "unix"
address = "/var/run/numaflow/map.sock"
defaultMaxMessageSize = 1024 * 1024 * 64
serverInfoFilePath = "/var/run/numaflow/mapper-server-info"
)

// Service implements the proto gen server interface and contains the map operation
Expand Down
6 changes: 1 addition & 5 deletions pkg/mapstreamer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package mapstreamer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/mapstream.sock"
serverInfoFilePath = "/var/run/numaflow/mapstreamer-server-info"
)

// Service implements the proto gen server interface and contains the map
Expand Down
6 changes: 1 addition & 5 deletions pkg/reducer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package reducer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
winStartTime = "x-numaflow-win-start-time"
winEndTime = "x-numaflow-win-end-time"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/reducer-server-info"
)

// Service implements the proto gen server interface and contains the reduce operation handler.
Expand Down
6 changes: 1 addition & 5 deletions pkg/reducestreamer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package reducestreamer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/reducestreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
winStartTime = "x-numaflow-win-start-time"
winEndTime = "x-numaflow-win-end-time"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/reducestreamer-server-info"
)

// Service implements the proto gen server interface and contains the reduceStream operation handler.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sessionreducer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sessionreducer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sessionreducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/sessionreduce.sock"
delimiter = ":"
serverInfoFilePath = "/var/run/numaflow/sessionreducer-server-info"
)

// Service implements the proto gen server interface and contains the sesionreduce operation handler.
Expand Down
4 changes: 1 addition & 3 deletions pkg/sideinput/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package sideinput

import "github.com/numaproj/numaflow-go/pkg/info"

// options is the struct to hold the server options.
type options struct {
sockAddr string
Expand All @@ -17,7 +15,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
address = "/var/run/numaflow/sideinput.sock"
DirPath = "/var/numaflow/side-inputs"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
serverInfoFilePath = "/var/run/numaflow/sideinput-server-info"
)

// Service implements the proto gen server interface and contains the retrieve operation handler
Expand Down
6 changes: 1 addition & 5 deletions pkg/sinker/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sinker

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
)

// handlerDatum implements the Datum interface and is used in the sink functions.
Expand Down
6 changes: 1 addition & 5 deletions pkg/sourcer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sourcer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/source.sock"
serverInfoFilePath = "/var/run/numaflow/sourcer-server-info"
)

// Service implements the proto gen server interface
Expand Down
6 changes: 1 addition & 5 deletions pkg/sourcetransformer/options.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package sourcetransformer

import (
"github.com/numaproj/numaflow-go/pkg/info"
)

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -17,7 +13,7 @@ func defaultOptions() *options {
return &options{
sockAddr: address,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: info.ServerInfoFilePath,
serverInfoFilePath: serverInfoFilePath,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64
address = "/var/run/numaflow/sourcetransform.sock"
serverInfoFilePath = "/var/run/numaflow/sourcetransformer-server-info"
)

// Service implements the proto gen server interface and contains the transformer operation
Expand Down
Loading