diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index fb5a4867..86b62584 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -450,7 +450,8 @@ type SinkResponse_Result struct { // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. Status Status `protobuf:"varint,2,opt,name=status,proto3,enum=sink.v1.Status" json:"status,omitempty"` // err_msg is the error message, set it if success is set to false. - ErrMsg string `protobuf:"bytes,3,opt,name=err_msg,json=errMsg,proto3" json:"err_msg,omitempty"` + ErrMsg string `protobuf:"bytes,3,opt,name=err_msg,json=errMsg,proto3" json:"err_msg,omitempty"` + ServeResponse []byte `protobuf:"bytes,4,opt,name=serve_response,json=serveResponse,proto3,oneof" json:"serve_response,omitempty"` } func (x *SinkResponse_Result) Reset() { @@ -506,6 +507,13 @@ func (x *SinkResponse_Result) GetErrMsg() string { return "" } +func (x *SinkResponse_Result) GetServeResponse() []byte { + if x != nil { + return x.ServeResponse + } + return nil +} + var File_pkg_apis_proto_sink_v1_sink_proto protoreflect.FileDescriptor var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ @@ -554,7 +562,7 @@ var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0x26, 0x0a, 0x12, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x22, 0xac, 0x02, 0x0a, 0x0c, 0x53, 0x69, 0x6e, 0x6b, 0x52, + 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x22, 0xec, 0x02, 0x0a, 0x0c, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, @@ -566,12 +574,16 @@ var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x01, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, - 0x1a, 0x5a, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, - 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x73, 0x69, 0x6e, - 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x65, 0x72, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x42, 0x0c, 0x0a, 0x0a, + 0x1a, 0x99, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x06, 0x73, + 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x73, 0x69, + 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x65, 0x72, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x2a, 0x0a, + 0x0e, 0x73, 0x65, 0x72, 0x76, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x0d, 0x73, 0x65, 0x72, 0x76, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x88, 0x01, 0x01, 0x42, 0x11, 0x0a, 0x0f, 0x5f, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2a, 0x30, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, @@ -733,6 +745,7 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[0].OneofWrappers = []any{} file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4].OneofWrappers = []any{} + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[7].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/pkg/apis/proto/sink/v1/sink.proto b/pkg/apis/proto/sink/v1/sink.proto index a042fa4d..73e91314 100644 --- a/pkg/apis/proto/sink/v1/sink.proto +++ b/pkg/apis/proto/sink/v1/sink.proto @@ -79,6 +79,7 @@ message SinkResponse { Status status = 2; // err_msg is the error message, set it if success is set to false. string err_msg = 3; + optional bytes serve_response = 4; } repeated Result results = 1; optional Handshake handshake = 2; diff --git a/pkg/sinker/examples/serve/Dockerfile b/pkg/sinker/examples/serve/Dockerfile new file mode 100644 index 00000000..7e9b5266 --- /dev/null +++ b/pkg/sinker/examples/serve/Dockerfile @@ -0,0 +1,20 @@ +#################################################################################################### +# base +#################################################################################################### +FROM alpine:3.20 AS base +ARG TARGETARCH +RUN apk update && apk upgrade && \ + apk add ca-certificates && \ + apk --no-cache add tzdata + +COPY dist/serve-example-${TARGETARCH} /bin/serve-example +RUN chmod +x /bin/serve-example + +#################################################################################################### +# serve +#################################################################################################### +FROM scratch AS serve +COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo +COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=base /bin/serve-example /bin/serve-example +ENTRYPOINT [ "/bin/serve-example" ] diff --git a/pkg/sinker/examples/serve/Makefile b/pkg/sinker/examples/serve/Makefile new file mode 100644 index 00000000..4c9b03b5 --- /dev/null +++ b/pkg/sinker/examples/serve/Makefile @@ -0,0 +1,22 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/sink-serve:${TAG} +ARCHITECTURES = amd64 arm64 + +.PHONY: build +build: + for arch in $(ARCHITECTURES); do \ + CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serve-example-$${arch} main.go; \ + done + +.PHONY: image-push +image-push: build + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target serve . --push + +.PHONY: image +image: build + docker build -t ${IMAGE_REGISTRY} --target serve . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi + +clean: + -rm -rf ./dist diff --git a/pkg/sinker/examples/serve/README.md b/pkg/sinker/examples/serve/README.md new file mode 100644 index 00000000..38dcb54e --- /dev/null +++ b/pkg/sinker/examples/serve/README.md @@ -0,0 +1,3 @@ +# Serve + +An example User Defined Sink that returns payload of each incoming datum element to use as the result for a Serving source. diff --git a/pkg/sinker/examples/serve/go.mod b/pkg/sinker/examples/serve/go.mod new file mode 100644 index 00000000..df72efd8 --- /dev/null +++ b/pkg/sinker/examples/serve/go.mod @@ -0,0 +1,19 @@ +module serve_sink + +go 1.22 + +toolchain go1.23.1 + +replace github.com/numaproj/numaflow-go => ../../../.. + +require github.com/numaproj/numaflow-go v0.9.0 + +require ( + golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/grpc v1.66.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/pkg/sinker/examples/serve/go.sum b/pkg/sinker/examples/serve/go.sum new file mode 100644 index 00000000..36997a49 --- /dev/null +++ b/pkg/sinker/examples/serve/go.sum @@ -0,0 +1,24 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/sinker/examples/serve/main.go b/pkg/sinker/examples/serve/main.go new file mode 100644 index 00000000..1e5d3920 --- /dev/null +++ b/pkg/sinker/examples/serve/main.go @@ -0,0 +1,30 @@ +package main + +import ( + "context" + "log" + + sinksdk "github.com/numaproj/numaflow-go/pkg/sinker" +) + +// serveSink is a sinker implementation that logs the input to stdout +type serveSink struct { +} + +func (l *serveSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses { + result := sinksdk.ResponsesBuilder() + for d := range datumStreamCh { + id := d.ID() + result = result.Append(sinksdk.ResponseServe(id, d.Value())) + // if we are not able to write to sink and if we have a fallback sink configured + // we can use sinksdk.ResponseFallback(id)) to write the message to fallback sink + } + return result +} + +func main() { + err := sinksdk.NewServer(&serveSink{}).Start(context.Background()) + if err != nil { + log.Panic("Failed to start sink function server: ", err) + } +} diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index df689ba4..161638ee 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -219,8 +219,9 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer }) } else if msg.Success { resultList = append(resultList, &sinkpb.SinkResponse_Result{ - Id: msg.ID, - Status: sinkpb.Status_SUCCESS, + Id: msg.ID, + Status: sinkpb.Status_SUCCESS, + ServeResponse: msg.ServeResponse, }) } else { resultList = append(resultList, &sinkpb.SinkResponse_Result{ diff --git a/pkg/sinker/types.go b/pkg/sinker/types.go index 125b914c..6d0e6dee 100644 --- a/pkg/sinker/types.go +++ b/pkg/sinker/types.go @@ -9,7 +9,8 @@ type Response struct { // Err represents the error message when "success" is false. Err string `json:"err,omitempty"` // Fallback is true if the message to be sent to the fallback sink. - Fallback bool `json:"fallback,omitempty"` + Fallback bool `json:"fallback,omitempty"` + ServeResponse []byte `json:"serve_reponse,omitempty"` } type Responses []Response @@ -47,3 +48,7 @@ func ResponseFailure(id, errMsg string) Response { func ResponseFallback(id string) Response { return Response{ID: id, Fallback: true} } + +func ResponseServe(id string, result []byte) Response { + return Response{ID: id, Success: true, ServeResponse: result} +}