diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index 5fcd7916..3862583e 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -111,8 +111,11 @@ func (x *MapRequest) GetId() string { return "" } -// * -// MapResponse represents a response element. +// TODO(map-batch) - currently this is used by both batch map and unary map. +// Do we want to have a separate response struct for batch map responses +// which have only one element instead of a list of responses. +// In that case we need a different mechanism to indicate that all the responses for a given request +// have been completed. type MapResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -321,22 +324,22 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, - 0x79, 0x32, 0xac, 0x01, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x30, 0x0a, 0x05, 0x4d, 0x61, 0x70, + 0x79, 0x32, 0xad, 0x01, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x30, 0x0a, 0x05, 0x4d, 0x61, 0x70, 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, - 0x70, 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, - 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, - 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, - 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, - 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, + 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, + 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, + 0x01, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, + 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -368,10 +371,10 @@ var file_pkg_apis_proto_map_v1_map_proto_depIdxs = []int32{ 4, // 3: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result 0, // 4: map.v1.Map.MapFn:input_type -> map.v1.MapRequest 6, // 5: map.v1.Map.IsReady:input_type -> google.protobuf.Empty - 0, // 6: map.v1.Map.BatchMapFn:input_type -> map.v1.MapRequest + 0, // 6: map.v1.Map.MapStreamFn:input_type -> map.v1.MapRequest 1, // 7: map.v1.Map.MapFn:output_type -> map.v1.MapResponse 2, // 8: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse - 1, // 9: map.v1.Map.BatchMapFn:output_type -> map.v1.MapResponse + 1, // 9: map.v1.Map.MapStreamFn:output_type -> map.v1.MapResponse 7, // [7:10] is the sub-list for method output_type 4, // [4:7] is the sub-list for method input_type 4, // [4:4] is the sub-list for extension type_name diff --git a/pkg/apis/proto/map/v1/map.proto b/pkg/apis/proto/map/v1/map.proto index a6cccd8f..a38833ce 100644 --- a/pkg/apis/proto/map/v1/map.proto +++ b/pkg/apis/proto/map/v1/map.proto @@ -14,13 +14,13 @@ service Map { // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); - // BatchMapFn is a bi-directional streaming rpc which applies a - // batchMap function on each element of the stream and then returns streams + // MapStreamFn is a bi-directional streaming rpc which applies a + // Map function on each element of the stream and then returns streams // back MapResponse elements. // TODO(map-batch): in the target state when we move the current // unary implementation to bi-di as well, we can rename this and // use a single rpc for both. - rpc BatchMapFn(stream MapRequest) returns (stream MapResponse); + rpc MapStreamFn(stream MapRequest) returns (stream MapResponse); } /** diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index a9d37150..9f42fd43 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -27,13 +27,13 @@ type MapClient interface { MapFn(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) - // BatchMapFn is a bi-directional streaming rpc which applies a - // batchMap function on each element of the stream and then returns streams + // MapStreamFn is a bi-directional streaming rpc which applies a + // Map function on each element of the stream and then returns streams // back MapResponse elements. // TODO(map-batch): in the target state when we move the current // unary implementation to bi-di as well, we can rename this and // use a single rpc for both. - BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (Map_BatchMapFnClient, error) + MapStreamFn(ctx context.Context, opts ...grpc.CallOption) (Map_MapStreamFnClient, error) } type mapClient struct { @@ -62,30 +62,30 @@ func (c *mapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc return out, nil } -func (c *mapClient) BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (Map_BatchMapFnClient, error) { - stream, err := c.cc.NewStream(ctx, &Map_ServiceDesc.Streams[0], "/map.v1.Map/BatchMapFn", opts...) +func (c *mapClient) MapStreamFn(ctx context.Context, opts ...grpc.CallOption) (Map_MapStreamFnClient, error) { + stream, err := c.cc.NewStream(ctx, &Map_ServiceDesc.Streams[0], "/map.v1.Map/MapStreamFn", opts...) if err != nil { return nil, err } - x := &mapBatchMapFnClient{stream} + x := &mapMapStreamFnClient{stream} return x, nil } -type Map_BatchMapFnClient interface { +type Map_MapStreamFnClient interface { Send(*MapRequest) error Recv() (*MapResponse, error) grpc.ClientStream } -type mapBatchMapFnClient struct { +type mapMapStreamFnClient struct { grpc.ClientStream } -func (x *mapBatchMapFnClient) Send(m *MapRequest) error { +func (x *mapMapStreamFnClient) Send(m *MapRequest) error { return x.ClientStream.SendMsg(m) } -func (x *mapBatchMapFnClient) Recv() (*MapResponse, error) { +func (x *mapMapStreamFnClient) Recv() (*MapResponse, error) { m := new(MapResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -101,13 +101,13 @@ type MapServer interface { MapFn(context.Context, *MapRequest) (*MapResponse, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) - // BatchMapFn is a bi-directional streaming rpc which applies a - // batchMap function on each element of the stream and then returns streams + // MapStreamFn is a bi-directional streaming rpc which applies a + // Map function on each element of the stream and then returns streams // back MapResponse elements. // TODO(map-batch): in the target state when we move the current // unary implementation to bi-di as well, we can rename this and // use a single rpc for both. - BatchMapFn(Map_BatchMapFnServer) error + MapStreamFn(Map_MapStreamFnServer) error mustEmbedUnimplementedMapServer() } @@ -121,8 +121,8 @@ func (UnimplementedMapServer) MapFn(context.Context, *MapRequest) (*MapResponse, func (UnimplementedMapServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") } -func (UnimplementedMapServer) BatchMapFn(Map_BatchMapFnServer) error { - return status.Errorf(codes.Unimplemented, "method BatchMapFn not implemented") +func (UnimplementedMapServer) MapStreamFn(Map_MapStreamFnServer) error { + return status.Errorf(codes.Unimplemented, "method MapStreamFn not implemented") } func (UnimplementedMapServer) mustEmbedUnimplementedMapServer() {} @@ -173,25 +173,25 @@ func _Map_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interfa return interceptor(ctx, in, info, handler) } -func _Map_BatchMapFn_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(MapServer).BatchMapFn(&mapBatchMapFnServer{stream}) +func _Map_MapStreamFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MapServer).MapStreamFn(&mapMapStreamFnServer{stream}) } -type Map_BatchMapFnServer interface { +type Map_MapStreamFnServer interface { Send(*MapResponse) error Recv() (*MapRequest, error) grpc.ServerStream } -type mapBatchMapFnServer struct { +type mapMapStreamFnServer struct { grpc.ServerStream } -func (x *mapBatchMapFnServer) Send(m *MapResponse) error { +func (x *mapMapStreamFnServer) Send(m *MapResponse) error { return x.ServerStream.SendMsg(m) } -func (x *mapBatchMapFnServer) Recv() (*MapRequest, error) { +func (x *mapMapStreamFnServer) Recv() (*MapRequest, error) { m := new(MapRequest) if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err @@ -217,8 +217,8 @@ var Map_ServiceDesc = grpc.ServiceDesc{ }, Streams: []grpc.StreamDesc{ { - StreamName: "BatchMapFn", - Handler: _Map_BatchMapFn_Handler, + StreamName: "MapStreamFn", + Handler: _Map_MapStreamFn_Handler, ServerStreams: true, ClientStreams: true, }, diff --git a/pkg/apis/proto/map/v1/mapmock/mapmock.go b/pkg/apis/proto/map/v1/mapmock/mapmock.go index 3644d177..178f13d0 100644 --- a/pkg/apis/proto/map/v1/mapmock/mapmock.go +++ b/pkg/apis/proto/map/v1/mapmock/mapmock.go @@ -37,26 +37,6 @@ func (m *MockMapClient) EXPECT() *MockMapClientMockRecorder { return m.recorder } -// BatchMapFn mocks base method. -func (m *MockMapClient) BatchMapFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.Map_BatchMapFnClient, error) { - m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "BatchMapFn", varargs...) - ret0, _ := ret[0].(v1.Map_BatchMapFnClient) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// BatchMapFn indicates an expected call of BatchMapFn. -func (mr *MockMapClientMockRecorder) BatchMapFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchMapFn", reflect.TypeOf((*MockMapClient)(nil).BatchMapFn), varargs...) -} - // IsReady mocks base method. func (m *MockMapClient) IsReady(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.ReadyResponse, error) { m.ctrl.T.Helper() @@ -96,3 +76,23 @@ func (mr *MockMapClientMockRecorder) MapFn(arg0, arg1 interface{}, arg2 ...inter varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MapFn", reflect.TypeOf((*MockMapClient)(nil).MapFn), varargs...) } + +// MapStreamFn mocks base method. +func (m *MockMapClient) MapStreamFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.Map_MapStreamFnClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MapStreamFn", varargs...) + ret0, _ := ret[0].(v1.Map_MapStreamFnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MapStreamFn indicates an expected call of MapStreamFn. +func (mr *MockMapClientMockRecorder) MapStreamFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MapStreamFn", reflect.TypeOf((*MockMapClient)(nil).MapStreamFn), varargs...) +} diff --git a/pkg/mapper/examples/batchmap/Makefile b/pkg/mapper/examples/batchmap/Makefile index 0621dba4..6c3fd22c 100644 --- a/pkg/mapper/examples/batchmap/Makefile +++ b/pkg/mapper/examples/batchmap/Makefile @@ -1,4 +1,4 @@ -TAG ?= mapbatchv5 +TAG ?= mapbatchemptyv1 PUSH ?= true .PHONY: build diff --git a/pkg/mapper/examples/batchmap/main.go b/pkg/mapper/examples/batchmap/main.go index 948a7182..35098f3a 100644 --- a/pkg/mapper/examples/batchmap/main.go +++ b/pkg/mapper/examples/batchmap/main.go @@ -11,20 +11,20 @@ func mapFn(_ context.Context, datums []mapper.Datum) mapper.BatchResponses { batchResponses := mapper.BatchResponsesBuilder() log.Println("MYDEBUG: length of input ", len(datums)) for _, d := range datums { - msg := d.Value() + //msg := d.Value() _ = d.EventTime() // Event time is available _ = d.Watermark() // Watermark is available results := mapper.NewBatchResponse(d.Id()) - for i := 0; i < 2; i++ { - results = results.Append(mapper.NewMessage(msg)) - } + //for i := 0; i < 2; i++ { + // results = results.Append(mapper.NewMessage(msg)) + //} batchResponses = batchResponses.Append(results) } return batchResponses } func main() { - err := mapper.NewBatchMapServer(mapper.BatchMapperFunc(mapFn)).Start(context.Background()) + err := mapper.NewBatchServer(mapper.BatchMapperFunc(mapFn)).Start(context.Background()) if err != nil { log.Panic("Failed to start map function server: ", err) } diff --git a/pkg/mapper/examples/batchmap/pipe.yaml b/pkg/mapper/examples/batchmap/pipe.yaml index a6ff8a08..64133802 100644 --- a/pkg/mapper/examples/batchmap/pipe.yaml +++ b/pkg/mapper/examples/batchmap/pipe.yaml @@ -3,8 +3,8 @@ kind: Pipeline metadata: name: simple-pipeline spec: - watermark: - disabled: true # Optional, defaults to false. +# watermark: +# disabled: true # Optional, defaults to false. limits: readBatchSize: 500 vertices: @@ -31,7 +31,7 @@ spec: udf: container: # image: "quay.io/numaio/numaflow-go/map-flatmap:stable" - image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchv5" + image: "quay.io/kohlisid/numaflow-go/map-flatmap:mapbatchemptyv1" imagePullPolicy: Always containerTemplate: resources: diff --git a/pkg/mapper/interface.go b/pkg/mapper/interface.go index 606a9ddb..1d8d0f0f 100644 --- a/pkg/mapper/interface.go +++ b/pkg/mapper/interface.go @@ -17,6 +17,8 @@ type Datum interface { Headers() map[string]string // Id returns the unique ID set for the given message Id() string + // Keys returns the keys associated with a given datum + Keys() []string } // Mapper is the interface of map function implementation. This is the traditional interface diff --git a/pkg/mapper/message.go b/pkg/mapper/message.go index ef88c373..643b3a84 100644 --- a/pkg/mapper/message.go +++ b/pkg/mapper/message.go @@ -6,9 +6,7 @@ var ( DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ ) -// =========================================================================================== // Common structures used in map functions -// =========================================================================================== // Message is used to wrap the data return by Map functions type Message struct { @@ -55,9 +53,7 @@ func (m Message) Tags() []string { return m.tags } -// =========================================================================================== // Utility structures for unary map use case -// =========================================================================================== type Messages []Message @@ -77,9 +73,7 @@ func (m Messages) Items() []Message { return m } -// =========================================================================================== // Utility structures for batch map mode -// =========================================================================================== // batchResponse is used to wrap the data return by batch map function along // with the ID of the corresponding request diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 33b105f4..54b1fd52 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -34,10 +34,10 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server { return s } -// NewBatchMapServer creates a new batch map server. +// NewBatchServer creates a new batch map server. // TODO(map-batch): as this would be a streaming server should we see if there are some options (like maxMessageSize) // which are different than unary server which are optimal for this use case. -func NewBatchMapServer(m BatchMapper, inputOptions ...Option) numaflow.Server { +func NewBatchServer(m BatchMapper, inputOptions ...Option) numaflow.Server { opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) diff --git a/pkg/mapper/server_test.go b/pkg/mapper/server_test.go index c4736e85..6ff2fe59 100644 --- a/pkg/mapper/server_test.go +++ b/pkg/mapper/server_test.go @@ -30,3 +30,31 @@ func TestMapServer_Start(t *testing.T) { err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) } + +func TestBatchMapServer_Start(t *testing.T) { + socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock") + defer func() { + _ = os.RemoveAll(socketFile.Name()) + }() + + serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + + var mapHandler = BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses.Append(results) + } + + return batchResponses + }) + // note: using actual uds connection + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + err := NewBatchServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) + assert.NoError(t, err) +} diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 8935157d..0006a563 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -2,6 +2,7 @@ package mapper import ( "context" + "fmt" "io" "log" @@ -33,7 +34,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons // MapFn applies a user defined function to each request element and returns a list of results. func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapResponse, error) { - var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), emptyId) + var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), emptyId, d.GetKeys()) messages := fs.Mapper.Map(ctx, d.GetKeys(), hd) var elements []*mappb.MapResponse_Result for _, m := range messages.Items() { @@ -49,8 +50,8 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe return datumList, nil } -// BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them. -func (fs *Service) BatchMapFn(stream mappb.Map_BatchMapFnServer) error { +// MapStreamFn applies a user defined function to a stream of request element and streams back the responses for them. +func (fs *Service) MapStreamFn(stream mappb.Map_MapStreamFnServer) error { ctx := stream.Context() // As the BatchMap interface expects a list of request elements @@ -63,16 +64,26 @@ func (fs *Service) BatchMapFn(stream mappb.Map_BatchMapFnServer) error { break } if err != nil { - log.Println("BatchMapFn: Got an error while recv() on stream", err) + log.Println("MapStreamFn: Got an error while recv() on stream", err) return err } - var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId()) + var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys()) datums = append(datums, hd) } // Apply the user BatchMap implementation function responses := fs.BatchMapper.BatchMap(ctx, datums) + // If the number of responses received does not align with the request batch size, + // we will not be able to process the data correctly. + // This should be marked as an error and shown to the user. + // TODO(map-batch): We could potentially panic here as well + if len(responses.Items()) != len(datums) { + errMsg := "batchMapFn: mismatch between length of batch requests and responses" + log.Println(errMsg) + return fmt.Errorf(errMsg) + } + // iterate over the responses received and covert to the required proto format for _, batchResp := range responses.Items() { var elements []*mappb.MapResponse_Result @@ -91,7 +102,7 @@ func (fs *Service) BatchMapFn(stream mappb.Map_BatchMapFnServer) error { // this would contain all the responses for that request. err := stream.Send(singleRequestResp) if err != nil { - log.Println("BatchMapFn: Got an error while Send() on stream", err) + log.Println("MapStreamFn: Got an error while Send() on stream", err) return err } } diff --git a/pkg/mapper/types.go b/pkg/mapper/types.go index 50107fef..2a12992e 100644 --- a/pkg/mapper/types.go +++ b/pkg/mapper/types.go @@ -9,15 +9,17 @@ type handlerDatum struct { watermark time.Time headers map[string]string id string + keys []string } -func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, id string) Datum { +func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, id string, keys []string) Datum { return &handlerDatum{ value: value, eventTime: eventTime, watermark: watermark, headers: headers, id: id, + keys: keys, } } @@ -40,3 +42,7 @@ func (h *handlerDatum) Headers() map[string]string { func (h *handlerDatum) Id() string { return h.id } + +func (h *handlerDatum) Keys() []string { + return h.keys +}