Skip to content

Commit

Permalink
chore: add sideinput and redis-e2e-test sink examples (#125)
Browse files Browse the repository at this point in the history
Signed-off-by: a3hadi <[email protected]>
  • Loading branch information
ayildirim21 authored Apr 23, 2024
1 parent e4b7585 commit 38f9fa2
Show file tree
Hide file tree
Showing 64 changed files with 1,921 additions and 9 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ jobs:
"pkg/mapper/examples/retry", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream",
"pkg/reducer/examples/counter", "pkg/reducer/examples/sum", "pkg/reducestreamer/examples/counter",
"pkg/reducestreamer/examples/sum", "pkg/sessionreducer/examples/counter", "pkg/sessionreducer/examples/sum",
"pkg/sideinput/examples/simple-sideinput/udf", "pkg/sideinput/examples/simple-sideinput", "pkg/sinker/examples/log",
"pkg/sideinput/examples/simple_sideinput/udf", "pkg/sideinput/examples/simple_sideinput", "pkg/sinker/examples/log",
"pkg/sourcer/examples/simple_source", "pkg/sourcetransformer/examples/assign_event_time", "pkg/sourcetransformer/examples/event_time_filter",
"pkg/sinker/examples/fallback"
"pkg/sinker/examples/fallback", "pkg/sideinput/examples/map_sideinput", "pkg/sideinput/examples/reduce_sideinput",
"pkg/sideinput/examples/sideinput_function", "pkg/sideinput/examples/simple_source_with_sideinput",
"pkg/sideinput/examples/sink_sideinput", "pkg/sinker/examples/redis-sink"
]

steps:
Expand Down
2 changes: 1 addition & 1 deletion pkg/mapper/examples/forward_message/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module even_odd
module forward_message

go 1.20

Expand Down
2 changes: 1 addition & 1 deletion pkg/sessionreducer/examples/sum/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module counter
module sum

go 1.20

Expand Down
20 changes: 20 additions & 0 deletions pkg/sideinput/examples/map_sideinput/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.12.3 as base
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/map-sideinput-example /bin/map-sideinput-example
RUN chmod +x /bin/map-sideinput-example

####################################################################################################
# sideinput
####################################################################################################
FROM scratch as sideinput
ARG ARCH
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/map-sideinput-example /bin/map-sideinput-example
ENTRYPOINT [ "/bin/map-sideinput-example" ]
18 changes: 18 additions & 0 deletions pkg/sideinput/examples/map_sideinput/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
TAG ?= stable
PUSH ?= false

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/map-sideinput-example main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/map-sideinput:${TAG}" --platform linux/amd64,linux/arm64 --target sideinput . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/map-sideinput:${TAG}" --target sideinput .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/map-sideinput:${TAG}"; fi

clean:
-rm -rf ./dist
49 changes: 49 additions & 0 deletions pkg/sideinput/examples/map_sideinput/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Map SideInput Example
An example that demonstrates how to write a [sideinput](https://numaflow.numaproj.io/user-guide/reference/side-inputs/) function along with a sample User Defined function which watches and used the corresponding side input with Mapper function.

### SideInput
```golang
// handle is the side input handler function.
func handle(_ context.Context) sideinputsdk.Message {
// generate message based on even and odd counter
counter++
if counter%2 == 0 {
return sideinputsdk.BroadcastMessage([]byte("even"))
}
// BroadcastMessage() is used to broadcast the message with the given value to other side input vertices.
// val must be converted to []byte.
return sideinputsdk.BroadcastMessage([]byte("odd"))
}
```

By using [UDF](#user-defined-function), User perform the retrieval/update for the side input value and perform some operation based via mapper function on it and broadcast/drop a message to other vertices.

### User Defined Function
The UDF vertex will watch for changes to this file and whenever there is a change it will read the file to obtain the new side input value.

### Pipeline spec
In the spec we need to define the side input vertex and the UDF vertex. The UDF vertex will have the side input vertex as a side input.

```yaml
spec:
sideInputs:
- name: myticker
container:
image: "quay.io/numaio/numaflow-go/map-sideinput:stable"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
```
Vertex spec for the UDF vertex:
```yaml
- name: si-e2e
udf:
container:
image: "quay.io/numaio/numaflow-go/map-sideinput-udf:stable"
imagePullPolicy: Always
sideInputs:
- myticker
```
17 changes: 17 additions & 0 deletions pkg/sideinput/examples/map_sideinput/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module map_sideinput

go 1.21.2

replace github.com/numaproj/numaflow-go => ../../../..

require github.com/numaproj/numaflow-go v0.7.0-rc2

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
29 changes: 29 additions & 0 deletions pkg/sideinput/examples/map_sideinput/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
45 changes: 45 additions & 0 deletions pkg/sideinput/examples/map_sideinput/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2023 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"log"

sideinputsdk "github.com/numaproj/numaflow-go/pkg/sideinput"
)

var counter = 0

// handle is the side input handler function.
func handle(_ context.Context) sideinputsdk.Message {
// generate message based on even and odd counter
counter++
if counter%2 == 0 {
return sideinputsdk.BroadcastMessage([]byte("even"))
}
// BroadcastMessage() is used to broadcast the message with the given value to other side input vertices.
// val must be converted to []byte.
return sideinputsdk.BroadcastMessage([]byte("odd"))
}

func main() {
// Start the side input server.
err := sideinputsdk.NewSideInputServer(sideinputsdk.RetrieveFunc(handle)).Start(context.Background())
if err != nil {
log.Panic("Failed to start side input server: ", err)
}
}
20 changes: 20 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.12.3 as base
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/map-sideinput-udf /bin/map-sideinput-udf
RUN chmod +x /bin/map-sideinput-udf

####################################################################################################
# udf-sideinput
####################################################################################################
FROM scratch as sideinput-udf
ARG ARCH
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/map-sideinput-udf /bin/map-sideinput-udf
ENTRYPOINT [ "/bin/map-sideinput-udf" ]
18 changes: 18 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
TAG ?= stable
PUSH ?= false

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/map-sideinput-udf main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/map-sideinput-udf:${TAG}" --platform linux/amd64,linux/arm64 --target sideinput-udf . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/map-sideinput-udf:${TAG}" --target sideinput-udf .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/map-sideinput-udf:${TAG}"; fi

clean:
-rm -rf ./dist
20 changes: 20 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module udf

go 1.21.2

replace github.com/numaproj/numaflow-go => ../../../../..

require (
github.com/fsnotify/fsnotify v1.6.0
github.com/numaproj/numaflow-go v0.7.0-rc2
)

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
32 changes: 32 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit 38f9fa2

Please sign in to comment.