diff --git a/.github/workflows/build-push.yaml b/.github/workflows/build-push.yaml index 29a66275..4fb742c8 100644 --- a/.github/workflows/build-push.yaml +++ b/.github/workflows/build-push.yaml @@ -17,7 +17,7 @@ jobs: matrix: dockerfile_paths: [ "pkg/mapper/examples/even_odd", "pkg/mapper/examples/flatmap", "pkg/mapper/examples/forward_message", - "pkg/mapper/examples/retry", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream", + "pkg/mapper/examples/retry", "pkg/mapper/examples/slow_cat", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream", "pkg/reducer/examples/counter", "pkg/reducer/examples/sum", "pkg/reducestreamer/examples/counter", "pkg/reducestreamer/examples/sum", "pkg/sessionreducer/examples/counter", "pkg/sessionreducer/examples/sum", "pkg/sideinput/examples/simple_sideinput/udf", "pkg/sideinput/examples/simple_sideinput", "pkg/sinker/examples/log", diff --git a/pkg/mapper/examples/cat_sleep/Dockerfile b/pkg/mapper/examples/slow_cat/Dockerfile similarity index 74% rename from pkg/mapper/examples/cat_sleep/Dockerfile rename to pkg/mapper/examples/slow_cat/Dockerfile index c5acbc15..53c96093 100644 --- a/pkg/mapper/examples/cat_sleep/Dockerfile +++ b/pkg/mapper/examples/slow_cat/Dockerfile @@ -7,14 +7,14 @@ RUN apk update && apk upgrade && \ apk add ca-certificates && \ apk --no-cache add tzdata -COPY dist/cat-sleep-example-${TARGETARCH} /bin/cat-sleep-example -RUN chmod +x /bin/cat-sleep-example +COPY dist/slow-cat-example-${TARGETARCH} /bin/slow-cat-example +RUN chmod +x /bin/slow-cat-example #################################################################################################### -# cat-sleep +# slow-cat #################################################################################################### -FROM scratch AS cat-sleep +FROM scratch AS slow-cat COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt -COPY --from=base /bin/cat-sleep-example /bin/cat-sleep-example -ENTRYPOINT [ "/bin/cat-sleep-example" ] +COPY --from=base /bin/slow-cat-example /bin/slow-cat-example +ENTRYPOINT [ "/bin/slow-cat-example" ] diff --git a/pkg/mapper/examples/cat_sleep/Makefile b/pkg/mapper/examples/slow_cat/Makefile similarity index 66% rename from pkg/mapper/examples/cat_sleep/Makefile rename to pkg/mapper/examples/slow_cat/Makefile index 4eb892be..b2359a90 100644 --- a/pkg/mapper/examples/cat_sleep/Makefile +++ b/pkg/mapper/examples/slow_cat/Makefile @@ -1,21 +1,21 @@ TAG ?= stable PUSH ?= false -IMAGE_REGISTRY = quay.io/numaio/numaflow-go/map-cat-sleep:${TAG} +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/map-slow-cat:${TAG} ARCHITECTURES = amd64 arm64 .PHONY: build build: for arch in $(ARCHITECTURES); do \ - CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/cat-sleep-example-$${arch} main.go; \ + CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/slow-cat-example-$${arch} main.go; \ done .PHONY: image-push image-push: build - docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target cat-sleep . --push + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target slow-cat . --push .PHONY: image image: build - docker build -t ${IMAGE_REGISTRY} --target cat-sleep . + docker build -t ${IMAGE_REGISTRY} --target slow-cat . @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi clean: diff --git a/pkg/mapper/examples/cat_sleep/README.md b/pkg/mapper/examples/slow_cat/README.md similarity index 91% rename from pkg/mapper/examples/cat_sleep/README.md rename to pkg/mapper/examples/slow_cat/README.md index b5597f6c..36ca63cb 100644 --- a/pkg/mapper/examples/cat_sleep/README.md +++ b/pkg/mapper/examples/slow_cat/README.md @@ -1,3 +1,3 @@ -# Cat with Sleep +# Slow Cat An example User Defined Function that outputs the same value as the input, but first does a sleep. This example is for the purpose of performing testing with a slow vertex. diff --git a/pkg/mapper/examples/cat_sleep/go.mod b/pkg/mapper/examples/slow_cat/go.mod similarity index 100% rename from pkg/mapper/examples/cat_sleep/go.mod rename to pkg/mapper/examples/slow_cat/go.mod diff --git a/pkg/mapper/examples/cat_sleep/go.sum b/pkg/mapper/examples/slow_cat/go.sum similarity index 100% rename from pkg/mapper/examples/cat_sleep/go.sum rename to pkg/mapper/examples/slow_cat/go.sum diff --git a/pkg/mapper/examples/cat_sleep/main.go b/pkg/mapper/examples/slow_cat/main.go similarity index 61% rename from pkg/mapper/examples/cat_sleep/main.go rename to pkg/mapper/examples/slow_cat/main.go index b7ab86fa..69bca4b2 100644 --- a/pkg/mapper/examples/cat_sleep/main.go +++ b/pkg/mapper/examples/slow_cat/main.go @@ -12,12 +12,26 @@ import ( const DEFAULT_SLEEP_SECONDS = 10 -type CatSleep struct { +type SlowCat struct { + sleepSeconds int } -func (e *CatSleep) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages { +func (e *SlowCat) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages { + time.Sleep(time.Duration(e.sleepSeconds) * time.Second) + return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys)) +} + +func main() { + err := mapper.NewServer(&SlowCat{sleepSeconds: getSleepSeconds()}).Start(context.Background()) + if err != nil { + log.Panic("Failed to start map function server: ", err) + } +} + +func getSleepSeconds() int { + + // sleep time is configured according to environment variable (or default if not configured) - // sleep for as long as the environment variable indicates (or default if not configured) sleepSeconds := DEFAULT_SLEEP_SECONDS secondsString := os.Getenv("SLEEP_SECONDS") if secondsString == "" { @@ -28,15 +42,8 @@ func (e *CatSleep) Map(ctx context.Context, keys []string, d mapper.Datum) mappe log.Printf("SLEEP_SECONDS environment variable %q not an int, using default %d seconds\n", secondsString, DEFAULT_SLEEP_SECONDS) } else { sleepSeconds = val + log.Printf("Using SLEEP_SECONDS value %d\n", sleepSeconds) } } - time.Sleep(time.Duration(sleepSeconds) * time.Second) - return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys)) -} - -func main() { - err := mapper.NewServer(&CatSleep{}).Start(context.Background()) - if err != nil { - log.Panic("Failed to start map function server: ", err) - } + return sleepSeconds }