Skip to content

Commit

Permalink
feat: Sink serve API (#178)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing authored Feb 23, 2025
1 parent f9fef63 commit 7a75e8e
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 11 deletions.
29 changes: 21 additions & 8 deletions pkg/apis/proto/sink/v1/sink.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/proto/sink/v1/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ message SinkResponse {
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
optional bytes serve_response = 4;
}
repeated Result results = 1;
optional Handshake handshake = 2;
Expand Down
20 changes: 20 additions & 0 deletions pkg/sinker/examples/serve/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.20 AS base
ARG TARGETARCH
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/serve-example-${TARGETARCH} /bin/serve-example
RUN chmod +x /bin/serve-example

####################################################################################################
# serve
####################################################################################################
FROM scratch AS serve
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/serve-example /bin/serve-example
ENTRYPOINT [ "/bin/serve-example" ]
22 changes: 22 additions & 0 deletions pkg/sinker/examples/serve/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/sink-serve:${TAG}
ARCHITECTURES = amd64 arm64

.PHONY: build
build:
for arch in $(ARCHITECTURES); do \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/serve-example-$${arch} main.go; \
done

.PHONY: image-push
image-push: build
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target serve . --push

.PHONY: image
image: build
docker build -t ${IMAGE_REGISTRY} --target serve .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

clean:
-rm -rf ./dist
3 changes: 3 additions & 0 deletions pkg/sinker/examples/serve/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Serve

An example User Defined Sink that returns payload of each incoming datum element to use as the result for a Serving source.
19 changes: 19 additions & 0 deletions pkg/sinker/examples/serve/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
module serve_sink

go 1.22

toolchain go1.23.1

replace github.com/numaproj/numaflow-go => ../../../..

require github.com/numaproj/numaflow-go v0.9.0

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
24 changes: 24 additions & 0 deletions pkg/sinker/examples/serve/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
30 changes: 30 additions & 0 deletions pkg/sinker/examples/serve/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package main

import (
"context"
"log"

sinksdk "github.com/numaproj/numaflow-go/pkg/sinker"
)

// serveSink is a sinker implementation that logs the input to stdout
type serveSink struct {
}

func (l *serveSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk.Datum) sinksdk.Responses {
result := sinksdk.ResponsesBuilder()
for d := range datumStreamCh {
id := d.ID()
result = result.Append(sinksdk.ResponseServe(id, d.Value()))
// if we are not able to write to sink and if we have a fallback sink configured
// we can use sinksdk.ResponseFallback(id)) to write the message to fallback sink
}
return result
}

func main() {
err := sinksdk.NewServer(&serveSink{}).Start(context.Background())
if err != nil {
log.Panic("Failed to start sink function server: ", err)
}
}
5 changes: 3 additions & 2 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer
})
} else if msg.Success {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Id: msg.ID,
Status: sinkpb.Status_SUCCESS,
Id: msg.ID,
Status: sinkpb.Status_SUCCESS,
ServeResponse: msg.ServeResponse,
})
} else {
resultList = append(resultList, &sinkpb.SinkResponse_Result{
Expand Down
7 changes: 6 additions & 1 deletion pkg/sinker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ type Response struct {
// Err represents the error message when "success" is false.
Err string `json:"err,omitempty"`
// Fallback is true if the message to be sent to the fallback sink.
Fallback bool `json:"fallback,omitempty"`
Fallback bool `json:"fallback,omitempty"`
ServeResponse []byte `json:"serve_reponse,omitempty"`
}

type Responses []Response
Expand Down Expand Up @@ -47,3 +48,7 @@ func ResponseFailure(id, errMsg string) Response {
func ResponseFallback(id string) Response {
return Response{ID: id, Fallback: true}
}

func ResponseServe(id string, result []byte) Response {
return Response{ID: id, Success: true, ServeResponse: result}
}

0 comments on commit 7a75e8e

Please sign in to comment.