From 15e45210b784d009e674bf56e7a5801f16dc72fe Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Wed, 10 Jul 2024 22:17:31 -0700 Subject: [PATCH] feat: implement batch map (#133) Signed-off-by: Sidhant Kohli Co-authored-by: Sidhant Kohli --- .github/workflows/build-push.yaml | 2 +- pkg/apis/proto/batchmap/v1/batchmap.pb.go | 456 ++++++++++++++++++ pkg/apis/proto/batchmap/v1/batchmap.proto | 52 ++ .../proto/batchmap/v1/batchmap_grpc.pb.go | 183 +++++++ .../batchmap/v1/batchmapmock/batchmapmock.go | 216 +++++++++ pkg/apis/proto/batchmap/v1/mockgen.go | 3 + pkg/batchmapper/doc.go | 5 + .../examples/batchmap-flatmap/Dockerfile | 20 + .../examples/batchmap-flatmap/Makefile | 19 + .../examples/batchmap-flatmap/README.md | 27 ++ .../examples/batchmap-flatmap/go.mod | 19 + .../examples/batchmap-flatmap/go.sum | 33 ++ .../examples/batchmap-flatmap/main.go | 33 ++ .../examples/batchmap-flatmap/pipeline.yaml | 34 ++ pkg/batchmapper/interface.go | 37 ++ pkg/batchmapper/message.go | 106 ++++ pkg/batchmapper/options.go | 39 ++ pkg/batchmapper/options_test.go | 18 + pkg/batchmapper/server.go | 58 +++ pkg/batchmapper/server_test.go | 37 ++ pkg/batchmapper/service.go | 126 +++++ pkg/batchmapper/service_test.go | 249 ++++++++++ pkg/batchmapper/types.go | 48 ++ pkg/sourcetransformer/server.go | 2 +- 24 files changed, 1820 insertions(+), 2 deletions(-) create mode 100644 pkg/apis/proto/batchmap/v1/batchmap.pb.go create mode 100644 pkg/apis/proto/batchmap/v1/batchmap.proto create mode 100644 pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go create mode 100644 pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go create mode 100644 pkg/apis/proto/batchmap/v1/mockgen.go create mode 100644 pkg/batchmapper/doc.go create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/Dockerfile create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/Makefile create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/README.md create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/go.mod create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/go.sum create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/main.go create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml create mode 100644 pkg/batchmapper/interface.go create mode 100644 pkg/batchmapper/message.go create mode 100644 pkg/batchmapper/options.go create mode 100644 pkg/batchmapper/options_test.go create mode 100644 pkg/batchmapper/server.go create mode 100644 pkg/batchmapper/server_test.go create mode 100644 pkg/batchmapper/service.go create mode 100644 pkg/batchmapper/service_test.go create mode 100644 pkg/batchmapper/types.go diff --git a/.github/workflows/build-push.yaml b/.github/workflows/build-push.yaml index 4896f619..8b27a413 100644 --- a/.github/workflows/build-push.yaml +++ b/.github/workflows/build-push.yaml @@ -25,7 +25,7 @@ jobs: "pkg/sinker/examples/fallback", "pkg/sideinput/examples/map_sideinput", "pkg/sideinput/examples/reduce_sideinput", "pkg/sideinput/examples/sideinput_function", "pkg/sideinput/examples/simple_source_with_sideinput", "pkg/sideinput/examples/sink_sideinput", "pkg/sinker/examples/redis-sink", "pkg/sideinput/examples/map_sideinput/udf", - "pkg/sideinput/examples/reduce_sideinput/udf" + "pkg/sideinput/examples/reduce_sideinput/udf", "pkg/batchmapper/examples/batchmap-flatmap" ] steps: diff --git a/pkg/apis/proto/batchmap/v1/batchmap.pb.go b/pkg/apis/proto/batchmap/v1/batchmap.pb.go new file mode 100644 index 00000000..1d34241c --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmap.pb.go @@ -0,0 +1,456 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: pkg/apis/proto/batchmap/v1/batchmap.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// * +// BatchMapRequest represents a request element. +type BatchMapRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // This ID is used uniquely identify a map request + Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *BatchMapRequest) Reset() { + *x = BatchMapRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMapRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMapRequest) ProtoMessage() {} + +func (x *BatchMapRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMapRequest.ProtoReflect.Descriptor instead. +func (*BatchMapRequest) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{0} +} + +func (x *BatchMapRequest) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *BatchMapRequest) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *BatchMapRequest) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *BatchMapRequest) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +func (x *BatchMapRequest) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *BatchMapRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +// * +// BatchMapResponse represents a response element. +type BatchMapResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Results []*BatchMapResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + // This ID is used to refer the responses to the request it corresponds to. + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *BatchMapResponse) Reset() { + *x = BatchMapResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMapResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMapResponse) ProtoMessage() {} + +func (x *BatchMapResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMapResponse.ProtoReflect.Descriptor instead. +func (*BatchMapResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchMapResponse) GetResults() []*BatchMapResponse_Result { + if x != nil { + return x.Results + } + return nil +} + +func (x *BatchMapResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +// * +// ReadyResponse is the health check result. +type ReadyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ready bool `protobuf:"varint,1,opt,name=ready,proto3" json:"ready,omitempty"` +} + +func (x *ReadyResponse) Reset() { + *x = ReadyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadyResponse) ProtoMessage() {} + +func (x *ReadyResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. +func (*ReadyResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{2} +} + +func (x *ReadyResponse) GetReady() bool { + if x != nil { + return x.Ready + } + return false +} + +type BatchMapResponse_Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` +} + +func (x *BatchMapResponse_Result) Reset() { + *x = BatchMapResponse_Result{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMapResponse_Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMapResponse_Result) ProtoMessage() {} + +func (x *BatchMapResponse_Result) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMapResponse_Result.ProtoReflect.Descriptor instead. +func (*BatchMapResponse_Result) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *BatchMapResponse_Result) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *BatchMapResponse_Result) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *BatchMapResponse_Result) GetTags() []string { + if x != nil { + return x.Tags + } + return nil +} + +var File_pkg_apis_proto_batchmap_v1_batchmap_proto protoreflect.FileDescriptor + +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc = []byte{ + 0x0a, 0x29, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x2f, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc1, 0x02, 0x0a, 0x0f, 0x42, 0x61, 0x74, 0x63, 0x68, + 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, + 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, + 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, + 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x43, 0x0a, 0x07, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, + 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, + 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xaa, 0x01, 0x0a, 0x10, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x3e, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x24, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, + 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x98, + 0x01, 0x0a, 0x08, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x12, 0x3d, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, + 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0a, 0x42, 0x61, + 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x46, 0x6e, 0x12, 0x1c, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, + 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, + 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x74, 0x63, + 0x68, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescOnce sync.Once + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData = file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc +) + +func file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP() []byte { + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescOnce.Do(func() { + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData) + }) + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData +} + +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes = []interface{}{ + (*BatchMapRequest)(nil), // 0: batchmap.v1.BatchMapRequest + (*BatchMapResponse)(nil), // 1: batchmap.v1.BatchMapResponse + (*ReadyResponse)(nil), // 2: batchmap.v1.ReadyResponse + nil, // 3: batchmap.v1.BatchMapRequest.HeadersEntry + (*BatchMapResponse_Result)(nil), // 4: batchmap.v1.BatchMapResponse.Result + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty +} +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs = []int32{ + 5, // 0: batchmap.v1.BatchMapRequest.event_time:type_name -> google.protobuf.Timestamp + 5, // 1: batchmap.v1.BatchMapRequest.watermark:type_name -> google.protobuf.Timestamp + 3, // 2: batchmap.v1.BatchMapRequest.headers:type_name -> batchmap.v1.BatchMapRequest.HeadersEntry + 4, // 3: batchmap.v1.BatchMapResponse.results:type_name -> batchmap.v1.BatchMapResponse.Result + 6, // 4: batchmap.v1.BatchMap.IsReady:input_type -> google.protobuf.Empty + 0, // 5: batchmap.v1.BatchMap.BatchMapFn:input_type -> batchmap.v1.BatchMapRequest + 2, // 6: batchmap.v1.BatchMap.IsReady:output_type -> batchmap.v1.ReadyResponse + 1, // 7: batchmap.v1.BatchMap.BatchMapFn:output_type -> batchmap.v1.BatchMapResponse + 6, // [6:8] is the sub-list for method output_type + 4, // [4:6] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_pkg_apis_proto_batchmap_v1_batchmap_proto_init() } +func file_pkg_apis_proto_batchmap_v1_batchmap_proto_init() { + if File_pkg_apis_proto_batchmap_v1_batchmap_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchMapRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchMapResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchMapResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes, + DependencyIndexes: file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs, + MessageInfos: file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes, + }.Build() + File_pkg_apis_proto_batchmap_v1_batchmap_proto = out.File + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc = nil + file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes = nil + file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs = nil +} diff --git a/pkg/apis/proto/batchmap/v1/batchmap.proto b/pkg/apis/proto/batchmap/v1/batchmap.proto new file mode 100644 index 00000000..f7f4d10e --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmap.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; + +option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +package batchmap.v1; + +service BatchMap { + // IsReady is the heartbeat endpoint for gRPC. + rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); + + // BatchMapFn is a bi-directional streaming rpc which applies a + // map function on each BatchMapRequest element of the stream and then streams + // back BatchMapResponse elements. + rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse); +} + +/** + * BatchMapRequest represents a request element. + */ +message BatchMapRequest { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + map headers = 5; + // This ID is used uniquely identify a map request + string id = 6; +} + +/** + * BatchMapResponse represents a response element. + */ +message BatchMapResponse { + message Result { + repeated string keys = 1; + bytes value = 2; + repeated string tags = 3; + } + repeated Result results = 1; + // This ID is used to refer the responses to the request it corresponds to. + string id = 2; +} + +/** + * ReadyResponse is the health check result. + */ +message ReadyResponse { + bool ready = 1; +} \ No newline at end of file diff --git a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go new file mode 100644 index 00000000..11abd087 --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go @@ -0,0 +1,183 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: pkg/apis/proto/batchmap/v1/batchmap.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// BatchMapClient is the client API for BatchMap service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type BatchMapClient interface { + // IsReady is the heartbeat endpoint for gRPC. + IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) + // BatchMapFn is a bi-directional streaming rpc which applies a + // map function on each BatchMapRequest element of the stream and then streams + // back BatchMapResponse elements. + BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) +} + +type batchMapClient struct { + cc grpc.ClientConnInterface +} + +func NewBatchMapClient(cc grpc.ClientConnInterface) BatchMapClient { + return &batchMapClient{cc} +} + +func (c *batchMapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { + out := new(ReadyResponse) + err := c.cc.Invoke(ctx, "/batchmap.v1.BatchMap/IsReady", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *batchMapClient) BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) { + stream, err := c.cc.NewStream(ctx, &BatchMap_ServiceDesc.Streams[0], "/batchmap.v1.BatchMap/BatchMapFn", opts...) + if err != nil { + return nil, err + } + x := &batchMapBatchMapFnClient{stream} + return x, nil +} + +type BatchMap_BatchMapFnClient interface { + Send(*BatchMapRequest) error + Recv() (*BatchMapResponse, error) + grpc.ClientStream +} + +type batchMapBatchMapFnClient struct { + grpc.ClientStream +} + +func (x *batchMapBatchMapFnClient) Send(m *BatchMapRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *batchMapBatchMapFnClient) Recv() (*BatchMapResponse, error) { + m := new(BatchMapResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BatchMapServer is the server API for BatchMap service. +// All implementations must embed UnimplementedBatchMapServer +// for forward compatibility +type BatchMapServer interface { + // IsReady is the heartbeat endpoint for gRPC. + IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) + // BatchMapFn is a bi-directional streaming rpc which applies a + // map function on each BatchMapRequest element of the stream and then streams + // back BatchMapResponse elements. + BatchMapFn(BatchMap_BatchMapFnServer) error + mustEmbedUnimplementedBatchMapServer() +} + +// UnimplementedBatchMapServer must be embedded to have forward compatible implementations. +type UnimplementedBatchMapServer struct { +} + +func (UnimplementedBatchMapServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") +} +func (UnimplementedBatchMapServer) BatchMapFn(BatchMap_BatchMapFnServer) error { + return status.Errorf(codes.Unimplemented, "method BatchMapFn not implemented") +} +func (UnimplementedBatchMapServer) mustEmbedUnimplementedBatchMapServer() {} + +// UnsafeBatchMapServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to BatchMapServer will +// result in compilation errors. +type UnsafeBatchMapServer interface { + mustEmbedUnimplementedBatchMapServer() +} + +func RegisterBatchMapServer(s grpc.ServiceRegistrar, srv BatchMapServer) { + s.RegisterService(&BatchMap_ServiceDesc, srv) +} + +func _BatchMap_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BatchMapServer).IsReady(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/batchmap.v1.BatchMap/IsReady", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BatchMapServer).IsReady(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _BatchMap_BatchMapFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(BatchMapServer).BatchMapFn(&batchMapBatchMapFnServer{stream}) +} + +type BatchMap_BatchMapFnServer interface { + Send(*BatchMapResponse) error + Recv() (*BatchMapRequest, error) + grpc.ServerStream +} + +type batchMapBatchMapFnServer struct { + grpc.ServerStream +} + +func (x *batchMapBatchMapFnServer) Send(m *BatchMapResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *batchMapBatchMapFnServer) Recv() (*BatchMapRequest, error) { + m := new(BatchMapRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BatchMap_ServiceDesc is the grpc.ServiceDesc for BatchMap service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var BatchMap_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "batchmap.v1.BatchMap", + HandlerType: (*BatchMapServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "IsReady", + Handler: _BatchMap_IsReady_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "BatchMapFn", + Handler: _BatchMap_BatchMapFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "pkg/apis/proto/batchmap/v1/batchmap.proto", +} diff --git a/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go b/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go new file mode 100644 index 00000000..43f0315b --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go @@ -0,0 +1,216 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1 (interfaces: BatchMapClient,BatchMap_BatchMapFnClient) + +// Package batchmapmock is a generated GoMock package. +package batchmapmock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// MockBatchMapClient is a mock of BatchMapClient interface. +type MockBatchMapClient struct { + ctrl *gomock.Controller + recorder *MockBatchMapClientMockRecorder +} + +// MockBatchMapClientMockRecorder is the mock recorder for MockBatchMapClient. +type MockBatchMapClientMockRecorder struct { + mock *MockBatchMapClient +} + +// NewMockBatchMapClient creates a new mock instance. +func NewMockBatchMapClient(ctrl *gomock.Controller) *MockBatchMapClient { + mock := &MockBatchMapClient{ctrl: ctrl} + mock.recorder = &MockBatchMapClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchMapClient) EXPECT() *MockBatchMapClientMockRecorder { + return m.recorder +} + +// BatchMapFn mocks base method. +func (m *MockBatchMapClient) BatchMapFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.BatchMap_BatchMapFnClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "BatchMapFn", varargs...) + ret0, _ := ret[0].(v1.BatchMap_BatchMapFnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BatchMapFn indicates an expected call of BatchMapFn. +func (mr *MockBatchMapClientMockRecorder) BatchMapFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchMapFn", reflect.TypeOf((*MockBatchMapClient)(nil).BatchMapFn), varargs...) +} + +// IsReady mocks base method. +func (m *MockBatchMapClient) IsReady(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.ReadyResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "IsReady", varargs...) + ret0, _ := ret[0].(*v1.ReadyResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsReady indicates an expected call of IsReady. +func (mr *MockBatchMapClientMockRecorder) IsReady(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockBatchMapClient)(nil).IsReady), varargs...) +} + +// MockBatchMap_BatchMapFnClient is a mock of BatchMap_BatchMapFnClient interface. +type MockBatchMap_BatchMapFnClient struct { + ctrl *gomock.Controller + recorder *MockBatchMap_BatchMapFnClientMockRecorder +} + +// MockBatchMap_BatchMapFnClientMockRecorder is the mock recorder for MockBatchMap_BatchMapFnClient. +type MockBatchMap_BatchMapFnClientMockRecorder struct { + mock *MockBatchMap_BatchMapFnClient +} + +// NewMockBatchMap_BatchMapFnClient creates a new mock instance. +func NewMockBatchMap_BatchMapFnClient(ctrl *gomock.Controller) *MockBatchMap_BatchMapFnClient { + mock := &MockBatchMap_BatchMapFnClient{ctrl: ctrl} + mock.recorder = &MockBatchMap_BatchMapFnClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchMap_BatchMapFnClient) EXPECT() *MockBatchMap_BatchMapFnClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockBatchMap_BatchMapFnClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Recv() (*v1.BatchMapResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*v1.BatchMapResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockBatchMap_BatchMapFnClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Send(arg0 *v1.BatchMapRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockBatchMap_BatchMapFnClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Trailer)) +} diff --git a/pkg/apis/proto/batchmap/v1/mockgen.go b/pkg/apis/proto/batchmap/v1/mockgen.go new file mode 100644 index 00000000..04982aeb --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/mockgen.go @@ -0,0 +1,3 @@ +package v1 + +//go:generate mockgen -destination batchmapmock/batchmapmock.go -package batchmapmock github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1 BatchMapClient,BatchMap_BatchMapFnClient diff --git a/pkg/batchmapper/doc.go b/pkg/batchmapper/doc.go new file mode 100644 index 00000000..9b0138b1 --- /dev/null +++ b/pkg/batchmapper/doc.go @@ -0,0 +1,5 @@ +// Package batchmapper implements the server code for BatchMap Operation. + +// Examples: https://github.com/numaproj/numaflow-go/tree/main/pkg/batchmapper/examples/ + +package batchmapper diff --git a/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile b/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile new file mode 100644 index 00000000..855508a0 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile @@ -0,0 +1,20 @@ +#################################################################################################### +# base +#################################################################################################### +FROM alpine:3.12.3 as base +RUN apk update && apk upgrade && \ + apk add ca-certificates && \ + apk --no-cache add tzdata + +COPY dist/batchmap-flatmap /bin/batchmap-flatmap +RUN chmod +x /bin/batchmap-flatmap + +#################################################################################################### +# batchmap-flatmap +#################################################################################################### +FROM scratch as batch-flatmap +ARG ARCH +COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo +COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=base /bin/batchmap-flatmap /bin/batchmap-flatmap +ENTRYPOINT [ "/bin/batchmap-flatmap" ] diff --git a/pkg/batchmapper/examples/batchmap-flatmap/Makefile b/pkg/batchmapper/examples/batchmap-flatmap/Makefile new file mode 100644 index 00000000..ea6b9992 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/Makefile @@ -0,0 +1,19 @@ +TAG ?= stable +PUSH ?= false +IMAGE_REGISTRY = quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG} + +.PHONY: build +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/batchmap-flatmap main.go + +.PHONY: image-push +image-push: build + docker buildx build -t ${IMAGE_REGISTRY} --platform linux/amd64,linux/arm64 --target batch-flatmap . --push + +.PHONY: image +image: build + docker build -t ${IMAGE_REGISTRY} --target batch-flatmap . + @if [ "$(PUSH)" = "true" ]; then docker push ${IMAGE_REGISTRY}; fi + +clean: + -rm -rf ./dist diff --git a/pkg/batchmapper/examples/batchmap-flatmap/README.md b/pkg/batchmapper/examples/batchmap-flatmap/README.md new file mode 100644 index 00000000..135b5ecb --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/README.md @@ -0,0 +1,27 @@ +# Batch Map Flatmap + +An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function. + + +To start a vertex in batch map mode we need to add the annotations as following +```yaml + - name: go-split + metadata: + annotations: + numaflow.numaproj.io/batch-map: "true" + scale: + min: 1 + udf: + container: + # Split input message into an array with comma, see https://github.com/numaproj/numaflow-go/tree/main/pkg/batchmapper/examples/batchmap-flatmap + image: quay.io/numaio/numaflow-go/batch-map-flatmap:stable + imagePullPolicy: Always +``` + + +Some considerations for batch map are as follows + +- The user will have to ensure that the BatchResponse is tagged with the correct request ID as this will be used by Numaflow for populating information required for system correctness like MessageID for the ISB deduplication. + + +- The user will have to ensure that all the length of the BatchResponses is equal to the number of requests received. This means that for **each request** there is a BatchResponse. \ No newline at end of file diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.mod b/pkg/batchmapper/examples/batchmap-flatmap/go.mod new file mode 100644 index 00000000..cc6b876d --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.mod @@ -0,0 +1,19 @@ +module batchmap-flatmap + +go 1.22 + +replace github.com/numaproj/numaflow-go => ../../../.. + +require github.com/numaproj/numaflow-go v0.8.0 + +require ( + github.com/golang/protobuf v1.5.3 // indirect + go.uber.org/atomic v1.11.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + google.golang.org/grpc v1.57.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.sum b/pkg/batchmapper/examples/batchmap-flatmap/go.sum new file mode 100644 index 00000000..e884f1a5 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.sum @@ -0,0 +1,33 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/batchmapper/examples/batchmap-flatmap/main.go b/pkg/batchmapper/examples/batchmap-flatmap/main.go new file mode 100644 index 00000000..9d232055 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "context" + "log" + "strings" + + "github.com/numaproj/numaflow-go/pkg/batchmapper" +) + +func batchMapFn(_ context.Context, datums <-chan batchmapper.Datum) batchmapper.BatchResponses { + batchResponses := batchmapper.BatchResponsesBuilder() + for d := range datums { + msg := d.Value() + _ = d.EventTime() // Event time is available + _ = d.Watermark() // Watermark is available + batchResponse := batchmapper.NewBatchResponse(d.Id()) + strs := strings.Split(string(msg), ",") + for _, s := range strs { + batchResponse = batchResponse.Append(batchmapper.NewMessage([]byte(s))) + } + + batchResponses = batchResponses.Append(batchResponse) + } + return batchResponses +} + +func main() { + err := batchmapper.NewServer(batchmapper.BatchMapperFunc(batchMapFn)).Start(context.Background()) + if err != nil { + log.Panic("Failed to start batch map function server: ", err) + } +} diff --git a/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml new file mode 100644 index 00000000..7bac7625 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/pipeline.yaml @@ -0,0 +1,34 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: flatmap +spec: + vertices: + - name: in + source: + http: {} + - name: go-split + metadata: + annotations: + numaflow.numaproj.io/batch-map: "true" + scale: + min: 1 + udf: + container: + # Split input message into an array with comma, see https://github.com/numaproj/numaflow-go/tree/main/pkg/batchmapper/examples/batchmap-flatmap + image: quay.io/numaio/numaflow-go/batch-map-flatmap:stable + imagePullPolicy: Always + - name: go-udsink + scale: + min: 1 + sink: + udsink: + container: + # https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/log + image: quay.io/numaio/numaflow-go/sink-log:stable + imagePullPolicy: Always + edges: + - from: in + to: go-split + - from: go-split + to: go-udsink diff --git a/pkg/batchmapper/interface.go b/pkg/batchmapper/interface.go new file mode 100644 index 00000000..a667f232 --- /dev/null +++ b/pkg/batchmapper/interface.go @@ -0,0 +1,37 @@ +package batchmapper + +import ( + "context" + "time" +) + +// Datum contains methods to get the payload information. +type Datum interface { + // Value returns the payload of the message. + Value() []byte + // EventTime returns the event time of the message. + EventTime() time.Time + // Watermark returns the watermark of the message. + Watermark() time.Time + // Headers returns the headers of the message. + Headers() map[string]string + // Id returns the unique ID set for the given message + Id() string + // Keys returns the keys associated with a given datum + Keys() []string +} + +// BatchMapper is the interface for a Batch Map mode where the user is given a list +// of messages, and they return the consolidated response for all of them together. +type BatchMapper interface { + // BatchMap is the function which processes a list of input messages + BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses +} + +// BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper. +type BatchMapperFunc func(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses + +// BatchMap implements the functionality of BatchMap function. +func (mf BatchMapperFunc) BatchMap(ctx context.Context, datumStreamCh <-chan Datum) BatchResponses { + return mf(ctx, datumStreamCh) +} diff --git a/pkg/batchmapper/message.go b/pkg/batchmapper/message.go new file mode 100644 index 00000000..8d7b74ee --- /dev/null +++ b/pkg/batchmapper/message.go @@ -0,0 +1,106 @@ +package batchmapper + +import "fmt" + +var ( + DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ +) + +// Message is used to wrap the data return by Batch Map functions +type Message struct { + value []byte + keys []string + tags []string +} + +// NewMessage creates a Message with value +func NewMessage(value []byte) Message { + return Message{value: value} +} + +// MessageToDrop creates a Message to be dropped +func MessageToDrop() Message { + return Message{value: []byte{}, tags: []string{DROP}} +} + +// WithKeys is used to assign the keys to the message +func (m Message) WithKeys(keys []string) Message { + m.keys = keys + return m +} + +// WithTags is used to assign the tags to the message +// tags will be used for conditional forwarding +func (m Message) WithTags(tags []string) Message { + m.tags = tags + return m +} + +// Keys returns message keys +func (m Message) Keys() []string { + return m.keys +} + +// Value returns message value +func (m Message) Value() []byte { + return m.value +} + +// Tags returns message tags +func (m Message) Tags() []string { + return m.tags +} + +type Messages []Message + +// batchResponse is used to wrap the data return by batch map function along +// with the ID of the corresponding request +type batchResponse struct { + id string + messages []Message +} + +// Id returns request ID for the given list of responses +func (m batchResponse) Id() string { + return m.id +} + +// Append appends a Message to the messages list of a batchResponse +// object and then returns the updated object. +func (m batchResponse) Append(msg Message) batchResponse { + m.messages = append(m.messages, msg) + return m +} + +// Items returns the message list for a batchResponse +func (m batchResponse) Items() []Message { + return m.messages +} + +// BatchResponses is a list of batchResponse which signify the consolidated +// results for a batch of input messages. +type BatchResponses []batchResponse + +// NewBatchResponse is a utility function used to create a new batchResponse object +// Specifying an id is a mandatory requirement, as it is required to reference the +// responses back to a request. +func NewBatchResponse(id string) batchResponse { + return batchResponse{ + id: id, + messages: Messages{}, + } +} +func BatchResponsesBuilder() BatchResponses { + return BatchResponses{} +} + +// Append appends a batchResponse +func (m BatchResponses) Append(msg batchResponse) BatchResponses { + m = append(m, msg) + return m +} + +// Items returns the batchResponse list +func (m BatchResponses) Items() []batchResponse { + return m +} diff --git a/pkg/batchmapper/options.go b/pkg/batchmapper/options.go new file mode 100644 index 00000000..2a24d17c --- /dev/null +++ b/pkg/batchmapper/options.go @@ -0,0 +1,39 @@ +package batchmapper + +type options struct { + sockAddr string + maxMessageSize int + serverInfoFilePath string +} + +// Option is the interface to apply options. +type Option func(*options) + +func defaultOptions() *options { + return &options{ + sockAddr: address, + maxMessageSize: defaultMaxMessageSize, + serverInfoFilePath: serverInfoFilePath, + } +} + +// WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size. +func WithMaxMessageSize(size int) Option { + return func(opts *options) { + opts.maxMessageSize = size + } +} + +// WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes. +func WithSockAddr(addr string) Option { + return func(opts *options) { + opts.sockAddr = addr + } +} + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(opts *options) { + opts.serverInfoFilePath = f + } +} diff --git a/pkg/batchmapper/options_test.go b/pkg/batchmapper/options_test.go new file mode 100644 index 00000000..927cdb78 --- /dev/null +++ b/pkg/batchmapper/options_test.go @@ -0,0 +1,18 @@ +package batchmapper + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWithMaxMessageSize(t *testing.T) { + var ( + size = 1024 * 1024 * 10 + opts = &options{ + maxMessageSize: defaultMaxMessageSize, + } + ) + WithMaxMessageSize(1024 * 1024 * 10)(opts) + assert.Equal(t, size, opts.maxMessageSize) +} diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go new file mode 100644 index 00000000..2d3fb639 --- /dev/null +++ b/pkg/batchmapper/server.go @@ -0,0 +1,58 @@ +package batchmapper + +import ( + "context" + "fmt" + "log" + "os/signal" + "syscall" + + "github.com/numaproj/numaflow-go/pkg" + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + "github.com/numaproj/numaflow-go/pkg/shared" +) + +// server is a map gRPC server. +type server struct { + svc *Service + opts *options +} + +// NewServer creates a new batch map server. +func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server { + opts := defaultOptions() + for _, inputOption := range inputOptions { + inputOption(opts) + } + s := new(server) + s.svc = new(Service) + s.svc.BatchMapper = m + s.opts = opts + return s +} + +// Start starts the batch map server. +func (m *server) Start(ctx context.Context) error { + ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // write server info to the file + // start listening on unix domain socket + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + if err != nil { + return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) + } + // close the listener + defer func() { _ = lis.Close() }() + + // create a grpc server + grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) + defer log.Println("Successfully stopped the gRPC server") + defer grpcServer.GracefulStop() + + // register the batch map service + batchmappb.RegisterBatchMapServer(grpcServer, m.svc) + + // start the grpc server + return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) +} diff --git a/pkg/batchmapper/server_test.go b/pkg/batchmapper/server_test.go new file mode 100644 index 00000000..e51ec42c --- /dev/null +++ b/pkg/batchmapper/server_test.go @@ -0,0 +1,37 @@ +package batchmapper + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBatchMapServer_Start(t *testing.T) { + socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock") + defer func() { + _ = os.RemoveAll(socketFile.Name()) + }() + + serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + + var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for d := range datums { + results := NewBatchResponse(d.Id()) + results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses.Append(results) + } + return batchResponses + }) + // note: using actual uds connection + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + err := NewServer(batchMapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) + assert.NoError(t, err) +} diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go new file mode 100644 index 00000000..a8748b18 --- /dev/null +++ b/pkg/batchmapper/service.go @@ -0,0 +1,126 @@ +package batchmapper + +import ( + "context" + "fmt" + "io" + "log" + + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" + + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" +) + +const ( + uds = "unix" + address = "/var/run/numaflow/batchmap.sock" + defaultMaxMessageSize = 1024 * 1024 * 64 + serverInfoFilePath = "/var/run/numaflow/batchmapper-server-info" +) + +// Service implements the proto gen server interface and contains the map operation +// handler. +type Service struct { + batchmappb.UnimplementedBatchMapServer + BatchMapper BatchMapper +} + +// IsReady returns true to indicate the gRPC connection is ready. +func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*batchmappb.ReadyResponse, error) { + return &batchmappb.ReadyResponse{Ready: true}, nil +} + +// BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them. +func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error { + ctx := stream.Context() + var g errgroup.Group + + // totalRequests is a counter for keeping a track of the number of datum requests + // that were received on the stream. We use an atomic int as this needs to be synchronized + // between the request/response go routines. + totalRequests := atomic.NewInt32(0) + + // datumStreamCh is used to stream messages to the user code interface + // As the BatchMap interface expects a list of request elements + // we read all the requests coming on the stream and keep streaming them to the user code on this channel. + datumStreamCh := make(chan Datum) + + // go routine to invoke the user handler function, and process the responses. + g.Go(func() error { + // Apply the user BatchMap implementation function + responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh) + + // If the number of responses received does not align with the request batch size, + // we will not be able to process the data correctly. + // This should be marked as an error and the container is restarted. + // As this is a user error, we restart the container to mitigate any transient error otherwise, this + // crash should indicate to the user that there is some issue. + if len(responses.Items()) != int(totalRequests.Load()) { + errMsg := fmt.Sprintf("batchMapFn: mismatch between length of batch requests and responses, "+ + "expected:%d, got:%d", int(totalRequests.Load()), len(responses.Items())) + log.Panic(errMsg) + } + + // iterate over the responses received and covert to the required proto format + for _, batchResp := range responses.Items() { + var elements []*batchmappb.BatchMapResponse_Result + for _, resp := range batchResp.Items() { + elements = append(elements, &batchmappb.BatchMapResponse_Result{ + Keys: resp.Keys(), + Value: resp.Value(), + Tags: resp.Tags(), + }) + } + singleRequestResp := &batchmappb.BatchMapResponse{ + Results: elements, + Id: batchResp.Id(), + } + // We stream back the result for a single request ID + // this would contain all the responses for that request. + err := stream.Send(singleRequestResp) + if err != nil { + log.Println("BatchMapFn: Got an error while Send() on stream", err) + return err + } + } + return nil + }) + + // loop to keep reading messages from the stream and sending it to the datumStreamCh + for { + d, err := stream.Recv() + // if we see EOF on the stream we do not have any more messages coming up + if err == io.EOF { + // close the input data channel to indicate that no more messages expected + close(datumStreamCh) + break + } + if err != nil { + // close the input data channel to indicate that no more messages expected + close(datumStreamCh) + log.Println("BatchMapFn: Got an error while recv() on stream", err) + return err + } + var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys()) + // send the datum to the input channel + datumStreamCh <- hd + // Increase the counter for number of requests received + totalRequests.Inc() + } + + // wait for all the responses to be processed + err := g.Wait() + // if there was any error during processing return the error + if err != nil { + statusErr := status.Errorf(codes.Internal, err.Error()) + return statusErr + } + + // Once all responses are sent we can return, this would indicate the end of the rpc and + // send an EOF to the client on the stream + return nil +} diff --git a/pkg/batchmapper/service_test.go b/pkg/batchmapper/service_test.go new file mode 100644 index 00000000..af387948 --- /dev/null +++ b/pkg/batchmapper/service_test.go @@ -0,0 +1,249 @@ +package batchmapper + +import ( + "context" + "fmt" + "io" + "reflect" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" + + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" +) + +type BatchMapStreamFnServerTest struct { + ctx context.Context + outputCh chan *batchmappb.BatchMapResponse + inputCh chan *batchmappb.BatchMapRequest + grpc.ServerStream +} + +func NewBatchBatchMapStreamFnServerTest( + ctx context.Context, + inputCh chan *batchmappb.BatchMapRequest, + outputCh chan *batchmappb.BatchMapResponse, +) *BatchMapStreamFnServerTest { + return &BatchMapStreamFnServerTest{ + ctx: ctx, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (u *BatchMapStreamFnServerTest) Recv() (*batchmappb.BatchMapRequest, error) { + val, ok := <-u.inputCh + if !ok { + return val, io.EOF + } + return val, nil +} + +func (u *BatchMapStreamFnServerTest) Send(d *batchmappb.BatchMapResponse) error { + u.outputCh <- d + return nil +} + +func (u *BatchMapStreamFnServerTest) Context() context.Context { + return u.ctx +} + +type BatchMapFnServerErrTest struct { + ctx context.Context + inputCh chan *batchmappb.BatchMapRequest + outputCh chan *batchmappb.BatchMapResponse + grpc.ServerStream +} + +func NewBatchMapFnServerErrTest( + ctx context.Context, + inputCh chan *batchmappb.BatchMapRequest, + outputCh chan *batchmappb.BatchMapResponse, + +) *BatchMapFnServerErrTest { + return &BatchMapFnServerErrTest{ + ctx: ctx, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (u *BatchMapFnServerErrTest) Recv() (*batchmappb.BatchMapRequest, error) { + val, ok := <-u.inputCh + if !ok { + return val, io.EOF + } + return val, nil +} + +func (u *BatchMapFnServerErrTest) Send(_ *batchmappb.BatchMapResponse) error { + return fmt.Errorf("send error") +} + +func (u *BatchMapFnServerErrTest) Context() context.Context { + return u.ctx +} + +func TestService_BatchMapFn(t *testing.T) { + tests := []struct { + name string + handler BatchMapper + input []*batchmappb.BatchMapRequest + expected []*batchmappb.BatchMapResponse + expectedErr bool + }{ + { + name: "batch_map_stream_fn_forward_msg", + handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses = batchResponses.Append(results) + } + return batchResponses + }), + input: []*batchmappb.BatchMapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*batchmappb.BatchMapResponse{ + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: false, + }, + { + name: "batch_map_stream_err", + handler: BatchMapperFunc(func(ctx context.Context, datums <-chan Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses = batchResponses.Append(results) + } + return batchResponses + }), + input: []*batchmappb.BatchMapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*batchmappb.BatchMapResponse{ + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fs := &Service{ + BatchMapper: tt.handler, + } + // here's a trick for testing: + // because we are not using gRPC, we directly set a new incoming ctx + // instead of the regular outgoing context in the real gRPC connection. + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + inputCh := make(chan *batchmappb.BatchMapRequest) + outputCh := make(chan *batchmappb.BatchMapResponse) + result := make([]*batchmappb.BatchMapResponse, 0) + + var udfBatchMapFnStream batchmappb.BatchMap_BatchMapFnServer + if tt.expectedErr { + udfBatchMapFnStream = NewBatchMapFnServerErrTest(ctx, inputCh, outputCh) + } else { + udfBatchMapFnStream = NewBatchBatchMapStreamFnServerTest(ctx, inputCh, outputCh) + } + + var wg sync.WaitGroup + var err error + + wg.Add(1) + go func() { + defer wg.Done() + err = fs.BatchMapFn(udfBatchMapFnStream) + close(outputCh) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for msg := range outputCh { + result = append(result, msg) + } + }() + + for _, val := range tt.input { + inputCh <- val + } + close(inputCh) + wg.Wait() + + if tt.expectedErr { + // assert err is not nil + assert.NotNil(t, err) + return + } + + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("BatchMapFn() got = %v, want %v", result, tt.expected) + } + + }) + } +} diff --git a/pkg/batchmapper/types.go b/pkg/batchmapper/types.go new file mode 100644 index 00000000..1facd290 --- /dev/null +++ b/pkg/batchmapper/types.go @@ -0,0 +1,48 @@ +package batchmapper + +import "time" + +// handlerDatum implements the Datum interface and is used in the batch map function. +type handlerDatum struct { + value []byte + eventTime time.Time + watermark time.Time + headers map[string]string + id string + keys []string +} + +func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, id string, keys []string) Datum { + return &handlerDatum{ + value: value, + eventTime: eventTime, + watermark: watermark, + headers: headers, + id: id, + keys: keys, + } +} + +func (h *handlerDatum) Value() []byte { + return h.value +} + +func (h *handlerDatum) EventTime() time.Time { + return h.eventTime +} + +func (h *handlerDatum) Watermark() time.Time { + return h.watermark +} + +func (h *handlerDatum) Headers() map[string]string { + return h.headers +} + +func (h *handlerDatum) Id() string { + return h.id +} + +func (h *handlerDatum) Keys() []string { + return h.keys +} diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index f4bf8775..1df4f125 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -49,7 +49,7 @@ func (m *server) Start(ctx context.Context) error { defer log.Println("Successfully stopped the gRPC server") defer grpcServer.GracefulStop() - // register the map service + // register the source transformer service v1.RegisterSourceTransformServer(grpcServer, m.svc) // start the grpc server