Skip to content

Commit

Permalink
fb-sink
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Apr 14, 2024
1 parent 31f81f1 commit 34c9ea4
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 83 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/proto/map/v1/map.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/mapstream/v1/mapstream.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/reduce/v1/reduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sideinput/v1/sideinput.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 103 additions & 45 deletions pkg/apis/proto/sink/v1/sink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions pkg/apis/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,24 @@ message ReadyResponse {
bool ready = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
}

/**
* SinkResponse is the individual response of each message written to the sink.
*/
message SinkResponse {
message Result {
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed.
bool success = 2;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/proto/source/v1/source.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sourcetransform/v1/transform.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions pkg/sinker/options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sinker

import "os"

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -10,10 +12,18 @@ type options struct {
type Option func(*options)

func defaultOptions() *options {
defaultPath := serverInfoFilePath
defaultAddress := address

if os.Getenv(EnvUDContainerType) == UDContainerFallbackSink {
defaultPath = fbServerInfoFilePath
defaultAddress = fbAddress
}

return &options{
sockAddr: address,
sockAddr: defaultAddress,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: serverInfoFilePath,
serverInfoFilePath: defaultPath,
}
}

Expand Down
34 changes: 25 additions & 9 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (
)

const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
fbAddress = "/var/run/numaflow/fb-sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
fbServerInfoFilePath = "/var/run/numaflow/fb-sinker-server-info"
EnvUDContainerType = "EnvUDContainerType"
UDContainerFallbackSink = "fb-sink"
)

// handlerDatum implements the Datum interface and is used in the sink functions.
Expand Down Expand Up @@ -78,11 +82,23 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
defer wg.Done()
messages := fs.Sinker.Sink(ctx, datumStreamCh)
for _, msg := range messages {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Success: msg.Success,
ErrMsg: msg.Err,
})
if msg.Fallback {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_FALLBACK,
})
} else if msg.Success {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_SUCCESS,
})
} else {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_FAILURE,
ErrMsg: msg.Err,
})
}
}
}()

Expand Down
Loading

0 comments on commit 34c9ea4

Please sign in to comment.