diff --git a/pkg/mapper/examples/batchmap/Dockerfile b/pkg/mapper/examples/batchmap-flatmap/Dockerfile similarity index 100% rename from pkg/mapper/examples/batchmap/Dockerfile rename to pkg/mapper/examples/batchmap-flatmap/Dockerfile diff --git a/pkg/mapper/examples/batchmap-flatmap/Makefile b/pkg/mapper/examples/batchmap-flatmap/Makefile new file mode 100644 index 00000000..81328294 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/Makefile @@ -0,0 +1,18 @@ +TAG ?= stable +PUSH ?= false + +.PHONY: build +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go + +.PHONY: image-push +image-push: build + docker buildx build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push + +.PHONY: image +image: build + docker build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --target flatmap . + @if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}"; fi + +clean: + -rm -rf ./dist diff --git a/pkg/mapper/examples/batchmap-flatmap/README.md b/pkg/mapper/examples/batchmap-flatmap/README.md new file mode 100644 index 00000000..0a639de2 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/README.md @@ -0,0 +1,3 @@ +# Map Batch Flatmap + +An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function. diff --git a/pkg/mapper/examples/batchmap/go.mod b/pkg/mapper/examples/batchmap-flatmap/go.mod similarity index 100% rename from pkg/mapper/examples/batchmap/go.mod rename to pkg/mapper/examples/batchmap-flatmap/go.mod diff --git a/pkg/mapper/examples/batchmap/go.sum b/pkg/mapper/examples/batchmap-flatmap/go.sum similarity index 100% rename from pkg/mapper/examples/batchmap/go.sum rename to pkg/mapper/examples/batchmap-flatmap/go.sum diff --git a/pkg/mapper/examples/batchmap/main.go b/pkg/mapper/examples/batchmap-flatmap/main.go similarity index 79% rename from pkg/mapper/examples/batchmap/main.go rename to pkg/mapper/examples/batchmap-flatmap/main.go index 35098f3a..e2ea415a 100644 --- a/pkg/mapper/examples/batchmap/main.go +++ b/pkg/mapper/examples/batchmap-flatmap/main.go @@ -3,21 +3,22 @@ package main import ( "context" "log" + "strings" "github.com/numaproj/numaflow-go/pkg/mapper" ) func mapFn(_ context.Context, datums []mapper.Datum) mapper.BatchResponses { batchResponses := mapper.BatchResponsesBuilder() - log.Println("MYDEBUG: length of input ", len(datums)) for _, d := range datums { - //msg := d.Value() + msg := d.Value() _ = d.EventTime() // Event time is available _ = d.Watermark() // Watermark is available results := mapper.NewBatchResponse(d.Id()) - //for i := 0; i < 2; i++ { - // results = results.Append(mapper.NewMessage(msg)) - //} + strs := strings.Split(string(msg), ",") + for _, s := range strs { + results = results.Append(mapper.NewMessage([]byte(s))) + } batchResponses = batchResponses.Append(results) } return batchResponses diff --git a/pkg/mapper/examples/batchmap/pipe.yaml b/pkg/mapper/examples/batchmap-flatmap/pipe.yaml similarity index 97% rename from pkg/mapper/examples/batchmap/pipe.yaml rename to pkg/mapper/examples/batchmap-flatmap/pipe.yaml index 64133802..8d09df3f 100644 --- a/pkg/mapper/examples/batchmap/pipe.yaml +++ b/pkg/mapper/examples/batchmap-flatmap/pipe.yaml @@ -17,7 +17,7 @@ spec: source: # A self data generating source generator: - rpu: 1000 + rpu: 2000 duration: 1s - name: batchmap scale: @@ -31,7 +31,7 @@ spec: udf: container: # image: "quay.io/numaio/numaflow-go/map-flatmap:stable" - image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchemptyv1" + image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchflatmapv1" imagePullPolicy: Always containerTemplate: resources: diff --git a/pkg/mapper/examples/batchmap/Makefile b/pkg/mapper/examples/batchmap/Makefile deleted file mode 100644 index 6c3fd22c..00000000 --- a/pkg/mapper/examples/batchmap/Makefile +++ /dev/null @@ -1,18 +0,0 @@ -TAG ?= mapbatchemptyv1 -PUSH ?= true - -.PHONY: build -build: - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go - -.PHONY: image-push -image-push: build - docker buildx build -t "quay.io/kohlisid/numaflow-go/map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push - -.PHONY: image -image: build - docker build -t "quay.io/kohlisid/numaflow-go/map-flatmap:${TAG}" --target flatmap . - @if [ "$(PUSH)" = "true" ]; then docker push "quay.io/kohlisid/numaflow-go/map-flatmap:${TAG}"; fi - -clean: - -rm -rf ./dist diff --git a/pkg/mapper/examples/batchmap/README.md b/pkg/mapper/examples/batchmap/README.md deleted file mode 100644 index 54a9efab..00000000 --- a/pkg/mapper/examples/batchmap/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Flatmap - -An example User Defined Function that demonstrates how to write a `flatmap` User Defined Function.