From 59496b64518d8cc7d3885173b79b0c33797e38b4 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Mon, 8 Jul 2024 15:47:10 -0700 Subject: [PATCH] update interface Signed-off-by: Sidhant Kohli --- .github/workflows/build-push.yaml | 2 +- pkg/apis/proto/batchmap/v1/batchmap.pb.go | 4 +- pkg/apis/proto/batchmap/v1/batchmap.proto | 8 +- .../proto/batchmap/v1/batchmap_grpc.pb.go | 8 +- .../examples/batchmap-flatmap/Dockerfile | 8 +- .../examples/batchmap-flatmap/Makefile | 9 +- .../examples/batchmap-flatmap/README.md | 10 +- .../examples/batchmap-flatmap/go.mod | 4 +- .../examples/batchmap-flatmap/go.sum | 9 ++ .../examples/batchmap-flatmap/main.go | 4 +- pkg/batchmapper/interface.go | 8 +- pkg/batchmapper/server_test.go | 4 +- pkg/batchmapper/service.go | 106 ++++++++++++------ pkg/batchmapper/service_test.go | 52 +-------- 14 files changed, 126 insertions(+), 110 deletions(-) diff --git a/.github/workflows/build-push.yaml b/.github/workflows/build-push.yaml index 4896f619..8b27a413 100644 --- a/.github/workflows/build-push.yaml +++ b/.github/workflows/build-push.yaml @@ -25,7 +25,7 @@ jobs: "pkg/sinker/examples/fallback", "pkg/sideinput/examples/map_sideinput", "pkg/sideinput/examples/reduce_sideinput", "pkg/sideinput/examples/sideinput_function", "pkg/sideinput/examples/simple_source_with_sideinput", "pkg/sideinput/examples/sink_sideinput", "pkg/sinker/examples/redis-sink", "pkg/sideinput/examples/map_sideinput/udf", - "pkg/sideinput/examples/reduce_sideinput/udf" + "pkg/sideinput/examples/reduce_sideinput/udf", "pkg/batchmapper/examples/batchmap-flatmap" ] steps: diff --git a/pkg/apis/proto/batchmap/v1/batchmap.pb.go b/pkg/apis/proto/batchmap/v1/batchmap.pb.go index 39f4c430..1d34241c 100644 --- a/pkg/apis/proto/batchmap/v1/batchmap.pb.go +++ b/pkg/apis/proto/batchmap/v1/batchmap.pb.go @@ -23,7 +23,7 @@ const ( ) // * -// MapRequest represents a request element. +// BatchMapRequest represents a request element. type BatchMapRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -113,7 +113,7 @@ func (x *BatchMapRequest) GetId() string { } // * -// MapResponse represents a response element. +// BatchMapResponse represents a response element. type BatchMapResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache diff --git a/pkg/apis/proto/batchmap/v1/batchmap.proto b/pkg/apis/proto/batchmap/v1/batchmap.proto index 94a0a704..f7f4d10e 100644 --- a/pkg/apis/proto/batchmap/v1/batchmap.proto +++ b/pkg/apis/proto/batchmap/v1/batchmap.proto @@ -12,13 +12,13 @@ service BatchMap { rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); // BatchMapFn is a bi-directional streaming rpc which applies a - // Map function on each BatchMapRequest element of the stream and then returns streams - // back MapResponse elements. + // map function on each BatchMapRequest element of the stream and then streams + // back BatchMapResponse elements. rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse); } /** - * MapRequest represents a request element. + * BatchMapRequest represents a request element. */ message BatchMapRequest { repeated string keys = 1; @@ -31,7 +31,7 @@ message BatchMapRequest { } /** - * MapResponse represents a response element. + * BatchMapResponse represents a response element. */ message BatchMapResponse { message Result { diff --git a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go index c66551f8..11abd087 100644 --- a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go +++ b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go @@ -26,8 +26,8 @@ type BatchMapClient interface { // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) // BatchMapFn is a bi-directional streaming rpc which applies a - // Map function on each BatchMapRequest element of the stream and then returns streams - // back MapResponse elements. + // map function on each BatchMapRequest element of the stream and then streams + // back BatchMapResponse elements. BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) } @@ -86,8 +86,8 @@ type BatchMapServer interface { // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) // BatchMapFn is a bi-directional streaming rpc which applies a - // Map function on each BatchMapRequest element of the stream and then returns streams - // back MapResponse elements. + // map function on each BatchMapRequest element of the stream and then streams + // back BatchMapResponse elements. BatchMapFn(BatchMap_BatchMapFnServer) error mustEmbedUnimplementedBatchMapServer() } diff --git a/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile b/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile index c3b6cc45..98468775 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile +++ b/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile @@ -6,8 +6,8 @@ RUN apk update && apk upgrade && \ apk add ca-certificates && \ apk --no-cache add tzdata -COPY dist/flatmap-example /bin/flatmap-example -RUN chmod +x /bin/flatmap-example +COPY dist/batchmap-flatmap /bin/batchmap-flatmap +RUN chmod +x /bin/batchmap-flatmap #################################################################################################### # flatmap @@ -16,5 +16,5 @@ FROM scratch as flatmap ARG ARCH COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt -COPY --from=base /bin/flatmap-example /bin/flatmap-example -ENTRYPOINT [ "/bin/flatmap-example" ] +COPY --from=base /bin/batchmap-flatmap /bin/batchmap-flatmap +ENTRYPOINT [ "/bin/batchmap-flatmap" ] diff --git a/pkg/batchmapper/examples/batchmap-flatmap/Makefile b/pkg/batchmapper/examples/batchmap-flatmap/Makefile index 81328294..0984106d 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/Makefile +++ b/pkg/batchmapper/examples/batchmap-flatmap/Makefile @@ -1,18 +1,19 @@ TAG ?= stable PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG} .PHONY: build build: - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/batchmap-flatmap main.go .PHONY: image-push image-push: build - docker buildx build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target flatmap . --push .PHONY: image image: build - docker build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --target flatmap . - @if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}"; fi + docker build -t ${IMAGE_REGISTRY} --target flatmap . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi clean: -rm -rf ./dist diff --git a/pkg/batchmapper/examples/batchmap-flatmap/README.md b/pkg/batchmapper/examples/batchmap-flatmap/README.md index 0a639de2..093e9030 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/README.md +++ b/pkg/batchmapper/examples/batchmap-flatmap/README.md @@ -1,3 +1,11 @@ -# Map Batch Flatmap +# Batch Map Flatmap An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function. + + +Some considerations for batch map are as follows + +- The user will have to ensure that the BatchResponse is tagged with the correct request ID as this will be used by Numaflow for populating information required for system correctness like MessageID for the ISB deduplication. + + +- The user will have to ensure that all the length of the BatchResponses is equal to the number of requests received. This means that for **each request** there is a BatchResponse. \ No newline at end of file diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.mod b/pkg/batchmapper/examples/batchmap-flatmap/go.mod index 0ff2c778..cc6b876d 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/go.mod +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.mod @@ -1,4 +1,4 @@ -module flatmap +module batchmap-flatmap go 1.22 @@ -8,7 +8,9 @@ require github.com/numaproj/numaflow-go v0.8.0 require ( github.com/golang/protobuf v1.5.3 // indirect + go.uber.org/atomic v1.11.0 // indirect golang.org/x/net v0.9.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.sum b/pkg/batchmapper/examples/batchmap-flatmap/go.sum index 95c8479a..e884f1a5 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/go.sum +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.sum @@ -1,13 +1,21 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= @@ -22,3 +30,4 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/batchmapper/examples/batchmap-flatmap/main.go b/pkg/batchmapper/examples/batchmap-flatmap/main.go index 5caad62f..1a584b9a 100644 --- a/pkg/batchmapper/examples/batchmap-flatmap/main.go +++ b/pkg/batchmapper/examples/batchmap-flatmap/main.go @@ -8,9 +8,9 @@ import ( "github.com/numaproj/numaflow-go/pkg/batchmapper" ) -func batchMapFn(_ context.Context, datums []batchmapper.Datum) batchmapper.BatchResponses { +func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper.BatchResponses { batchResponses := batchmapper.BatchResponsesBuilder() - for _, d := range datums { + for d := range datums { msg := d.Value() _ = d.EventTime() // Event time is available _ = d.Watermark() // Watermark is available diff --git a/pkg/batchmapper/interface.go b/pkg/batchmapper/interface.go index 68af2b7f..a667f232 100644 --- a/pkg/batchmapper/interface.go +++ b/pkg/batchmapper/interface.go @@ -25,13 +25,13 @@ type Datum interface { // of messages, and they return the consolidated response for all of them together. type BatchMapper interface { // BatchMap is the function which processes a list of input messages - BatchMap(ctx context.Context, datums []Datum) BatchResponses + BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses } // BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper. -type BatchMapperFunc func(ctx context.Context, datums []Datum) BatchResponses +type BatchMapperFunc func(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses // BatchMap implements the functionality of BatchMap function. -func (mf BatchMapperFunc) BatchMap(ctx context.Context, datums []Datum) BatchResponses { - return mf(ctx, datums) +func (mf BatchMapperFunc) BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses { + return mf(ctx, datumStreamCh) } diff --git a/pkg/batchmapper/server_test.go b/pkg/batchmapper/server_test.go index fb907079..e51ec42c 100644 --- a/pkg/batchmapper/server_test.go +++ b/pkg/batchmapper/server_test.go @@ -20,9 +20,9 @@ func TestBatchMapServer_Start(t *testing.T) { _ = os.RemoveAll(serverInfoFile.Name()) }() - var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { batchResponses := BatchResponsesBuilder() - for _, d := range datums { + for d := range datums { results := NewBatchResponse(d.Id()) results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) batchResponses.Append(results) diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 685f3c04..a8748b18 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -2,9 +2,14 @@ package batchmapper import ( "context" + "fmt" "io" "log" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" @@ -32,58 +37,89 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*batchmappb.ReadyRe // BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them. func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error { ctx := stream.Context() + var g errgroup.Group + // totalRequests is a counter for keeping a track of the number of datum requests + // that were received on the stream. We use an atomic int as this needs to be synchronized + // between the request/response go routines. + totalRequests := atomic.NewInt32(0) + + // datumStreamCh is used to stream messages to the user code interface // As the BatchMap interface expects a list of request elements - // we read all the requests coming on the stream and keep appending them together - // and then finally send the array for processing once all the messages on the stream have been read. - datums := make([]Datum, 0) + // we read all the requests coming on the stream and keep streaming them to the user code on this channel. + datumStreamCh := make(chan Datum) + + // go routine to invoke the user handler function, and process the responses. + g.Go(func() error { + // Apply the user BatchMap implementation function + responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh) + + // If the number of responses received does not align with the request batch size, + // we will not be able to process the data correctly. + // This should be marked as an error and the container is restarted. + // As this is a user error, we restart the container to mitigate any transient error otherwise, this + // crash should indicate to the user that there is some issue. + if len(responses.Items()) != int(totalRequests.Load()) { + errMsg := fmt.Sprintf("batchMapFn: mismatch between length of batch requests and responses, "+ + "expected:%d, got:%d", int(totalRequests.Load()), len(responses.Items())) + log.Panic(errMsg) + } + + // iterate over the responses received and covert to the required proto format + for _, batchResp := range responses.Items() { + var elements []*batchmappb.BatchMapResponse_Result + for _, resp := range batchResp.Items() { + elements = append(elements, &batchmappb.BatchMapResponse_Result{ + Keys: resp.Keys(), + Value: resp.Value(), + Tags: resp.Tags(), + }) + } + singleRequestResp := &batchmappb.BatchMapResponse{ + Results: elements, + Id: batchResp.Id(), + } + // We stream back the result for a single request ID + // this would contain all the responses for that request. + err := stream.Send(singleRequestResp) + if err != nil { + log.Println("BatchMapFn: Got an error while Send() on stream", err) + return err + } + } + return nil + }) + + // loop to keep reading messages from the stream and sending it to the datumStreamCh for { d, err := stream.Recv() // if we see EOF on the stream we do not have any more messages coming up if err == io.EOF { + // close the input data channel to indicate that no more messages expected + close(datumStreamCh) break } if err != nil { + // close the input data channel to indicate that no more messages expected + close(datumStreamCh) log.Println("BatchMapFn: Got an error while recv() on stream", err) return err } var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys()) - datums = append(datums, hd) + // send the datum to the input channel + datumStreamCh <- hd + // Increase the counter for number of requests received + totalRequests.Inc() } - // Apply the user BatchMap implementation function - responses := fs.BatchMapper.BatchMap(ctx, datums) - - // If the number of responses received does not align with the request batch size, - // we will not be able to process the data correctly. - // This should be marked as an error and shown to the user. - if len(responses.Items()) != len(datums) { - errMsg := "batchMapFn: mismatch between length of batch requests and responses" - log.Panic(errMsg) + // wait for all the responses to be processed + err := g.Wait() + // if there was any error during processing return the error + if err != nil { + statusErr := status.Errorf(codes.Internal, err.Error()) + return statusErr } - // iterate over the responses received and covert to the required proto format - for _, batchResp := range responses.Items() { - var elements []*batchmappb.BatchMapResponse_Result - for _, resp := range batchResp.Items() { - elements = append(elements, &batchmappb.BatchMapResponse_Result{ - Keys: resp.Keys(), - Value: resp.Value(), - Tags: resp.Tags(), - }) - } - singleRequestResp := &batchmappb.BatchMapResponse{ - Results: elements, - Id: batchResp.Id(), - } - // We stream back the result for a single request ID - // this would contain all the responses for that request. - err := stream.Send(singleRequestResp) - if err != nil { - log.Println("BatchMapFn: Got an error while Send() on stream", err) - return err - } - } // Once all responses are sent we can return, this would indicate the end of the rpc and // send an EOF to the client on the stream return nil diff --git a/pkg/batchmapper/service_test.go b/pkg/batchmapper/service_test.go index 55034a5b..b2cdc154 100644 --- a/pkg/batchmapper/service_test.go +++ b/pkg/batchmapper/service_test.go @@ -99,9 +99,9 @@ func TestService_MapFnStream(t *testing.T) { }{ { name: "batch_map_stream_fn_forward_msg", - handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { batchResponses := BatchResponsesBuilder() - for _, d := range datums { + for d := range datums { results := NewBatchResponse(d.Id()) results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) batchResponses = batchResponses.Append(results) @@ -143,52 +143,11 @@ func TestService_MapFnStream(t *testing.T) { }, expectedErr: false, }, - { - name: "batch_map_mismatch_output_len", - handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { - batchResponses := BatchResponsesBuilder() - return batchResponses - }), - input: []*batchmappb.BatchMapRequest{{ - Keys: []string{"client"}, - Value: []byte(`test1`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Id: "test1", - }, { - Keys: []string{"client"}, - Value: []byte(`test2`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Id: "test2", - }}, - expected: []*batchmappb.BatchMapResponse{ - { - Results: []*batchmappb.BatchMapResponse_Result{ - { - Keys: []string{"client_test"}, - Value: []byte(`test1`), - }, - }, - Id: "test1", - }, - { - Results: []*batchmappb.BatchMapResponse_Result{ - { - Keys: []string{"client_test"}, - Value: []byte(`test2`), - }, - }, - Id: "test2", - }, - }, - expectedErr: true, - }, { name: "batch_map_stream_err", - handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { batchResponses := BatchResponsesBuilder() - for _, d := range datums { + for d := range datums { results := NewBatchResponse(d.Id()) results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) batchResponses = batchResponses.Append(results) @@ -240,7 +199,8 @@ func TestService_MapFnStream(t *testing.T) { // here's a trick for testing: // because we are not using gRPC, we directly set a new incoming ctx // instead of the regular outgoing context in the real gRPC connection. - ctx := context.Background() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() inputCh := make(chan *batchmappb.BatchMapRequest) outputCh := make(chan *batchmappb.BatchMapResponse) result := make([]*batchmappb.BatchMapResponse, 0)