From 2b9e9e9284b6ab0e8bfaefa409e81756bfddf5e5 Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Thu, 16 Jan 2025 19:26:54 -0800 Subject: [PATCH] feat: new Mapper example which serves as a slow container for testing purposes (#172) --- .github/workflows/build-push.yaml | 2 +- pkg/mapper/examples/slow_cat/Dockerfile | 20 ++++++++++ pkg/mapper/examples/slow_cat/Makefile | 22 +++++++++++ pkg/mapper/examples/slow_cat/README.md | 3 ++ pkg/mapper/examples/slow_cat/go.mod | 17 +++++++++ pkg/mapper/examples/slow_cat/go.sum | 24 ++++++++++++ pkg/mapper/examples/slow_cat/main.go | 49 +++++++++++++++++++++++++ 7 files changed, 136 insertions(+), 1 deletion(-) create mode 100644 pkg/mapper/examples/slow_cat/Dockerfile create mode 100644 pkg/mapper/examples/slow_cat/Makefile create mode 100644 pkg/mapper/examples/slow_cat/README.md create mode 100644 pkg/mapper/examples/slow_cat/go.mod create mode 100644 pkg/mapper/examples/slow_cat/go.sum create mode 100644 pkg/mapper/examples/slow_cat/main.go diff --git a/.github/workflows/build-push.yaml b/.github/workflows/build-push.yaml index 29a66275..4fb742c8 100644 --- a/.github/workflows/build-push.yaml +++ b/.github/workflows/build-push.yaml @@ -17,7 +17,7 @@ jobs: matrix: dockerfile_paths: [ "pkg/mapper/examples/even_odd", "pkg/mapper/examples/flatmap", "pkg/mapper/examples/forward_message", - "pkg/mapper/examples/retry", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream", + "pkg/mapper/examples/retry", "pkg/mapper/examples/slow_cat", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream", "pkg/reducer/examples/counter", "pkg/reducer/examples/sum", "pkg/reducestreamer/examples/counter", "pkg/reducestreamer/examples/sum", "pkg/sessionreducer/examples/counter", "pkg/sessionreducer/examples/sum", "pkg/sideinput/examples/simple_sideinput/udf", "pkg/sideinput/examples/simple_sideinput", "pkg/sinker/examples/log", diff --git a/pkg/mapper/examples/slow_cat/Dockerfile b/pkg/mapper/examples/slow_cat/Dockerfile new file mode 100644 index 00000000..53c96093 --- /dev/null +++ b/pkg/mapper/examples/slow_cat/Dockerfile @@ -0,0 +1,20 @@ +#################################################################################################### +# base +#################################################################################################### +FROM alpine:3.20 AS base +ARG TARGETARCH +RUN apk update && apk upgrade && \ + apk add ca-certificates && \ + apk --no-cache add tzdata + +COPY dist/slow-cat-example-${TARGETARCH} /bin/slow-cat-example +RUN chmod +x /bin/slow-cat-example + +#################################################################################################### +# slow-cat +#################################################################################################### +FROM scratch AS slow-cat +COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo +COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=base /bin/slow-cat-example /bin/slow-cat-example +ENTRYPOINT [ "/bin/slow-cat-example" ] diff --git a/pkg/mapper/examples/slow_cat/Makefile b/pkg/mapper/examples/slow_cat/Makefile new file mode 100644 index 00000000..b2359a90 --- /dev/null +++ b/pkg/mapper/examples/slow_cat/Makefile @@ -0,0 +1,22 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/map-slow-cat:${TAG} +ARCHITECTURES = amd64 arm64 + +.PHONY: build +build: + for arch in $(ARCHITECTURES); do \ + CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/slow-cat-example-$${arch} main.go; \ + done + +.PHONY: image-push +image-push: build + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target slow-cat . --push + +.PHONY: image +image: build + docker build -t ${IMAGE_REGISTRY} --target slow-cat . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi + +clean: + -rm -rf ./dist diff --git a/pkg/mapper/examples/slow_cat/README.md b/pkg/mapper/examples/slow_cat/README.md new file mode 100644 index 00000000..36ca63cb --- /dev/null +++ b/pkg/mapper/examples/slow_cat/README.md @@ -0,0 +1,3 @@ +# Slow Cat + +An example User Defined Function that outputs the same value as the input, but first does a sleep. This example is for the purpose of performing testing with a slow vertex. diff --git a/pkg/mapper/examples/slow_cat/go.mod b/pkg/mapper/examples/slow_cat/go.mod new file mode 100644 index 00000000..85dc57e0 --- /dev/null +++ b/pkg/mapper/examples/slow_cat/go.mod @@ -0,0 +1,17 @@ +module even_odd + +go 1.21 + +replace github.com/numaproj/numaflow-go => ../../../.. + +require github.com/numaproj/numaflow-go v0.9.0 + +require ( + golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.25.0 // indirect + golang.org/x/text v0.18.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect + google.golang.org/grpc v1.66.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect +) diff --git a/pkg/mapper/examples/slow_cat/go.sum b/pkg/mapper/examples/slow_cat/go.sum new file mode 100644 index 00000000..36997a49 --- /dev/null +++ b/pkg/mapper/examples/slow_cat/go.sum @@ -0,0 +1,24 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= +golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= +google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/mapper/examples/slow_cat/main.go b/pkg/mapper/examples/slow_cat/main.go new file mode 100644 index 00000000..69bca4b2 --- /dev/null +++ b/pkg/mapper/examples/slow_cat/main.go @@ -0,0 +1,49 @@ +package main + +import ( + "context" + "log" + "os" + "strconv" + "time" + + "github.com/numaproj/numaflow-go/pkg/mapper" +) + +const DEFAULT_SLEEP_SECONDS = 10 + +type SlowCat struct { + sleepSeconds int +} + +func (e *SlowCat) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages { + time.Sleep(time.Duration(e.sleepSeconds) * time.Second) + return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys)) +} + +func main() { + err := mapper.NewServer(&SlowCat{sleepSeconds: getSleepSeconds()}).Start(context.Background()) + if err != nil { + log.Panic("Failed to start map function server: ", err) + } +} + +func getSleepSeconds() int { + + // sleep time is configured according to environment variable (or default if not configured) + + sleepSeconds := DEFAULT_SLEEP_SECONDS + secondsString := os.Getenv("SLEEP_SECONDS") + if secondsString == "" { + log.Printf("SLEEP_SECONDS environment variable not set, using default %d seconds\n", DEFAULT_SLEEP_SECONDS) + } else { + val, err := strconv.Atoi(secondsString) + if err != nil { + log.Printf("SLEEP_SECONDS environment variable %q not an int, using default %d seconds\n", secondsString, DEFAULT_SLEEP_SECONDS) + } else { + sleepSeconds = val + log.Printf("Using SLEEP_SECONDS value %d\n", sleepSeconds) + } + } + return sleepSeconds +}