Skip to content

Commit

Permalink
update example
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid committed Jul 1, 2024
1 parent 503ff5a commit 61c4500
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 28 deletions.
File renamed without changes.
18 changes: 18 additions & 0 deletions pkg/mapper/examples/batchmap-flatmap/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
TAG ?= stable
PUSH ?= false

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --target flatmap .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}"; fi

clean:
-rm -rf ./dist
3 changes: 3 additions & 0 deletions pkg/mapper/examples/batchmap-flatmap/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Map Batch Flatmap

An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,22 @@ package main
import (
"context"
"log"
"strings"

"github.com/numaproj/numaflow-go/pkg/mapper"
)

func mapFn(_ context.Context, datums []mapper.Datum) mapper.BatchResponses {
batchResponses := mapper.BatchResponsesBuilder()
log.Println("MYDEBUG: length of input ", len(datums))
for _, d := range datums {
//msg := d.Value()
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
results := mapper.NewBatchResponse(d.Id())
//for i := 0; i < 2; i++ {
// results = results.Append(mapper.NewMessage(msg))
//}
strs := strings.Split(string(msg), ",")
for _, s := range strs {
results = results.Append(mapper.NewMessage([]byte(s)))
}
batchResponses = batchResponses.Append(results)
}
return batchResponses
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
source:
# A self data generating source
generator:
rpu: 1000
rpu: 2000
duration: 1s
- name: batchmap
scale:
Expand All @@ -31,7 +31,7 @@ spec:
udf:
container:
# image: "quay.io/numaio/numaflow-go/map-flatmap:stable"
image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchemptyv1"
image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchflatmapv1"
imagePullPolicy: Always
containerTemplate:
resources:
Expand Down
18 changes: 0 additions & 18 deletions pkg/mapper/examples/batchmap/Makefile

This file was deleted.

3 changes: 0 additions & 3 deletions pkg/mapper/examples/batchmap/README.md

This file was deleted.

0 comments on commit 61c4500

Please sign in to comment.