Skip to content

Commit

Permalink
update interface
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
Sidhant Kohli committed Jul 8, 2024
1 parent 3de6796 commit e238dfc
Show file tree
Hide file tree
Showing 14 changed files with 126 additions and 110 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ jobs:
"pkg/sinker/examples/fallback", "pkg/sideinput/examples/map_sideinput", "pkg/sideinput/examples/reduce_sideinput",
"pkg/sideinput/examples/sideinput_function", "pkg/sideinput/examples/simple_source_with_sideinput",
"pkg/sideinput/examples/sink_sideinput", "pkg/sinker/examples/redis-sink", "pkg/sideinput/examples/map_sideinput/udf",
"pkg/sideinput/examples/reduce_sideinput/udf"
"pkg/sideinput/examples/reduce_sideinput/udf", "pkg/batchmapper/examples/batchmap-flatmap"
]

steps:
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/proto/batchmap/v1/batchmap.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/apis/proto/batchmap/v1/batchmap.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ service BatchMap {
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);

// BatchMapFn is a bi-directional streaming rpc which applies a
// Map function on each BatchMapRequest element of the stream and then returns streams
// back MapResponse elements.
// map function on each BatchMapRequest element of the stream and then streams
// back BatchMapResponse elements.
rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse);
}

/**
* MapRequest represents a request element.
* BatchMapRequest represents a request element.
*/
message BatchMapRequest {
repeated string keys = 1;
Expand All @@ -31,7 +31,7 @@ message BatchMapRequest {
}

/**
* MapResponse represents a response element.
* BatchMapResponse represents a response element.
*/
message BatchMapResponse {
message Result {
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/batchmapper/examples/batchmap-flatmap/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ RUN apk update && apk upgrade && \
apk add ca-certificates && \
apk --no-cache add tzdata

COPY dist/flatmap-example /bin/flatmap-example
RUN chmod +x /bin/flatmap-example
COPY dist/batchmap-flatmap /bin/batchmap-flatmap
RUN chmod +x /bin/batchmap-flatmap

####################################################################################################
# flatmap
Expand All @@ -16,5 +16,5 @@ FROM scratch as flatmap
ARG ARCH
COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=base /bin/flatmap-example /bin/flatmap-example
ENTRYPOINT [ "/bin/flatmap-example" ]
COPY --from=base /bin/batchmap-flatmap /bin/batchmap-flatmap
ENTRYPOINT [ "/bin/batchmap-flatmap" ]
9 changes: 5 additions & 4 deletions pkg/batchmapper/examples/batchmap-flatmap/Makefile
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
TAG ?= stable
PUSH ?= false
IMAGE_REGISTRY = quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}

.PHONY: build
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/batchmap-flatmap main.go

.PHONY: image-push
image-push: build
docker buildx build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push
docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target flatmap . --push

.PHONY: image
image: build
docker build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --target flatmap .
@if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}"; fi
docker build -t ${IMAGE_REGISTRY} --target flatmap .
@if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi

clean:
-rm -rf ./dist
10 changes: 9 additions & 1 deletion pkg/batchmapper/examples/batchmap-flatmap/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# Map Batch Flatmap
# Batch Map Flatmap

An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function.


Some considerations for batch map are as follows

- The user will have to ensure that the BatchResponse is tagged with the correct request ID as this will be used by Numaflow for populating information required for system correctness like MessageID for the ISB deduplication.


- The user will have to ensure that all the length of the BatchResponses is equal to the number of requests received. This means that for **each request** there is a BatchResponse.
4 changes: 3 additions & 1 deletion pkg/batchmapper/examples/batchmap-flatmap/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module flatmap
module batchmap-flatmap

go 1.22

Expand All @@ -8,7 +8,9 @@ require github.com/numaproj/numaflow-go v0.8.0

require (
github.com/golang/protobuf v1.5.3 // indirect
go.uber.org/atomic v1.11.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
Expand Down
9 changes: 9 additions & 0 deletions pkg/batchmapper/examples/batchmap-flatmap/go.sum
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
Expand All @@ -22,3 +30,4 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
4 changes: 2 additions & 2 deletions pkg/batchmapper/examples/batchmap-flatmap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
"github.com/numaproj/numaflow-go/pkg/batchmapper"
)

func batchMapFn(_ context.Context, datums []batchmapper.Datum) batchmapper.BatchResponses {
func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper.BatchResponses {
batchResponses := batchmapper.BatchResponsesBuilder()
for _, d := range datums {
for d := range datums {
msg := d.Value()
_ = d.EventTime() // Event time is available
_ = d.Watermark() // Watermark is available
Expand Down
8 changes: 4 additions & 4 deletions pkg/batchmapper/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ type Datum interface {
// of messages, and they return the consolidated response for all of them together.
type BatchMapper interface {
// BatchMap is the function which processes a list of input messages
BatchMap(ctx context.Context, datums []Datum) BatchResponses
BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses
}

// BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper.
type BatchMapperFunc func(ctx context.Context, datums []Datum) BatchResponses
type BatchMapperFunc func(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses

// BatchMap implements the functionality of BatchMap function.
func (mf BatchMapperFunc) BatchMap(ctx context.Context, datums []Datum) BatchResponses {
return mf(ctx, datums)
func (mf BatchMapperFunc) BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses {
return mf(ctx, datumStreamCh)
}
4 changes: 2 additions & 2 deletions pkg/batchmapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ func TestBatchMapServer_Start(t *testing.T) {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses {
var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses {
batchResponses := BatchResponsesBuilder()
for _, d := range datums {
for d := range datums {
results := NewBatchResponse(d.Id())
results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"}))
batchResponses.Append(results)
Expand Down
106 changes: 71 additions & 35 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ package batchmapper

import (
"context"
"fmt"
"io"
"log"

"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
Expand Down Expand Up @@ -32,58 +37,89 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*batchmappb.ReadyRe
// BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them.
func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error {
ctx := stream.Context()
var g errgroup.Group

// totalRequests is a counter for keeping a track of the number of datum requests
// that were received on the stream. We use an atomic int as this needs to be synchronized
// between the request/response go routines.
totalRequests := atomic.NewInt32(0)

// datumStreamCh is used to stream messages to the user code interface
// As the BatchMap interface expects a list of request elements
// we read all the requests coming on the stream and keep appending them together
// and then finally send the array for processing once all the messages on the stream have been read.
datums := make([]Datum, 0)
// we read all the requests coming on the stream and keep streaming them to the user code on this channel.
datumStreamCh := make(chan Datum)

// go routine to invoke the user handler function, and process the responses.
g.Go(func() error {
// Apply the user BatchMap implementation function
responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh)

// If the number of responses received does not align with the request batch size,
// we will not be able to process the data correctly.
// This should be marked as an error and the container is restarted.
// As this is a user error, we restart the container to mitigate any transient error otherwise, this
// crash should indicate to the user that there is some issue.
if len(responses.Items()) != int(totalRequests.Load()) {
errMsg := fmt.Sprintf("batchMapFn: mismatch between length of batch requests and responses, "+
"expected:%d, got:%d", int(totalRequests.Load()), len(responses.Items()))
log.Panic(errMsg)
}

// iterate over the responses received and covert to the required proto format
for _, batchResp := range responses.Items() {
var elements []*batchmappb.BatchMapResponse_Result
for _, resp := range batchResp.Items() {
elements = append(elements, &batchmappb.BatchMapResponse_Result{
Keys: resp.Keys(),
Value: resp.Value(),
Tags: resp.Tags(),
})
}
singleRequestResp := &batchmappb.BatchMapResponse{
Results: elements,
Id: batchResp.Id(),
}
// We stream back the result for a single request ID
// this would contain all the responses for that request.
err := stream.Send(singleRequestResp)
if err != nil {
log.Println("BatchMapFn: Got an error while Send() on stream", err)
return err
}
}
return nil
})

// loop to keep reading messages from the stream and sending it to the datumStreamCh
for {
d, err := stream.Recv()
// if we see EOF on the stream we do not have any more messages coming up
if err == io.EOF {
// close the input data channel to indicate that no more messages expected
close(datumStreamCh)
break
}
if err != nil {
// close the input data channel to indicate that no more messages expected
close(datumStreamCh)
log.Println("BatchMapFn: Got an error while recv() on stream", err)
return err
}
var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys())
datums = append(datums, hd)
// send the datum to the input channel
datumStreamCh <- hd
// Increase the counter for number of requests received
totalRequests.Inc()
}

// Apply the user BatchMap implementation function
responses := fs.BatchMapper.BatchMap(ctx, datums)

// If the number of responses received does not align with the request batch size,
// we will not be able to process the data correctly.
// This should be marked as an error and shown to the user.
if len(responses.Items()) != len(datums) {
errMsg := "batchMapFn: mismatch between length of batch requests and responses"
log.Panic(errMsg)
// wait for all the responses to be processed
err := g.Wait()
// if there was any error during processing return the error
if err != nil {
statusErr := status.Errorf(codes.Internal, err.Error())
return statusErr
}

// iterate over the responses received and covert to the required proto format
for _, batchResp := range responses.Items() {
var elements []*batchmappb.BatchMapResponse_Result
for _, resp := range batchResp.Items() {
elements = append(elements, &batchmappb.BatchMapResponse_Result{
Keys: resp.Keys(),
Value: resp.Value(),
Tags: resp.Tags(),
})
}
singleRequestResp := &batchmappb.BatchMapResponse{
Results: elements,
Id: batchResp.Id(),
}
// We stream back the result for a single request ID
// this would contain all the responses for that request.
err := stream.Send(singleRequestResp)
if err != nil {
log.Println("BatchMapFn: Got an error while Send() on stream", err)
return err
}
}
// Once all responses are sent we can return, this would indicate the end of the rpc and
// send an EOF to the client on the stream
return nil
Expand Down
Loading

0 comments on commit e238dfc

Please sign in to comment.