diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index 58f978de..3862583e 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.28.1 +// protoc v3.21.12 // source: pkg/apis/proto/map/v1/map.proto package v1 @@ -34,6 +34,7 @@ type MapRequest struct { EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` } func (x *MapRequest) Reset() { @@ -103,14 +104,25 @@ func (x *MapRequest) GetHeaders() map[string]string { return nil } -// * -// MapResponse represents a response element. +func (x *MapRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +// TODO(map-batch) - currently this is used by both batch map and unary map. +// Do we want to have a separate response struct for batch map responses +// which have only one element instead of a list of responses. +// In that case we need a different mechanism to indicate that all the responses for a given request +// have been completed. 
type MapResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields Results []*MapResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` } func (x *MapResponse) Reset() { @@ -152,6 +164,13 @@ func (x *MapResponse) GetResults() []*MapResponse_Result { return nil } +func (x *MapResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -273,7 +292,7 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xa2, 0x02, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb2, 0x02, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, @@ -288,33 +307,39 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 
0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8b, 0x01, 0x0a, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x9b, 0x01, 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x73, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x74, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, - 0x79, 0x32, 0x71, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x30, 0x0a, 0x05, 0x4d, 0x61, 0x70, 0x46, - 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, - 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 
0x65, 0x12, 0x38, 0x0a, 0x07, 0x49, 0x73, - 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, 0x2e, - 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, - 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, - 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x79, 0x32, 0xad, 0x01, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x30, 0x0a, 0x05, 0x4d, 0x61, 0x70, + 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, + 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x46, 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, + 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, + 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, + 0x01, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 
0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, + 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -346,10 +371,12 @@ var file_pkg_apis_proto_map_v1_map_proto_depIdxs = []int32{ 4, // 3: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result 0, // 4: map.v1.Map.MapFn:input_type -> map.v1.MapRequest 6, // 5: map.v1.Map.IsReady:input_type -> google.protobuf.Empty - 1, // 6: map.v1.Map.MapFn:output_type -> map.v1.MapResponse - 2, // 7: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse - 6, // [6:8] is the sub-list for method output_type - 4, // [4:6] is the sub-list for method input_type + 0, // 6: map.v1.Map.MapStreamFn:input_type -> map.v1.MapRequest + 1, // 7: map.v1.Map.MapFn:output_type -> map.v1.MapResponse + 2, // 8: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse + 1, // 9: map.v1.Map.MapStreamFn:output_type -> map.v1.MapResponse + 7, // [7:10] is the sub-list for method output_type + 4, // [4:7] is the sub-list for method input_type 4, // [4:4] is the sub-list for extension type_name 4, // [4:4] is the sub-list for extension extendee 0, // [0:4] is the sub-list for field type_name diff --git a/pkg/apis/proto/map/v1/map.proto b/pkg/apis/proto/map/v1/map.proto index e93a9b01..a38833ce 100644 --- a/pkg/apis/proto/map/v1/map.proto +++ b/pkg/apis/proto/map/v1/map.proto @@ -13,6 +13,14 @@ service Map { // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); + + // MapStreamFn is a bi-directional streaming rpc which applies a + // Map function on each element of the stream and then returns streams + // back MapResponse elements. + // TODO(map-batch): in the target state when we move the current + // unary implementation to bi-di as well, we can rename this and + // use a single rpc for both. 
+ rpc MapStreamFn(stream MapRequest) returns (stream MapResponse); } /** @@ -24,11 +32,17 @@ message MapRequest { google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; map<string, string> headers = 5; + string id = 6; } /** * MapResponse represents a response element. */ +// TODO(map-batch) - currently this is used by both batch map and unary map. +// Do we want to have a separate response struct for batch map responses +// which have only one element instead of a list of responses. +// In that case we need a different mechanism to indicate that all the responses for a given request +// have been completed. message MapResponse { message Result { repeated string keys = 1; @@ -36,6 +50,7 @@ message MapResponse { repeated string tags = 3; } repeated Result results = 1; + string id = 2; } /** diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index d3844348..9f42fd43 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc v3.21.12 // source: pkg/apis/proto/map/v1/map.proto package v1 @@ -27,6 +27,13 @@ type MapClient interface { MapFn(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) + // MapStreamFn is a bi-directional streaming rpc which applies a + // Map function on each element of the stream and then returns streams + // back MapResponse elements. + // TODO(map-batch): in the target state when we move the current + // unary implementation to bi-di as well, we can rename this and + // use a single rpc for both. 
+ MapStreamFn(ctx context.Context, opts ...grpc.CallOption) (Map_MapStreamFnClient, error) } type mapClient struct { @@ -55,6 +62,37 @@ func (c *mapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc return out, nil } +func (c *mapClient) MapStreamFn(ctx context.Context, opts ...grpc.CallOption) (Map_MapStreamFnClient, error) { + stream, err := c.cc.NewStream(ctx, &Map_ServiceDesc.Streams[0], "/map.v1.Map/MapStreamFn", opts...) + if err != nil { + return nil, err + } + x := &mapMapStreamFnClient{stream} + return x, nil +} + +type Map_MapStreamFnClient interface { + Send(*MapRequest) error + Recv() (*MapResponse, error) + grpc.ClientStream +} + +type mapMapStreamFnClient struct { + grpc.ClientStream +} + +func (x *mapMapStreamFnClient) Send(m *MapRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *mapMapStreamFnClient) Recv() (*MapResponse, error) { + m := new(MapResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // MapServer is the server API for Map service. // All implementations must embed UnimplementedMapServer // for forward compatibility @@ -63,6 +101,13 @@ type MapServer interface { MapFn(context.Context, *MapRequest) (*MapResponse, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) + // MapStreamFn is a bi-directional streaming rpc which applies a + // Map function on each element of the stream and then returns streams + // back MapResponse elements. + // TODO(map-batch): in the target state when we move the current + // unary implementation to bi-di as well, we can rename this and + // use a single rpc for both. 
+ MapStreamFn(Map_MapStreamFnServer) error mustEmbedUnimplementedMapServer() } @@ -76,6 +121,9 @@ func (UnimplementedMapServer) MapFn(context.Context, *MapRequest) (*MapResponse, func (UnimplementedMapServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") } +func (UnimplementedMapServer) MapStreamFn(Map_MapStreamFnServer) error { + return status.Errorf(codes.Unimplemented, "method MapStreamFn not implemented") +} func (UnimplementedMapServer) mustEmbedUnimplementedMapServer() {} // UnsafeMapServer may be embedded to opt out of forward compatibility for this service. @@ -125,6 +173,32 @@ func _Map_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interfa return interceptor(ctx, in, info, handler) } +func _Map_MapStreamFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(MapServer).MapStreamFn(&mapMapStreamFnServer{stream}) +} + +type Map_MapStreamFnServer interface { + Send(*MapResponse) error + Recv() (*MapRequest, error) + grpc.ServerStream +} + +type mapMapStreamFnServer struct { + grpc.ServerStream +} + +func (x *mapMapStreamFnServer) Send(m *MapResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *mapMapStreamFnServer) Recv() (*MapRequest, error) { + m := new(MapRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // Map_ServiceDesc is the grpc.ServiceDesc for Map service. 
// It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -141,6 +215,13 @@ var Map_ServiceDesc = grpc.ServiceDesc{ Handler: _Map_IsReady_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "MapStreamFn", + Handler: _Map_MapStreamFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "pkg/apis/proto/map/v1/map.proto", } diff --git a/pkg/apis/proto/map/v1/mapmock/mapmock.go b/pkg/apis/proto/map/v1/mapmock/mapmock.go index 1ae465da..6af8169a 100644 --- a/pkg/apis/proto/map/v1/mapmock/mapmock.go +++ b/pkg/apis/proto/map/v1/mapmock/mapmock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1 (interfaces: MapClient) +// Source: github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1 (interfaces: MapClient,Map_MapStreamFnClient) // Package mapmock is a generated GoMock package. package mapmock @@ -11,6 +11,7 @@ import ( gomock "github.com/golang/mock/gomock" v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" emptypb "google.golang.org/protobuf/types/known/emptypb" ) @@ -76,3 +77,160 @@ func (mr *MockMapClientMockRecorder) MapFn(arg0, arg1 interface{}, arg2 ...inter varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MapFn", reflect.TypeOf((*MockMapClient)(nil).MapFn), varargs...) } + +// MapStreamFn mocks base method. +func (m *MockMapClient) MapStreamFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.Map_MapStreamFnClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "MapStreamFn", varargs...) 
+ ret0, _ := ret[0].(v1.Map_MapStreamFnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MapStreamFn indicates an expected call of MapStreamFn. +func (mr *MockMapClientMockRecorder) MapStreamFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MapStreamFn", reflect.TypeOf((*MockMapClient)(nil).MapStreamFn), varargs...) +} + +// MockMap_MapStreamFnClient is a mock of Map_MapStreamFnClient interface. +type MockMap_MapStreamFnClient struct { + ctrl *gomock.Controller + recorder *MockMap_MapStreamFnClientMockRecorder +} + +// MockMap_MapStreamFnClientMockRecorder is the mock recorder for MockMap_MapStreamFnClient. +type MockMap_MapStreamFnClientMockRecorder struct { + mock *MockMap_MapStreamFnClient +} + +// NewMockMap_MapStreamFnClient creates a new mock instance. +func NewMockMap_MapStreamFnClient(ctrl *gomock.Controller) *MockMap_MapStreamFnClient { + mock := &MockMap_MapStreamFnClient{ctrl: ctrl} + mock.recorder = &MockMap_MapStreamFnClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMap_MapStreamFnClient) EXPECT() *MockMap_MapStreamFnClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockMap_MapStreamFnClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockMap_MapStreamFnClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).CloseSend)) +} + +// Context mocks base method. 
+func (m *MockMap_MapStreamFnClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockMap_MapStreamFnClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockMap_MapStreamFnClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockMap_MapStreamFnClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockMap_MapStreamFnClient) Recv() (*v1.MapResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*v1.MapResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockMap_MapStreamFnClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockMap_MapStreamFnClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. 
+func (mr *MockMap_MapStreamFnClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockMap_MapStreamFnClient) Send(arg0 *v1.MapRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockMap_MapStreamFnClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockMap_MapStreamFnClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockMap_MapStreamFnClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockMap_MapStreamFnClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. 
+func (mr *MockMap_MapStreamFnClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockMap_MapStreamFnClient)(nil).Trailer)) +} diff --git a/pkg/apis/proto/map/v1/mockgen.go b/pkg/apis/proto/map/v1/mockgen.go index f0daeae9..e4743511 100644 --- a/pkg/apis/proto/map/v1/mockgen.go +++ b/pkg/apis/proto/map/v1/mockgen.go @@ -1,3 +1,3 @@ package v1 -//go:generate mockgen -destination mapmock/mapmock.go -package mapmock github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1 MapClient +//go:generate mockgen -destination mapmock/mapmock.go -package mapmock github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1 MapClient,Map_MapStreamFnClient diff --git a/pkg/mapper/batch_service_test.go b/pkg/mapper/batch_service_test.go new file mode 100644 index 00000000..98ab8f99 --- /dev/null +++ b/pkg/mapper/batch_service_test.go @@ -0,0 +1,290 @@ +package mapper + +import ( + "context" + "fmt" + "io" + "reflect" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" + + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" +) + +type BatchMapStreamFnServerTest struct { + ctx context.Context + outputCh chan *mappb.MapResponse + inputCh chan *mappb.MapRequest + grpc.ServerStream +} + +func NewBatchBatchMapStreamFnServerTest( + ctx context.Context, + inputCh chan *mappb.MapRequest, + outputCh chan *mappb.MapResponse, +) *BatchMapStreamFnServerTest { + return &BatchMapStreamFnServerTest{ + ctx: ctx, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (u *BatchMapStreamFnServerTest) Recv() (*mappb.MapRequest, error) { + val, ok := <-u.inputCh + if !ok { + return val, io.EOF + } + return val, nil +} + +func (u *BatchMapStreamFnServerTest) Send(d *mappb.MapResponse) error { + u.outputCh <- d + return nil +} + +func (u *BatchMapStreamFnServerTest) Context() context.Context { + return 
u.ctx +} + +type MapStreamFnServerErrTest struct { + ctx context.Context + inputCh chan *mappb.MapRequest + outputCh chan *mappb.MapResponse + grpc.ServerStream +} + +func NewMapStreamFnServerErrTest( + ctx context.Context, + inputCh chan *mappb.MapRequest, + outputCh chan *mappb.MapResponse, + +) *MapStreamFnServerErrTest { + return &MapStreamFnServerErrTest{ + ctx: ctx, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (u *MapStreamFnServerErrTest) Recv() (*mappb.MapRequest, error) { + val, ok := <-u.inputCh + if !ok { + return val, io.EOF + } + return val, nil +} + +func (u *MapStreamFnServerErrTest) Send(_ *mappb.MapResponse) error { + return fmt.Errorf("send error") +} + +func (u *MapStreamFnServerErrTest) Context() context.Context { + return u.ctx +} + +func TestService_MapFnStream(t *testing.T) { + tests := []struct { + name string + handler BatchMapper + input []*mappb.MapRequest + expected []*mappb.MapResponse + expectedErr bool + streamErr bool + }{ + { + name: "map_stream_fn_forward_msg", + handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses = batchResponses.Append(results) + } + return batchResponses + }), + input: []*mappb.MapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*mappb.MapResponse{ + { + Results: []*mappb.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*mappb.MapResponse_Result{ + { + Keys: 
[]string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: false, + }, + { + name: "batch_map_mismatch_output_len", + handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + return batchResponses + }), + input: []*mappb.MapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*mappb.MapResponse{ + { + Results: []*mappb.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*mappb.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: true, + }, + { + name: "batch_map_stream_err", + handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses = batchResponses.Append(results) + } + return batchResponses + }), + input: []*mappb.MapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*mappb.MapResponse{ + { + Results: []*mappb.MapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*mappb.MapResponse_Result{ + { + 
Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: true, + streamErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fs := &Service{ + BatchMapper: tt.handler, + } + // here's a trick for testing: + // because we are not using gRPC, we directly set a new incoming ctx + // instead of the regular outgoing context in the real gRPC connection. + ctx := context.Background() + inputCh := make(chan *mappb.MapRequest) + outputCh := make(chan *mappb.MapResponse) + result := make([]*mappb.MapResponse, 0) + + var udfMapStreamFnStream mappb.Map_MapStreamFnServer + if tt.streamErr { + udfMapStreamFnStream = NewMapStreamFnServerErrTest(ctx, inputCh, outputCh) + } else { + udfMapStreamFnStream = NewBatchBatchMapStreamFnServerTest(ctx, inputCh, outputCh) + } + + var wg sync.WaitGroup + var err error + + wg.Add(1) + go func() { + defer wg.Done() + err = fs.MapStreamFn(udfMapStreamFnStream) + close(outputCh) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for msg := range outputCh { + result = append(result, msg) + } + }() + + for _, val := range tt.input { + inputCh <- val + } + close(inputCh) + wg.Wait() + + if err != nil { + assert.True(t, tt.expectedErr, "MapStreamFn() error = %v, expectedErr %v", err, tt.expectedErr) + return + } + + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("MapStreamFn() got = %v, want %v", result, tt.expected) + } + + }) + } +} diff --git a/pkg/mapper/examples/batchmap-flatmap/Dockerfile b/pkg/mapper/examples/batchmap-flatmap/Dockerfile new file mode 100644 index 00000000..c3b6cc45 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/Dockerfile @@ -0,0 +1,20 @@ +#################################################################################################### +# base +#################################################################################################### +FROM alpine:3.12.3 as base +RUN apk update && apk upgrade && \ + apk add 
ca-certificates && \ + apk --no-cache add tzdata + +COPY dist/flatmap-example /bin/flatmap-example +RUN chmod +x /bin/flatmap-example + +#################################################################################################### +# flatmap +#################################################################################################### +FROM scratch as flatmap +ARG ARCH +COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo +COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=base /bin/flatmap-example /bin/flatmap-example +ENTRYPOINT [ "/bin/flatmap-example" ] diff --git a/pkg/mapper/examples/batchmap-flatmap/Makefile b/pkg/mapper/examples/batchmap-flatmap/Makefile new file mode 100644 index 00000000..81328294 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/Makefile @@ -0,0 +1,18 @@ +TAG ?= stable +PUSH ?= false + +.PHONY: build +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go + +.PHONY: image-push +image-push: build + docker buildx build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push + +.PHONY: image +image: build + docker build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --target flatmap . + @if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}"; fi + +clean: + -rm -rf ./dist diff --git a/pkg/mapper/examples/batchmap-flatmap/README.md b/pkg/mapper/examples/batchmap-flatmap/README.md new file mode 100644 index 00000000..0a639de2 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/README.md @@ -0,0 +1,3 @@ +# Map Batch Flatmap + +An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function. 
diff --git a/pkg/mapper/examples/batchmap-flatmap/go.mod b/pkg/mapper/examples/batchmap-flatmap/go.mod new file mode 100644 index 00000000..88f3f383 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/go.mod @@ -0,0 +1,17 @@ +module flatmap + +go 1.20 + +replace github.com/numaproj/numaflow-go => ../../../.. + +require github.com/numaproj/numaflow-go v0.7.0-rc2 + +require ( + github.com/golang/protobuf v1.5.3 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + google.golang.org/grpc v1.57.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/pkg/mapper/examples/batchmap-flatmap/go.sum b/pkg/mapper/examples/batchmap-flatmap/go.sum new file mode 100644 index 00000000..95c8479a --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/go.sum @@ -0,0 +1,24 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 
h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/mapper/examples/batchmap-flatmap/main.go b/pkg/mapper/examples/batchmap-flatmap/main.go new file mode 100644 index 00000000..e2ea415a --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "log" + "strings" + + "github.com/numaproj/numaflow-go/pkg/mapper" +) + +func mapFn(_ context.Context, datums []mapper.Datum) mapper.BatchResponses { + batchResponses := mapper.BatchResponsesBuilder() + for _, d := range datums { + msg := d.Value() + _ = d.EventTime() // Event time is available + _ = d.Watermark() // Watermark is available + results := mapper.NewBatchResponse(d.Id()) + strs := strings.Split(string(msg), ",") + for _, s := range strs { + results = results.Append(mapper.NewMessage([]byte(s))) + } + batchResponses = batchResponses.Append(results) + } + return batchResponses +} + +func main() { + err := 
mapper.NewBatchServer(mapper.BatchMapperFunc(mapFn)).Start(context.Background()) + if err != nil { + log.Panic("Failed to start map function server: ", err) + } +} diff --git a/pkg/mapper/examples/batchmap-flatmap/pipe.yaml b/pkg/mapper/examples/batchmap-flatmap/pipe.yaml new file mode 100644 index 00000000..7b7cd3b9 --- /dev/null +++ b/pkg/mapper/examples/batchmap-flatmap/pipe.yaml @@ -0,0 +1,34 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: flatmap +spec: + vertices: + - name: in + source: + http: {} + - name: go-split + metadata: + annotations: + numaflow.numaproj.io/batch-map: "true" + scale: + min: 1 + udf: + container: + # Split each input message of the batch on commas, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/batchmap-flatmap + image: quay.io/numaio/numaflow-go/batch-map-flatmap:stable + imagePullPolicy: Always + - name: go-udsink + scale: + min: 1 + sink: + udsink: + container: + # https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/log + image: quay.io/numaio/numaflow-go/sink-log:stable + imagePullPolicy: Always + edges: + - from: in + to: go-split + - from: go-split + to: go-udsink diff --git a/pkg/mapper/interface.go b/pkg/mapper/interface.go index e4f8355f..1d8d0f0f 100644 --- a/pkg/mapper/interface.go +++ b/pkg/mapper/interface.go @@ -15,9 +15,15 @@ type Datum interface { Watermark() time.Time // Headers returns the headers of the message. Headers() map[string]string + // Id returns the unique ID set for the given message + Id() string + // Keys returns the keys associated with a given datum + Keys() []string } -// Mapper is the interface of map function implementation. +// Mapper is the interface of map function implementation. This is the traditional interface +// where a single message is passed as input and the responses corresponding to that request +// are returned. type Mapper interface { // Map is the function to process each coming message.
Map(ctx context.Context, keys []string, datum Datum) Messages @@ -30,3 +36,18 @@ type MapperFunc func(ctx context.Context, keys []string, datum Datum) Messages func (mf MapperFunc) Map(ctx context.Context, keys []string, datum Datum) Messages { return mf(ctx, keys, datum) } + +// BatchMapper is the interface for a Batch Map mode where the user is given a list +// of messages, and they return the consolidated response for all of them together. +type BatchMapper interface { + // BatchMap is the function which processes a list of input messages + BatchMap(ctx context.Context, datums []Datum) BatchResponses +} + +// BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper. +type BatchMapperFunc func(ctx context.Context, datums []Datum) BatchResponses + +// BatchMap calls the underlying function, which lets BatchMapperFunc satisfy BatchMapper. +func (mf BatchMapperFunc) BatchMap(ctx context.Context, datums []Datum) BatchResponses { + return mf(ctx, datums) +} diff --git a/pkg/mapper/message.go b/pkg/mapper/message.go index 4693e050..643b3a84 100644 --- a/pkg/mapper/message.go +++ b/pkg/mapper/message.go @@ -6,6 +6,8 @@ var ( DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ ) +// Common structures used in map functions + // Message is used to wrap the data return by Map functions type Message struct { value []byte @@ -51,6 +53,8 @@ func (m Message) Tags() []string { return m.tags } +// Utility structures for unary map use case + type Messages []Message // MessagesBuilder returns an empty instance of Messages @@ -68,3 +72,57 @@ func (m Messages) Append(msg Message) Messages { func (m Messages) Items() []Message { return m } + +// Utility structures for batch map mode + +// batchResponse is used to wrap the data returned by a batch map function along +// with the ID of the corresponding request +type batchResponse struct { + id string + messages []Message +} + +// Id returns the request ID for the given list of responses +func (m batchResponse) Id() string { + return
m.id +} + +// Append appends a Message to the messages list of a batchResponse and returns the +// updated copy. The receiver is a value, so callers must use the returned batchResponse. +func (m batchResponse) Append(msg Message) batchResponse { + m.messages = append(m.messages, msg) + return m +} + +// Items returns the message list for a batchResponse +func (m batchResponse) Items() []Message { + return m.messages +} + +// BatchResponses is a list of batchResponse which signify the consolidated +// results for a batch of input messages. +type BatchResponses []batchResponse + +// NewBatchResponse is a utility function used to create a new batchResponse object. +// Specifying an id is a mandatory requirement, as it is required to reference the +// responses back to a request. +func NewBatchResponse(id string) batchResponse { + return batchResponse{ + id: id, + messages: MessagesBuilder(), + } +} +func BatchResponsesBuilder() BatchResponses { + return BatchResponses{} +} + +// Append appends a batchResponse to the list and returns the updated list. +func (m BatchResponses) Append(msg batchResponse) BatchResponses { + m = append(m, msg) + return m +} + +// Items returns the list of batchResponse entries. +func (m BatchResponses) Items() []batchResponse { + return m +} diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 60399b24..54b1fd52 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -19,7 +19,10 @@ type server struct { } // NewServer creates a new map server. +// TODO(map-batch): We call the unary map server as -> NewServer to keep backward compatibility +// Otherwise all users will have to update their current code when this change is released. func NewServer(m Mapper, inputOptions ...Option) numaflow.Server { + // opts := defaultOptions() for _, inputOption := range inputOptions { inputOption(opts) @@ -31,6 +34,21 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server { return s } +// NewBatchServer creates a new batch map server.
+// TODO(map-batch): as this would be a streaming server should we see if there are some options (like maxMessageSize) +// which are different than unary server which are optimal for this use case. +func NewBatchServer(m BatchMapper, inputOptions ...Option) numaflow.Server { + opts := defaultOptions() + for _, inputOption := range inputOptions { + inputOption(opts) + } + s := new(server) + s.svc = new(Service) + s.svc.BatchMapper = m + s.opts = opts + return s +} + // Start starts the map server. func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) diff --git a/pkg/mapper/server_test.go b/pkg/mapper/server_test.go index c4736e85..922473a3 100644 --- a/pkg/mapper/server_test.go +++ b/pkg/mapper/server_test.go @@ -30,3 +30,30 @@ func TestMapServer_Start(t *testing.T) { err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) } + +func TestBatchMapServer_Start(t *testing.T) { + socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock") + defer func() { + _ = os.RemoveAll(socketFile.Name()) + }() + + serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + + var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) // Append returns the updated copy, so keep it + batchResponses = batchResponses.Append(results) + } + return batchResponses + }) + // note: using actual uds connection + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + err := NewBatchServer(batchMapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) + assert.NoError(t, err) +} diff --git a/pkg/mapper/service.go
b/pkg/mapper/service.go index 636f494e..0006a563 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -2,6 +2,9 @@ package mapper import ( "context" + "fmt" + "io" + "log" "google.golang.org/protobuf/types/known/emptypb" @@ -13,13 +16,15 @@ const ( address = "/var/run/numaflow/map.sock" defaultMaxMessageSize = 1024 * 1024 * 64 serverInfoFilePath = "/var/run/numaflow/mapper-server-info" + emptyId = "" ) // Service implements the proto gen server interface and contains the map operation // handler. type Service struct { mappb.UnimplementedMapServer - Mapper Mapper + Mapper Mapper + BatchMapper BatchMapper } // IsReady returns true to indicate the gRPC connection is ready. @@ -29,7 +34,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons // MapFn applies a user defined function to each request element and returns a list of results. func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapResponse, error) { - var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders()) + var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), emptyId, d.GetKeys()) messages := fs.Mapper.Map(ctx, d.GetKeys(), hd) var elements []*mappb.MapResponse_Result for _, m := range messages.Items() { @@ -44,3 +49,64 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe } return datumList, nil } + +// MapStreamFn applies a user defined function to a stream of request elements and streams back the responses for them. +func (fs *Service) MapStreamFn(stream mappb.Map_MapStreamFnServer) error { + ctx := stream.Context() + + // As the BatchMap interface expects a list of request elements + // we read all the requests coming on the stream and keep appending them together + // and then finally send the array for processing.
+ datums := make([]Datum, 0) + for { + d, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Println("MapStreamFn: Got an error while recv() on stream", err) + return err + } + var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys()) + datums = append(datums, hd) + } + + // Apply the user BatchMap implementation function + responses := fs.BatchMapper.BatchMap(ctx, datums) + + // If the number of responses received does not align with the request batch size, + // we will not be able to process the data correctly. + // This should be marked as an error and shown to the user. + // TODO(map-batch): We could potentially panic here as well + if len(responses.Items()) != len(datums) { + errMsg := "batchMapFn: mismatch between length of batch requests and responses" + log.Println(errMsg) + return fmt.Errorf("%s", errMsg) // constant format string keeps go vet's printf check clean + } + + // iterate over the responses received and convert to the required proto format + for _, batchResp := range responses.Items() { + var elements []*mappb.MapResponse_Result + for _, resp := range batchResp.Items() { + elements = append(elements, &mappb.MapResponse_Result{ + Keys: resp.Keys(), + Value: resp.Value(), + Tags: resp.Tags(), + }) + } + singleRequestResp := &mappb.MapResponse{ + Results: elements, + Id: batchResp.Id(), + } + // We stream back the result for a single request ID + // this would contain all the responses for that request.
+ err := stream.Send(singleRequestResp) + if err != nil { + log.Println("MapStreamFn: Got an error while Send() on stream", err) + return err + } + } + // Once all responses are sent we can return, this would indicate the end of the rpc and + // send an EOF to the client on the stream + return nil +} diff --git a/pkg/mapper/types.go b/pkg/mapper/types.go index fd18c024..2a12992e 100644 --- a/pkg/mapper/types.go +++ b/pkg/mapper/types.go @@ -8,14 +8,18 @@ type handlerDatum struct { eventTime time.Time watermark time.Time headers map[string]string + id string + keys []string } -func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string) Datum { +func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, id string, keys []string) Datum { return &handlerDatum{ value: value, eventTime: eventTime, watermark: watermark, headers: headers, + id: id, + keys: keys, } } @@ -34,3 +38,11 @@ func (h *handlerDatum) Watermark() time.Time { func (h *handlerDatum) Headers() map[string]string { return h.headers } + +func (h *handlerDatum) Id() string { + return h.id +} + +func (h *handlerDatum) Keys() []string { + return h.keys +}