Skip to content

Commit

Permalink
feat: Fallback Sink (#120)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Apr 16, 2024
1 parent 31f81f1 commit 9747b4d
Show file tree
Hide file tree
Showing 15 changed files with 192 additions and 84 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/proto/map/v1/map.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/mapstream/v1/mapstream.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/reduce/v1/reduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sideinput/v1/sideinput.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 103 additions & 45 deletions pkg/apis/proto/sink/v1/sink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions pkg/apis/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,24 @@ message ReadyResponse {
bool ready = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
}

/**
* SinkResponse is the individual response of each message written to the sink.
*/
message SinkResponse {
message Result {
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed.
bool success = 2;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/proto/source/v1/source.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apis/proto/sourcetransform/v1/transform.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sinker/examples/log/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/sink-log:${TAG}" --platform linux/amd64,linux/arm64 --target log . --push

.PHONY: image
imageimage: build
image: build
docker build -t "quay.io/numaio/numaflow-go/sink-log:${TAG}" --target log .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/sink-log:${TAG}"; fi

Expand Down
2 changes: 2 additions & 0 deletions pkg/sinker/examples/log/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ func (l *logSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum)
fmt.Println("User Defined Sink:", string(d.Value()))
id := d.ID()
result = result.Append(sinksdk.ResponseOK(id))
// if we are not able to write to sink and if we have a fallback sink configured
// we can use sinksdk.ResponseFallback(id)) to write the message to fallback sink
}
return result
}
Expand Down
15 changes: 13 additions & 2 deletions pkg/sinker/options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sinker

import "os"

type options struct {
sockAddr string
maxMessageSize int
Expand All @@ -10,10 +12,19 @@ type options struct {
type Option func(*options)

func defaultOptions() *options {
defaultPath := serverInfoFilePath
defaultAddress := address

// If the container type is fallback sink, then use the fallback sink address and path.
if os.Getenv(EnvUDContainerType) == UDContainerFallbackSink {
defaultPath = fbServerInfoFilePath
defaultAddress = fbAddress
}

return &options{
sockAddr: address,
sockAddr: defaultAddress,
maxMessageSize: defaultMaxMessageSize,
serverInfoFilePath: serverInfoFilePath,
serverInfoFilePath: defaultPath,
}
}

Expand Down
34 changes: 25 additions & 9 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ import (
)

const (
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
uds = "unix"
defaultMaxMessageSize = 1024 * 1024 * 64 // 64MB
address = "/var/run/numaflow/sink.sock"
fbAddress = "/var/run/numaflow/fb-sink.sock"
serverInfoFilePath = "/var/run/numaflow/sinker-server-info"
fbServerInfoFilePath = "/var/run/numaflow/fb-sinker-server-info"
EnvUDContainerType = "NUMAFLOW_UD_CONTAINER_TYPE"
UDContainerFallbackSink = "fb-udsink"
)

// handlerDatum implements the Datum interface and is used in the sink functions.
Expand Down Expand Up @@ -78,11 +82,23 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
defer wg.Done()
messages := fs.Sinker.Sink(ctx, datumStreamCh)
for _, msg := range messages {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Success: msg.Success,
ErrMsg: msg.Err,
})
if msg.Fallback {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_FALLBACK,
})
} else if msg.Success {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_SUCCESS,
})
} else {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_FAILURE,
ErrMsg: msg.Err,
})
}
}
}()

Expand Down
Loading

0 comments on commit 9747b4d

Please sign in to comment.