Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add sideinput and redis-e2e-test sink examples #125

Merged
merged 3 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ jobs:
"pkg/mapper/examples/retry", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream",
"pkg/reducer/examples/counter", "pkg/reducer/examples/sum", "pkg/reducestreamer/examples/counter",
"pkg/reducestreamer/examples/sum", "pkg/sessionreducer/examples/counter", "pkg/sessionreducer/examples/sum",
"pkg/sideinput/examples/simple-sideinput/udf", "pkg/sideinput/examples/simple-sideinput", "pkg/sinker/examples/log",
"pkg/sideinput/examples/simple_sideinput/udf", "pkg/sideinput/examples/simple_sideinput", "pkg/sinker/examples/log",
"pkg/sourcer/examples/simple_source", "pkg/sourcetransformer/examples/assign_event_time", "pkg/sourcetransformer/examples/event_time_filter",
"pkg/sinker/examples/fallback"
"pkg/sinker/examples/fallback", "pkg/sideinput/examples/map_sideinput", "pkg/sideinput/examples/reduce_sideinput",
"pkg/sideinput/examples/sideinput_function", "pkg/sideinput/examples/simple_source_with_sideinput",
"pkg/sideinput/examples/sink_sideinput", "pkg/sinker/examples/redis-sink"
]

steps:
Expand Down
2 changes: 1 addition & 1 deletion pkg/mapper/examples/forward_message/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module even_odd
module forward_message

go 1.20

Expand Down
2 changes: 1 addition & 1 deletion pkg/sessionreducer/examples/sum/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module counter
module sum

go 1.20

Expand Down
20 changes: 20 additions & 0 deletions pkg/sideinput/examples/map_sideinput/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.12.3 as base
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/map-sideinput-example /bin/map-sideinput-example
RUN chmod +x /bin/map-sideinput-example

####################################################################################################
# sideinput
####################################################################################################
FROM scratch as sideinput
ARG ARCH
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/map-sideinput-example /bin/map-sideinput-example
ENTRYPOINT [ "/bin/map-sideinput-example" ]
18 changes: 18 additions & 0 deletions pkg/sideinput/examples/map_sideinput/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
TAG ?= stable
PUSH ?= false

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/map-sideinput-example main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/map-sideinput:${TAG}" --platform linux/amd64,linux/arm64 --target sideinput . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/map-sideinput:${TAG}" --target sideinput .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/map-sideinput:${TAG}"; fi

clean:
-rm -rf ./dist
49 changes: 49 additions & 0 deletions pkg/sideinput/examples/map_sideinput/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Map SideInput Example
An example that demonstrates how to write a [sideinput](https://numaflow.numaproj.io/user-guide/reference/side-inputs/) function along with a sample User Defined function which watches and used the corresponding side input with Mapper function.

### SideInput
```golang
// handle is the side input handler function.
func handle(_ context.Context) sideinputsdk.Message {
// generate message based on even and odd counter
counter++
if counter%2 == 0 {
return sideinputsdk.BroadcastMessage([]byte("even"))
}
// BroadcastMessage() is used to broadcast the message with the given value to other side input vertices.
// val must be converted to []byte.
return sideinputsdk.BroadcastMessage([]byte("odd"))
}
```

By using [UDF](#user-defined-function), User perform the retrieval/update for the side input value and perform some operation based via mapper function on it and broadcast/drop a message to other vertices.

### User Defined Function
The UDF vertex will watch for changes to this file and whenever there is a change it will read the file to obtain the new side input value.

### Pipeline spec
In the spec we need to define the side input vertex and the UDF vertex. The UDF vertex will have the side input vertex as a side input.

```yaml
spec:
sideInputs:
- name: myticker
container:
image: "quay.io/numaio/numaflow-go/map-sideinput:stable"
imagePullPolicy: Always
trigger:
schedule: "@every 5s"
```
Vertex spec for the UDF vertex:
```yaml
- name: si-e2e
udf:
container:
image: "quay.io/numaio/numaflow-go/map-sideinput-udf:stable"
imagePullPolicy: Always
sideInputs:
- myticker
```



17 changes: 17 additions & 0 deletions pkg/sideinput/examples/map_sideinput/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module map_sideinput

go 1.21.2

replace github.com/numaproj/numaflow-go => ../../../..

require github.com/numaproj/numaflow-go v0.7.0-rc2

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
29 changes: 29 additions & 0 deletions pkg/sideinput/examples/map_sideinput/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
45 changes: 45 additions & 0 deletions pkg/sideinput/examples/map_sideinput/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2023 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"log"

sideinputsdk "github.com/numaproj/numaflow-go/pkg/sideinput"
)

var counter = 0

// handle is the side input handler function.
func handle(_ context.Context) sideinputsdk.Message {
// generate message based on even and odd counter
counter++
if counter%2 == 0 {
return sideinputsdk.BroadcastMessage([]byte("even"))
}
// BroadcastMessage() is used to broadcast the message with the given value to other side input vertices.
// val must be converted to []byte.
return sideinputsdk.BroadcastMessage([]byte("odd"))
}

func main() {
// Start the side input server.
err := sideinputsdk.NewSideInputServer(sideinputsdk.RetrieveFunc(handle)).Start(context.Background())
if err != nil {
log.Panic("Failed to start side input server: ", err)
}
}
20 changes: 20 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.12.3 as base
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/map-sideinput-udf /bin/map-sideinput-udf
RUN chmod +x /bin/map-sideinput-udf

####################################################################################################
# udf-sideinput
####################################################################################################
FROM scratch as sideinput-udf
ARG ARCH
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/map-sideinput-udf /bin/map-sideinput-udf
ENTRYPOINT [ "/bin/map-sideinput-udf" ]
18 changes: 18 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
TAG ?= stable
PUSH ?= false

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/map-sideinput-udf main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/map-sideinput-udf:${TAG}" --platform linux/amd64,linux/arm64 --target sideinput-udf . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/map-sideinput-udf:${TAG}" --target sideinput-udf .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/map-sideinput-udf:${TAG}"; fi

clean:
-rm -rf ./dist
20 changes: 20 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module udf

go 1.21.2

replace github.com/numaproj/numaflow-go => ../../../../..

require (
github.com/fsnotify/fsnotify v1.6.0
github.com/numaproj/numaflow-go v0.7.0-rc2
)

require (
github.com/golang/protobuf v1.5.3 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
32 changes: 32 additions & 0 deletions pkg/sideinput/examples/map_sideinput/udf/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading
Loading