Skip to content

Commit

Permalink
renamed 'cat sleep' to 'slow cat' and moved logic to constructor for …
Browse files Browse the repository at this point in the history
…fewer redundant calls

Signed-off-by: Julie Vogelman <[email protected]>
  • Loading branch information
juliev0 committed Jan 17, 2025
1 parent 09404c6 commit de00f03
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
matrix:
dockerfile_paths: [
"pkg/mapper/examples/even_odd", "pkg/mapper/examples/flatmap", "pkg/mapper/examples/forward_message",
"pkg/mapper/examples/retry", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream",
"pkg/mapper/examples/retry", "pkg/mapper/examples/slow_cat", "pkg/mapper/examples/tickgen", "pkg/mapstreamer/examples/flatmap_stream",
"pkg/reducer/examples/counter", "pkg/reducer/examples/sum", "pkg/reducestreamer/examples/counter",
"pkg/reducestreamer/examples/sum", "pkg/sessionreducer/examples/counter", "pkg/sessionreducer/examples/sum",
"pkg/sideinput/examples/simple_sideinput/udf", "pkg/sideinput/examples/simple_sideinput", "pkg/sinker/examples/log",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/cat-sleep-example-${TARGETARCH} /bin/cat-sleep-example
RUN chmod +x /bin/cat-sleep-example
COPY dist/slow-cat-example-${TARGETARCH} /bin/slow-cat-example
RUN chmod +x /bin/slow-cat-example

####################################################################################################
# cat-sleep
# slow-cat
####################################################################################################
FROM scratch AS cat-sleep
FROM scratch AS slow-cat
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/cat-sleep-example /bin/cat-sleep-example
ENTRYPOINT [ "/bin/cat-sleep-example" ]
COPY --from=base /bin/slow-cat-example /bin/slow-cat-example
ENTRYPOINT [ "/bin/slow-cat-example" ]
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/map-cat-sleep:${TAG}
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/map-slow-cat:${TAG}
ARCHITECTURES = amd64 arm64

.PHONY: build
build:
for arch in $(ARCHITECTURES); do \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/cat-sleep-example-$${arch} main.go; \
CGO_ENABLED=0 GOOS=linux GOARCH=$${arch} go build -v -o ./dist/slow-cat-example-$${arch} main.go; \
done

.PHONY: image-push
image-push: build
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target cat-sleep . --push
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target slow-cat . --push

.PHONY: image
image: build
docker build -t ${IMAGE_REGISTRY} --target cat-sleep .
docker build -t ${IMAGE_REGISTRY} --target slow-cat .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

clean:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
# Cat with Sleep
# Slow Cat

An example User Defined Function that outputs the same value as the input, but first does a sleep. This example is for the purpose of performing testing with a slow vertex.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,26 @@ import (

const DEFAULT_SLEEP_SECONDS = 10

type CatSleep struct {
type SlowCat struct {
sleepSeconds int
}

func (e *CatSleep) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
func (e *SlowCat) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages {
time.Sleep(time.Duration(e.sleepSeconds) * time.Second)
return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys))
}

func main() {
err := mapper.NewServer(&SlowCat{sleepSeconds: getSleepSeconds()}).Start(context.Background())
if err != nil {
log.Panic("Failed to start map function server: ", err)
}
}

func getSleepSeconds() int {

// sleep time is configured according to environment variable (or default if not configured)

// sleep for as long as the environment variable indicates (or default if not configured)
sleepSeconds := DEFAULT_SLEEP_SECONDS
secondsString := os.Getenv("SLEEP_SECONDS")
if secondsString == "" {
Expand All @@ -28,15 +42,8 @@ func (e *CatSleep) Map(ctx context.Context, keys []string, d mapper.Datum) mappe
log.Printf("SLEEP_SECONDS environment variable %q not an int, using default %d seconds\n", secondsString, DEFAULT_SLEEP_SECONDS)
} else {
sleepSeconds = val
log.Printf("Using SLEEP_SECONDS value %d\n", sleepSeconds)
}
}
time.Sleep(time.Duration(sleepSeconds) * time.Second)
return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys))
}

func main() {
err := mapper.NewServer(&CatSleep{}).Start(context.Background())
if err != nil {
log.Panic("Failed to start map function server: ", err)
}
return sleepSeconds
}

0 comments on commit de00f03

Please sign in to comment.