Skip to content

Commit

Permalink
feat: new Mapper example which serves as a slow container for testing…
Browse files Browse the repository at this point in the history
… purposes (#172)
  • Loading branch information
juliev0 authored Jan 17, 2025
1 parent d8c236b commit 2b9e9e9
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 1 deletion.
2 changes: 1 addition & 1 deletion .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
dockerfile_paths: [
"pkg/mapper/examples/even_odd", "pkg/mapper/examples/flatmap", "pkg/mapper/examples/forward_message",
"pkg/mapper/examples/retry", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream",
"pkg/mapper/examples/retry", "pkg/mapper/examples/slow_cat", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream",
"pkg/reducer/examples/counter", "pkg/reducer/examples/sum", "pkg/reducestreamer/examples/counter",
"pkg/reducestreamer/examples/sum", "pkg/sessionreducer/examples/counter", "pkg/sessionreducer/examples/sum",
"pkg/sideinput/examples/simple_sideinput/udf", "pkg/sideinput/examples/simple_sideinput", "pkg/sinker/examples/log",
Expand Down
20 changes: 20 additions & 0 deletions pkg/mapper/examples/slow_cat/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
####################################################################################################
# base
####################################################################################################
FROM alpine:3.20 AS base
ARG TARGETARCH
RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/slow-cat-example-${TARGETARCH} /bin/slow-cat-example
RUN chmod +x /bin/slow-cat-example

####################################################################################################
# slow-cat
####################################################################################################
FROM scratch AS slow-cat
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/slow-cat-example /bin/slow-cat-example
ENTRYPOINT [ "/bin/slow-cat-example" ]
22 changes: 22 additions & 0 deletions pkg/mapper/examples/slow_cat/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/map-slow-cat:${TAG}
ARCHITECTURES = amd64 arm64

.PHONY: build
build:
for arch in $(ARCHITECTURES); do \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/slow-cat-example-$${arch} main.go; \
done

.PHONY: image-push
image-push: build
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target slow-cat . --push

.PHONY: image
image: build
docker build -t ${IMAGE_REGISTRY} --target slow-cat .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

clean:
-rm -rf ./dist
3 changes: 3 additions & 0 deletions pkg/mapper/examples/slow_cat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Slow Cat

An example User Defined Function that outputs the same value as the input, but first does a sleep. This example is for the purpose of performing testing with a slow vertex.
17 changes: 17 additions & 0 deletions pkg/mapper/examples/slow_cat/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module even_odd

go 1.21

replace github.com/numaproj/numaflow-go => ../../../..

require github.com/numaproj/numaflow-go v0.9.0

require (
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
24 changes: 24 additions & 0 deletions pkg/mapper/examples/slow_cat/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
49 changes: 49 additions & 0 deletions pkg/mapper/examples/slow_cat/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package main

import (
"context"
"log"
"os"
"strconv"
"time"

"github.com/numaproj/numaflow-go/pkg/mapper"
)

const DEFAULT_SLEEP_SECONDS = 10

type SlowCat struct {
sleepSeconds int
}

func (e *SlowCat) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
time.Sleep(time.Duration(e.sleepSeconds) * time.Second)
return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys))
}

func main() {
err := mapper.NewServer(&SlowCat{sleepSeconds: getSleepSeconds()}).Start(context.Background())
if err != nil {
log.Panic("Failed to start map function server: ", err)
}
}

func getSleepSeconds() int {

// sleep time is configured according to environment variable (or default if not configured)

sleepSeconds := DEFAULT_SLEEP_SECONDS
secondsString := os.Getenv("SLEEP_SECONDS")
if secondsString == "" {
log.Printf("SLEEP_SECONDS environment variable not set, using default %d seconds\n", DEFAULT_SLEEP_SECONDS)
} else {
val, err := strconv.Atoi(secondsString)
if err != nil {
log.Printf("SLEEP_SECONDS environment variable %q not an int, using default %d seconds\n", secondsString, DEFAULT_SLEEP_SECONDS)
} else {
sleepSeconds = val
log.Printf("Using SLEEP_SECONDS value %d\n", sleepSeconds)
}
}
return sleepSeconds
}

0 comments on commit 2b9e9e9

Please sign in to comment.