From db0172250ad3840e1814707f55813ff236ca4ba2 Mon Sep 17 00:00:00 2001 From: Sidhant Kohli Date: Tue, 2 Jul 2024 11:40:41 -0700 Subject: [PATCH] seperate server/proto Signed-off-by: Sidhant Kohli --- pkg/apis/proto/batchmap/v1/batchmap.pb.go | 456 ++++++++++++++++++ pkg/apis/proto/batchmap/v1/batchmap.proto | 52 ++ .../proto/batchmap/v1/batchmap_grpc.pb.go | 183 +++++++ .../batchmap/v1/batchmapmock/batchmapmock.go | 216 +++++++++ pkg/apis/proto/batchmap/v1/mockgen.go | 3 + pkg/batchmapper/doc.go | 5 + .../examples/batchmap-flatmap/Dockerfile | 20 + .../examples/batchmap-flatmap/Makefile | 18 + .../examples/batchmap-flatmap/README.md | 3 + .../examples/batchmap-flatmap/go.mod | 17 + .../examples/batchmap-flatmap/go.sum | 24 + .../examples/batchmap-flatmap/main.go | 32 ++ .../examples/batchmap-flatmap/pipe.yaml | 34 ++ pkg/batchmapper/interface.go | 37 ++ pkg/batchmapper/message.go | 106 ++++ pkg/batchmapper/options.go | 39 ++ pkg/batchmapper/options_test.go | 18 + pkg/batchmapper/server.go | 60 +++ pkg/batchmapper/server_test.go | 37 ++ pkg/batchmapper/service.go | 93 ++++ pkg/batchmapper/service_test.go | 290 +++++++++++ pkg/batchmapper/types.go | 48 ++ 22 files changed, 1791 insertions(+) create mode 100644 pkg/apis/proto/batchmap/v1/batchmap.pb.go create mode 100644 pkg/apis/proto/batchmap/v1/batchmap.proto create mode 100644 pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go create mode 100644 pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go create mode 100644 pkg/apis/proto/batchmap/v1/mockgen.go create mode 100644 pkg/batchmapper/doc.go create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/Dockerfile create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/Makefile create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/README.md create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/go.mod create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/go.sum create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/main.go create mode 100644 pkg/batchmapper/examples/batchmap-flatmap/pipe.yaml create mode 100644 pkg/batchmapper/interface.go create mode 100644 pkg/batchmapper/message.go create mode 100644 pkg/batchmapper/options.go create mode 100644 pkg/batchmapper/options_test.go create mode 100644 pkg/batchmapper/server.go create mode 100644 pkg/batchmapper/server_test.go create mode 100644 pkg/batchmapper/service.go create mode 100644 pkg/batchmapper/service_test.go create mode 100644 pkg/batchmapper/types.go diff --git a/pkg/apis/proto/batchmap/v1/batchmap.pb.go b/pkg/apis/proto/batchmap/v1/batchmap.pb.go new file mode 100644 index 00000000..39f4c430 --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmap.pb.go @@ -0,0 +1,456 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: pkg/apis/proto/batchmap/v1/batchmap.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// * +// MapRequest represents a request element. +type BatchMapRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // This ID is used uniquely identify a map request + Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *BatchMapRequest) Reset() { + *x = BatchMapRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMapRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMapRequest) ProtoMessage() {} + +func (x *BatchMapRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMapRequest.ProtoReflect.Descriptor instead. +func (*BatchMapRequest) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{0} +} + +func (x *BatchMapRequest) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *BatchMapRequest) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *BatchMapRequest) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *BatchMapRequest) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +func (x *BatchMapRequest) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *BatchMapRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +// * +// MapResponse represents a response element. +type BatchMapResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Results []*BatchMapResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + // This ID is used to refer the responses to the request it corresponds to. + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *BatchMapResponse) Reset() { + *x = BatchMapResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMapResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMapResponse) ProtoMessage() {} + +func (x *BatchMapResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMapResponse.ProtoReflect.Descriptor instead. +func (*BatchMapResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchMapResponse) GetResults() []*BatchMapResponse_Result { + if x != nil { + return x.Results + } + return nil +} + +func (x *BatchMapResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +// * +// ReadyResponse is the health check result. +type ReadyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ready bool `protobuf:"varint,1,opt,name=ready,proto3" json:"ready,omitempty"` +} + +func (x *ReadyResponse) Reset() { + *x = ReadyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadyResponse) ProtoMessage() {} + +func (x *ReadyResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. +func (*ReadyResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{2} +} + +func (x *ReadyResponse) GetReady() bool { + if x != nil { + return x.Ready + } + return false +} + +type BatchMapResponse_Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` +} + +func (x *BatchMapResponse_Result) Reset() { + *x = BatchMapResponse_Result{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchMapResponse_Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchMapResponse_Result) ProtoMessage() {} + +func (x *BatchMapResponse_Result) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchMapResponse_Result.ProtoReflect.Descriptor instead. +func (*BatchMapResponse_Result) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *BatchMapResponse_Result) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *BatchMapResponse_Result) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *BatchMapResponse_Result) GetTags() []string { + if x != nil { + return x.Tags + } + return nil +} + +var File_pkg_apis_proto_batchmap_v1_batchmap_proto protoreflect.FileDescriptor + +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc = []byte{ + 0x0a, 0x29, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x2f, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x2f, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0b, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc1, 0x02, 0x0a, 0x0f, 0x42, 0x61, 0x74, 0x63, 0x68, + 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, + 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, + 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, + 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x43, 0x0a, 0x07, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, + 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, + 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0xaa, 0x01, 0x0a, 0x10, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x3e, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x24, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, + 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, + 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, + 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, + 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, + 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x98, + 0x01, 0x0a, 0x08, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x12, 0x3d, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, + 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4d, 0x0a, 0x0a, 0x42, 0x61, + 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x46, 0x6e, 0x12, 0x1c, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, + 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6d, 0x61, + 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, + 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x62, 0x61, 0x74, 0x63, + 0x68, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescOnce sync.Once + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData = file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc +) + +func file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescGZIP() []byte { + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescOnce.Do(func() { + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData) + }) + return file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDescData +} + +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes = []interface{}{ + (*BatchMapRequest)(nil), // 0: batchmap.v1.BatchMapRequest + (*BatchMapResponse)(nil), // 1: batchmap.v1.BatchMapResponse + (*ReadyResponse)(nil), // 2: batchmap.v1.ReadyResponse + nil, // 3: batchmap.v1.BatchMapRequest.HeadersEntry + (*BatchMapResponse_Result)(nil), // 4: batchmap.v1.BatchMapResponse.Result + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 6: google.protobuf.Empty +} +var file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs = []int32{ + 5, // 0: batchmap.v1.BatchMapRequest.event_time:type_name -> google.protobuf.Timestamp + 5, // 1: batchmap.v1.BatchMapRequest.watermark:type_name -> google.protobuf.Timestamp + 3, // 2: batchmap.v1.BatchMapRequest.headers:type_name -> batchmap.v1.BatchMapRequest.HeadersEntry + 4, // 3: batchmap.v1.BatchMapResponse.results:type_name -> batchmap.v1.BatchMapResponse.Result + 6, // 4: batchmap.v1.BatchMap.IsReady:input_type -> google.protobuf.Empty + 0, // 5: batchmap.v1.BatchMap.BatchMapFn:input_type -> batchmap.v1.BatchMapRequest + 2, // 6: batchmap.v1.BatchMap.IsReady:output_type -> batchmap.v1.ReadyResponse + 1, // 7: batchmap.v1.BatchMap.BatchMapFn:output_type -> batchmap.v1.BatchMapResponse + 6, // [6:8] is the sub-list for method output_type + 4, // [4:6] is the sub-list for method input_type + 4, // [4:4] is the sub-list for extension type_name + 4, // [4:4] is the sub-list for extension extendee + 0, // [0:4] is the sub-list for field type_name +} + +func init() { file_pkg_apis_proto_batchmap_v1_batchmap_proto_init() } +func file_pkg_apis_proto_batchmap_v1_batchmap_proto_init() { + if File_pkg_apis_proto_batchmap_v1_batchmap_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchMapRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchMapResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchMapResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes, + DependencyIndexes: file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs, + MessageInfos: file_pkg_apis_proto_batchmap_v1_batchmap_proto_msgTypes, + }.Build() + File_pkg_apis_proto_batchmap_v1_batchmap_proto = out.File + file_pkg_apis_proto_batchmap_v1_batchmap_proto_rawDesc = nil + file_pkg_apis_proto_batchmap_v1_batchmap_proto_goTypes = nil + file_pkg_apis_proto_batchmap_v1_batchmap_proto_depIdxs = nil +} diff --git a/pkg/apis/proto/batchmap/v1/batchmap.proto b/pkg/apis/proto/batchmap/v1/batchmap.proto new file mode 100644 index 00000000..94a0a704 --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmap.proto @@ -0,0 +1,52 @@ +syntax = "proto3"; + +option go_package = "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"; + +import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; + +package batchmap.v1; + +service BatchMap { + // IsReady is the heartbeat endpoint for gRPC. + rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); + + // BatchMapFn is a bi-directional streaming rpc which applies a + // Map function on each BatchMapRequest element of the stream and then returns streams + // back MapResponse elements. + rpc BatchMapFn(stream BatchMapRequest) returns (stream BatchMapResponse); +} + +/** + * MapRequest represents a request element. + */ +message BatchMapRequest { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + map headers = 5; + // This ID is used uniquely identify a map request + string id = 6; +} + +/** + * MapResponse represents a response element. + */ +message BatchMapResponse { + message Result { + repeated string keys = 1; + bytes value = 2; + repeated string tags = 3; + } + repeated Result results = 1; + // This ID is used to refer the responses to the request it corresponds to. + string id = 2; +} + +/** + * ReadyResponse is the health check result. + */ +message ReadyResponse { + bool ready = 1; +} \ No newline at end of file diff --git a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go new file mode 100644 index 00000000..c66551f8 --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go @@ -0,0 +1,183 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: pkg/apis/proto/batchmap/v1/batchmap.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// BatchMapClient is the client API for BatchMap service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type BatchMapClient interface { + // IsReady is the heartbeat endpoint for gRPC. + IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) + // BatchMapFn is a bi-directional streaming rpc which applies a + // Map function on each BatchMapRequest element of the stream and then returns streams + // back MapResponse elements. + BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) +} + +type batchMapClient struct { + cc grpc.ClientConnInterface +} + +func NewBatchMapClient(cc grpc.ClientConnInterface) BatchMapClient { + return &batchMapClient{cc} +} + +func (c *batchMapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { + out := new(ReadyResponse) + err := c.cc.Invoke(ctx, "/batchmap.v1.BatchMap/IsReady", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *batchMapClient) BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) { + stream, err := c.cc.NewStream(ctx, &BatchMap_ServiceDesc.Streams[0], "/batchmap.v1.BatchMap/BatchMapFn", opts...) + if err != nil { + return nil, err + } + x := &batchMapBatchMapFnClient{stream} + return x, nil +} + +type BatchMap_BatchMapFnClient interface { + Send(*BatchMapRequest) error + Recv() (*BatchMapResponse, error) + grpc.ClientStream +} + +type batchMapBatchMapFnClient struct { + grpc.ClientStream +} + +func (x *batchMapBatchMapFnClient) Send(m *BatchMapRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *batchMapBatchMapFnClient) Recv() (*BatchMapResponse, error) { + m := new(BatchMapResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BatchMapServer is the server API for BatchMap service. +// All implementations must embed UnimplementedBatchMapServer +// for forward compatibility +type BatchMapServer interface { + // IsReady is the heartbeat endpoint for gRPC. + IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) + // BatchMapFn is a bi-directional streaming rpc which applies a + // Map function on each BatchMapRequest element of the stream and then returns streams + // back MapResponse elements. + BatchMapFn(BatchMap_BatchMapFnServer) error + mustEmbedUnimplementedBatchMapServer() +} + +// UnimplementedBatchMapServer must be embedded to have forward compatible implementations. +type UnimplementedBatchMapServer struct { +} + +func (UnimplementedBatchMapServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") +} +func (UnimplementedBatchMapServer) BatchMapFn(BatchMap_BatchMapFnServer) error { + return status.Errorf(codes.Unimplemented, "method BatchMapFn not implemented") +} +func (UnimplementedBatchMapServer) mustEmbedUnimplementedBatchMapServer() {} + +// UnsafeBatchMapServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to BatchMapServer will +// result in compilation errors. +type UnsafeBatchMapServer interface { + mustEmbedUnimplementedBatchMapServer() +} + +func RegisterBatchMapServer(s grpc.ServiceRegistrar, srv BatchMapServer) { + s.RegisterService(&BatchMap_ServiceDesc, srv) +} + +func _BatchMap_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BatchMapServer).IsReady(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/batchmap.v1.BatchMap/IsReady", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BatchMapServer).IsReady(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +func _BatchMap_BatchMapFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(BatchMapServer).BatchMapFn(&batchMapBatchMapFnServer{stream}) +} + +type BatchMap_BatchMapFnServer interface { + Send(*BatchMapResponse) error + Recv() (*BatchMapRequest, error) + grpc.ServerStream +} + +type batchMapBatchMapFnServer struct { + grpc.ServerStream +} + +func (x *batchMapBatchMapFnServer) Send(m *BatchMapResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *batchMapBatchMapFnServer) Recv() (*BatchMapRequest, error) { + m := new(BatchMapRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// BatchMap_ServiceDesc is the grpc.ServiceDesc for BatchMap service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var BatchMap_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "batchmap.v1.BatchMap", + HandlerType: (*BatchMapServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "IsReady", + Handler: _BatchMap_IsReady_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "BatchMapFn", + Handler: _BatchMap_BatchMapFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "pkg/apis/proto/batchmap/v1/batchmap.proto", +} diff --git a/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go b/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go new file mode 100644 index 00000000..43f0315b --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/batchmapmock/batchmapmock.go @@ -0,0 +1,216 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1 (interfaces: BatchMapClient,BatchMap_BatchMapFnClient) + +// Package batchmapmock is a generated GoMock package. +package batchmapmock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// MockBatchMapClient is a mock of BatchMapClient interface. +type MockBatchMapClient struct { + ctrl *gomock.Controller + recorder *MockBatchMapClientMockRecorder +} + +// MockBatchMapClientMockRecorder is the mock recorder for MockBatchMapClient. +type MockBatchMapClientMockRecorder struct { + mock *MockBatchMapClient +} + +// NewMockBatchMapClient creates a new mock instance. +func NewMockBatchMapClient(ctrl *gomock.Controller) *MockBatchMapClient { + mock := &MockBatchMapClient{ctrl: ctrl} + mock.recorder = &MockBatchMapClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchMapClient) EXPECT() *MockBatchMapClientMockRecorder { + return m.recorder +} + +// BatchMapFn mocks base method. +func (m *MockBatchMapClient) BatchMapFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.BatchMap_BatchMapFnClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0} + for _, a := range arg1 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "BatchMapFn", varargs...) + ret0, _ := ret[0].(v1.BatchMap_BatchMapFnClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BatchMapFn indicates an expected call of BatchMapFn. +func (mr *MockBatchMapClientMockRecorder) BatchMapFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0}, arg1...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchMapFn", reflect.TypeOf((*MockBatchMapClient)(nil).BatchMapFn), varargs...) +} + +// IsReady mocks base method. +func (m *MockBatchMapClient) IsReady(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.ReadyResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "IsReady", varargs...) + ret0, _ := ret[0].(*v1.ReadyResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsReady indicates an expected call of IsReady. +func (mr *MockBatchMapClientMockRecorder) IsReady(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockBatchMapClient)(nil).IsReady), varargs...) +} + +// MockBatchMap_BatchMapFnClient is a mock of BatchMap_BatchMapFnClient interface. +type MockBatchMap_BatchMapFnClient struct { + ctrl *gomock.Controller + recorder *MockBatchMap_BatchMapFnClientMockRecorder +} + +// MockBatchMap_BatchMapFnClientMockRecorder is the mock recorder for MockBatchMap_BatchMapFnClient. +type MockBatchMap_BatchMapFnClientMockRecorder struct { + mock *MockBatchMap_BatchMapFnClient +} + +// NewMockBatchMap_BatchMapFnClient creates a new mock instance. +func NewMockBatchMap_BatchMapFnClient(ctrl *gomock.Controller) *MockBatchMap_BatchMapFnClient { + mock := &MockBatchMap_BatchMapFnClient{ctrl: ctrl} + mock.recorder = &MockBatchMap_BatchMapFnClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchMap_BatchMapFnClient) EXPECT() *MockBatchMap_BatchMapFnClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockBatchMap_BatchMapFnClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Recv() (*v1.BatchMapResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*v1.BatchMapResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockBatchMap_BatchMapFnClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).RecvMsg), arg0) +} + +// Send mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Send(arg0 *v1.BatchMapRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Send", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Send indicates an expected call of Send. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Send(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Send", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Send), arg0) +} + +// SendMsg mocks base method. +func (m *MockBatchMap_BatchMapFnClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockBatchMap_BatchMapFnClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockBatchMap_BatchMapFnClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockBatchMap_BatchMapFnClient)(nil).Trailer)) +} diff --git a/pkg/apis/proto/batchmap/v1/mockgen.go b/pkg/apis/proto/batchmap/v1/mockgen.go new file mode 100644 index 00000000..04982aeb --- /dev/null +++ b/pkg/apis/proto/batchmap/v1/mockgen.go @@ -0,0 +1,3 @@ +package v1 + +//go:generate mockgen -destination batchmapmock/batchmapmock.go -package batchmapmock github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1 BatchMapClient,BatchMap_BatchMapFnClient diff --git a/pkg/batchmapper/doc.go b/pkg/batchmapper/doc.go new file mode 100644 index 00000000..9b0138b1 --- /dev/null +++ b/pkg/batchmapper/doc.go @@ -0,0 +1,5 @@ +// Package batchmapper implements the server code for BatchMap Operation. + +// Examples: https://github.com/numaproj/numaflow-go/tree/main/pkg/batchmapper/examples/ + +package batchmapper diff --git a/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile b/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile new file mode 100644 index 00000000..c3b6cc45 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/Dockerfile @@ -0,0 +1,20 @@ +#################################################################################################### +# base +#################################################################################################### +FROM alpine:3.12.3 as base +RUN apk update && apk upgrade && \ + apk add ca-certificates && \ + apk --no-cache add tzdata + +COPY dist/flatmap-example /bin/flatmap-example +RUN chmod +x /bin/flatmap-example + +#################################################################################################### +# flatmap +#################################################################################################### +FROM scratch as flatmap +ARG ARCH +COPY --from=base /usr/share/zoneinfo /usr/share/zoneinfo +COPY --from=base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt +COPY --from=base /bin/flatmap-example /bin/flatmap-example +ENTRYPOINT [ "/bin/flatmap-example" ] diff --git a/pkg/batchmapper/examples/batchmap-flatmap/Makefile b/pkg/batchmapper/examples/batchmap-flatmap/Makefile new file mode 100644 index 00000000..81328294 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/Makefile @@ -0,0 +1,18 @@ +TAG ?= stable +PUSH ?= false + +.PHONY: build +build: + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -o ./dist/flatmap-example main.go + +.PHONY: image-push +image-push: build + docker buildx build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --platform linux/amd64,linux/arm64 --target flatmap . --push + +.PHONY: image +image: build + docker build -t "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}" --target flatmap . + @if [ "$(PUSH)" = "true" ]; then docker push "quay.io/numaio/numaflow-go/batch-map-flatmap:${TAG}"; fi + +clean: + -rm -rf ./dist diff --git a/pkg/batchmapper/examples/batchmap-flatmap/README.md b/pkg/batchmapper/examples/batchmap-flatmap/README.md new file mode 100644 index 00000000..0a639de2 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/README.md @@ -0,0 +1,3 @@ +# Map Batch Flatmap + +An example User Defined Function that demonstrates how to write a batch map based `flatmap` User Defined Function. diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.mod b/pkg/batchmapper/examples/batchmap-flatmap/go.mod new file mode 100644 index 00000000..88f3f383 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.mod @@ -0,0 +1,17 @@ +module flatmap + +go 1.20 + +replace github.com/numaproj/numaflow-go => ../../../.. + +require github.com/numaproj/numaflow-go v0.7.0-rc2 + +require ( + github.com/golang/protobuf v1.5.3 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect + google.golang.org/grpc v1.57.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect +) diff --git a/pkg/batchmapper/examples/batchmap-flatmap/go.sum b/pkg/batchmapper/examples/batchmap-flatmap/go.sum new file mode 100644 index 00000000..95c8479a --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/go.sum @@ -0,0 +1,24 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= +google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= +google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/pkg/batchmapper/examples/batchmap-flatmap/main.go b/pkg/batchmapper/examples/batchmap-flatmap/main.go new file mode 100644 index 00000000..5caad62f --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/main.go @@ -0,0 +1,32 @@ +package main + +import ( + "context" + "log" + "strings" + + "github.com/numaproj/numaflow-go/pkg/batchmapper" +) + +func batchMapFn(_ context.Context, datums []batchmapper.Datum) batchmapper.BatchResponses { + batchResponses := batchmapper.BatchResponsesBuilder() + for _, d := range datums { + msg := d.Value() + _ = d.EventTime() // Event time is available + _ = d.Watermark() // Watermark is available + results := batchmapper.NewBatchResponse(d.Id()) + strs := strings.Split(string(msg), ",") + for _, s := range strs { + results = results.Append(batchmapper.NewMessage([]byte(s))) + } + batchResponses = batchResponses.Append(results) + } + return batchResponses +} + +func main() { + err := batchmapper.NewServer(batchmapper.BatchMapperFunc(batchMapFn)).Start(context.Background()) + if err != nil { + log.Panic("Failed to start map function server: ", err) + } +} diff --git a/pkg/batchmapper/examples/batchmap-flatmap/pipe.yaml b/pkg/batchmapper/examples/batchmap-flatmap/pipe.yaml new file mode 100644 index 00000000..7b7cd3b9 --- /dev/null +++ b/pkg/batchmapper/examples/batchmap-flatmap/pipe.yaml @@ -0,0 +1,34 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: flatmap +spec: + vertices: + - name: in + source: + http: {} + - name: go-split + metadata: + annotations: + numaflow.numaproj.io/batch-map: "true" + scale: + min: 1 + udf: + container: + # Split input message into an array with comma, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/flatmap + image: quay.io/numaio/numaflow-go/batch-map-flatmap:stable + imagePullPolicy: Always + - name: go-udsink + scale: + min: 1 + sink: + udsink: + container: + # https://github.com/numaproj/numaflow-go/tree/main/pkg/sinker/examples/log + image: quay.io/numaio/numaflow-go/sink-log:stable + imagePullPolicy: Always + edges: + - from: in + to: go-split + - from: go-split + to: go-udsink diff --git a/pkg/batchmapper/interface.go b/pkg/batchmapper/interface.go new file mode 100644 index 00000000..68af2b7f --- /dev/null +++ b/pkg/batchmapper/interface.go @@ -0,0 +1,37 @@ +package batchmapper + +import ( + "context" + "time" +) + +// Datum contains methods to get the payload information. +type Datum interface { + // Value returns the payload of the message. + Value() []byte + // EventTime returns the event time of the message. + EventTime() time.Time + // Watermark returns the watermark of the message. + Watermark() time.Time + // Headers returns the headers of the message. + Headers() map[string]string + // Id returns the unique ID set for the given message + Id() string + // Keys returns the keys associated with a given datum + Keys() []string +} + +// BatchMapper is the interface for a Batch Map mode where the user is given a list +// of messages, and they return the consolidated response for all of them together. +type BatchMapper interface { + // BatchMap is the function which processes a list of input messages + BatchMap(ctx context.Context, datums []Datum) BatchResponses +} + +// BatchMapperFunc is a utility type used to convert a batch map function to a BatchMapper. +type BatchMapperFunc func(ctx context.Context, datums []Datum) BatchResponses + +// BatchMap implements the functionality of BatchMap function. +func (mf BatchMapperFunc) BatchMap(ctx context.Context, datums []Datum) BatchResponses { + return mf(ctx, datums) +} diff --git a/pkg/batchmapper/message.go b/pkg/batchmapper/message.go new file mode 100644 index 00000000..37e67295 --- /dev/null +++ b/pkg/batchmapper/message.go @@ -0,0 +1,106 @@ +package batchmapper + +import "fmt" + +var ( + DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__ +) + +// Message is used to wrap the data return by Map functions +type Message struct { + value []byte + keys []string + tags []string +} + +// NewMessage creates a Message with value +func NewMessage(value []byte) Message { + return Message{value: value} +} + +// MessageToDrop creates a Message to be dropped +func MessageToDrop() Message { + return Message{value: []byte{}, tags: []string{DROP}} +} + +// WithKeys is used to assign the keys to the message +func (m Message) WithKeys(keys []string) Message { + m.keys = keys + return m +} + +// WithTags is used to assign the tags to the message +// tags will be used for conditional forwarding +func (m Message) WithTags(tags []string) Message { + m.tags = tags + return m +} + +// Keys returns message keys +func (m Message) Keys() []string { + return m.keys +} + +// Value returns message value +func (m Message) Value() []byte { + return m.value +} + +// Tags returns message tags +func (m Message) Tags() []string { + return m.tags +} + +type Messages []Message + +// batchResponse is used to wrap the data return by batch map function along +// with the ID of the corresponding request +type batchResponse struct { + id string + messages []Message +} + +// Id returns request ID for the given list of responses +func (m batchResponse) Id() string { + return m.id +} + +// Append appends a Message to the messages list of a batchResponse +// object and then returns the updated object. +func (m batchResponse) Append(msg Message) batchResponse { + m.messages = append(m.messages, msg) + return m +} + +// Items returns the message list for a batchResponse +func (m batchResponse) Items() []Message { + return m.messages +} + +// BatchResponses is a list of batchResponse which signify the consolidated +// results for a batch of input messages. +type BatchResponses []batchResponse + +// NewBatchResponse is a utility function used to create a new batchResponse object +// Specifying an id is a mandatory requirement, as it is required to reference the +// responses back to a request. +func NewBatchResponse(id string) batchResponse { + return batchResponse{ + id: id, + messages: Messages{}, + } +} +func BatchResponsesBuilder() BatchResponses { + return BatchResponses{} +} + +// Append appends a Message +func (m BatchResponses) Append(msg batchResponse) BatchResponses { + m = append(m, msg) + return m +} + +// Items returns the message list +func (m BatchResponses) Items() []batchResponse { + return m +} diff --git a/pkg/batchmapper/options.go b/pkg/batchmapper/options.go new file mode 100644 index 00000000..2a24d17c --- /dev/null +++ b/pkg/batchmapper/options.go @@ -0,0 +1,39 @@ +package batchmapper + +type options struct { + sockAddr string + maxMessageSize int + serverInfoFilePath string +} + +// Option is the interface to apply options. +type Option func(*options) + +func defaultOptions() *options { + return &options{ + sockAddr: address, + maxMessageSize: defaultMaxMessageSize, + serverInfoFilePath: serverInfoFilePath, + } +} + +// WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size. +func WithMaxMessageSize(size int) Option { + return func(opts *options) { + opts.maxMessageSize = size + } +} + +// WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes. +func WithSockAddr(addr string) Option { + return func(opts *options) { + opts.sockAddr = addr + } +} + +// WithServerInfoFilePath sets the server info file path to the given path. +func WithServerInfoFilePath(f string) Option { + return func(opts *options) { + opts.serverInfoFilePath = f + } +} diff --git a/pkg/batchmapper/options_test.go b/pkg/batchmapper/options_test.go new file mode 100644 index 00000000..927cdb78 --- /dev/null +++ b/pkg/batchmapper/options_test.go @@ -0,0 +1,18 @@ +package batchmapper + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWithMaxMessageSize(t *testing.T) { + var ( + size = 1024 * 1024 * 10 + opts = &options{ + maxMessageSize: defaultMaxMessageSize, + } + ) + WithMaxMessageSize(1024 * 1024 * 10)(opts) + assert.Equal(t, size, opts.maxMessageSize) +} diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go new file mode 100644 index 00000000..78c94764 --- /dev/null +++ b/pkg/batchmapper/server.go @@ -0,0 +1,60 @@ +package batchmapper + +import ( + "context" + "fmt" + "log" + "os/signal" + "syscall" + + "github.com/numaproj/numaflow-go/pkg" + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" + "github.com/numaproj/numaflow-go/pkg/shared" +) + +// server is a map gRPC server. +type server struct { + svc *Service + opts *options +} + +// NewServer creates a new batch map server. +// TODO(map-batch): as this would be a streaming server should we see if there are some options (like maxMessageSize) +// which are different than unary server which are optimal for this use case. +func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server { + opts := defaultOptions() + for _, inputOption := range inputOptions { + inputOption(opts) + } + s := new(server) + s.svc = new(Service) + s.svc.BatchMapper = m + s.opts = opts + return s +} + +// Start starts the batch map server. +func (m *server) Start(ctx context.Context) error { + ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) + defer stop() + + // write server info to the file + // start listening on unix domain socket + lis, err := shared.PrepareServer(m.opts.sockAddr, m.opts.serverInfoFilePath) + if err != nil { + return fmt.Errorf("failed to execute net.Listen(%q, %q): %v", uds, address, err) + } + // close the listener + defer func() { _ = lis.Close() }() + + // create a grpc server + grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) + defer log.Println("Successfully stopped the gRPC server") + defer grpcServer.GracefulStop() + + // register the map service + batchmappb.RegisterBatchMapServer(grpcServer, m.svc) + + // start the grpc server + return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) +} diff --git a/pkg/batchmapper/server_test.go b/pkg/batchmapper/server_test.go new file mode 100644 index 00000000..fb907079 --- /dev/null +++ b/pkg/batchmapper/server_test.go @@ -0,0 +1,37 @@ +package batchmapper + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBatchMapServer_Start(t *testing.T) { + socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock") + defer func() { + _ = os.RemoveAll(socketFile.Name()) + }() + + serverInfoFile, _ := os.CreateTemp("/tmp", "numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + + var batchMapHandler = BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses.Append(results) + } + return batchResponses + }) + // note: using actual uds connection + ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + defer cancel() + err := NewServer(batchMapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) + assert.NoError(t, err) +} diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go new file mode 100644 index 00000000..721b4448 --- /dev/null +++ b/pkg/batchmapper/service.go @@ -0,0 +1,93 @@ +package batchmapper + +import ( + "context" + "fmt" + "io" + "log" + + "google.golang.org/protobuf/types/known/emptypb" + + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" +) + +const ( + uds = "unix" + address = "/var/run/numaflow/batchmap.sock" + defaultMaxMessageSize = 1024 * 1024 * 64 + serverInfoFilePath = "/var/run/numaflow/batchmapper-server-info" +) + +// Service implements the proto gen server interface and contains the map operation +// handler. +type Service struct { + batchmappb.UnimplementedBatchMapServer + BatchMapper BatchMapper +} + +// IsReady returns true to indicate the gRPC connection is ready. +func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*batchmappb.ReadyResponse, error) { + return &batchmappb.ReadyResponse{Ready: true}, nil +} + +// BatchMapFn applies a user defined function to a stream of request element and streams back the responses for them. +func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error { + ctx := stream.Context() + + // As the BatchMap interface expects a list of request elements + // we read all the requests coming on the stream and keep appending them together + // and then finally send the array for processing once all the messages on the stream have been read. + datums := make([]Datum, 0) + for { + d, err := stream.Recv() + // if we see EOF on the stream we do not have any more messages coming up + if err == io.EOF { + break + } + if err != nil { + log.Println("BatchMapFn: Got an error while recv() on stream", err) + return err + } + var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders(), d.GetId(), d.GetKeys()) + datums = append(datums, hd) + } + + // Apply the user BatchMap implementation function + responses := fs.BatchMapper.BatchMap(ctx, datums) + + // If the number of responses received does not align with the request batch size, + // we will not be able to process the data correctly. + // This should be marked as an error and shown to the user. + // TODO(map-batch): We could potentially panic here as well + if len(responses.Items()) != len(datums) { + errMsg := "batchMapFn: mismatch between length of batch requests and responses" + log.Println(errMsg) + return fmt.Errorf(errMsg) + } + + // iterate over the responses received and covert to the required proto format + for _, batchResp := range responses.Items() { + var elements []*batchmappb.BatchMapResponse_Result + for _, resp := range batchResp.Items() { + elements = append(elements, &batchmappb.BatchMapResponse_Result{ + Keys: resp.Keys(), + Value: resp.Value(), + Tags: resp.Tags(), + }) + } + singleRequestResp := &batchmappb.BatchMapResponse{ + Results: elements, + Id: batchResp.Id(), + } + // We stream back the result for a single request ID + // this would contain all the responses for that request. + err := stream.Send(singleRequestResp) + if err != nil { + log.Println("BatchMapFn: Got an error while Send() on stream", err) + return err + } + } + // Once all responses are sent we can return, this would indicate the end of the rpc and + // send an EOF to the client on the stream + return nil +} diff --git a/pkg/batchmapper/service_test.go b/pkg/batchmapper/service_test.go new file mode 100644 index 00000000..f2f4cccd --- /dev/null +++ b/pkg/batchmapper/service_test.go @@ -0,0 +1,290 @@ +package batchmapper + +import ( + "context" + "fmt" + "io" + "reflect" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" + + batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" +) + +type BatchMapStreamFnServerTest struct { + ctx context.Context + outputCh chan *batchmappb.BatchMapResponse + inputCh chan *batchmappb.BatchMapRequest + grpc.ServerStream +} + +func NewBatchBatchMapStreamFnServerTest( + ctx context.Context, + inputCh chan *batchmappb.BatchMapRequest, + outputCh chan *batchmappb.BatchMapResponse, +) *BatchMapStreamFnServerTest { + return &BatchMapStreamFnServerTest{ + ctx: ctx, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (u *BatchMapStreamFnServerTest) Recv() (*batchmappb.BatchMapRequest, error) { + val, ok := <-u.inputCh + if !ok { + return val, io.EOF + } + return val, nil +} + +func (u *BatchMapStreamFnServerTest) Send(d *batchmappb.BatchMapResponse) error { + u.outputCh <- d + return nil +} + +func (u *BatchMapStreamFnServerTest) Context() context.Context { + return u.ctx +} + +type BatchMapFnServerErrTest struct { + ctx context.Context + inputCh chan *batchmappb.BatchMapRequest + outputCh chan *batchmappb.BatchMapResponse + grpc.ServerStream +} + +func NewBatchMapFnServerErrTest( + ctx context.Context, + inputCh chan *batchmappb.BatchMapRequest, + outputCh chan *batchmappb.BatchMapResponse, + +) *BatchMapFnServerErrTest { + return &BatchMapFnServerErrTest{ + ctx: ctx, + inputCh: inputCh, + outputCh: outputCh, + } +} + +func (u *BatchMapFnServerErrTest) Recv() (*batchmappb.BatchMapRequest, error) { + val, ok := <-u.inputCh + if !ok { + return val, io.EOF + } + return val, nil +} + +func (u *BatchMapFnServerErrTest) Send(_ *batchmappb.BatchMapResponse) error { + return fmt.Errorf("send error") +} + +func (u *BatchMapFnServerErrTest) Context() context.Context { + return u.ctx +} + +func TestService_MapFnStream(t *testing.T) { + tests := []struct { + name string + handler BatchMapper + input []*batchmappb.BatchMapRequest + expected []*batchmappb.BatchMapResponse + expectedErr bool + streamErr bool + }{ + { + name: "batch_map_stream_fn_forward_msg", + handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses = batchResponses.Append(results) + } + return batchResponses + }), + input: []*batchmappb.BatchMapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*batchmappb.BatchMapResponse{ + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: false, + }, + { + name: "batch_map_mismatch_output_len", + handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + return batchResponses + }), + input: []*batchmappb.BatchMapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*batchmappb.BatchMapResponse{ + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: true, + }, + { + name: "batch_map_stream_err", + handler: BatchMapperFunc(func(ctx context.Context, datums []Datum) BatchResponses { + batchResponses := BatchResponsesBuilder() + for _, d := range datums { + results := NewBatchResponse(d.Id()) + results = results.Append(NewMessage(d.Value()).WithKeys([]string{d.Keys()[0] + "_test"})) + batchResponses = batchResponses.Append(results) + } + return batchResponses + }), + input: []*batchmappb.BatchMapRequest{{ + Keys: []string{"client"}, + Value: []byte(`test1`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test1", + }, { + Keys: []string{"client"}, + Value: []byte(`test2`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Id: "test2", + }}, + expected: []*batchmappb.BatchMapResponse{ + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test1`), + }, + }, + Id: "test1", + }, + { + Results: []*batchmappb.BatchMapResponse_Result{ + { + Keys: []string{"client_test"}, + Value: []byte(`test2`), + }, + }, + Id: "test2", + }, + }, + expectedErr: true, + streamErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fs := &Service{ + BatchMapper: tt.handler, + } + // here's a trick for testing: + // because we are not using gRPC, we directly set a new incoming ctx + // instead of the regular outgoing context in the real gRPC connection. + ctx := context.Background() + inputCh := make(chan *batchmappb.BatchMapRequest) + outputCh := make(chan *batchmappb.BatchMapResponse) + result := make([]*batchmappb.BatchMapResponse, 0) + + var udfBatchMapFnStream batchmappb.BatchMap_BatchMapFnServer + if tt.streamErr { + udfBatchMapFnStream = NewBatchMapFnServerErrTest(ctx, inputCh, outputCh) + } else { + udfBatchMapFnStream = NewBatchBatchMapStreamFnServerTest(ctx, inputCh, outputCh) + } + + var wg sync.WaitGroup + var err error + + wg.Add(1) + go func() { + defer wg.Done() + err = fs.BatchMapFn(udfBatchMapFnStream) + close(outputCh) + }() + + wg.Add(1) + go func() { + defer wg.Done() + for msg := range outputCh { + result = append(result, msg) + } + }() + + for _, val := range tt.input { + inputCh <- val + } + close(inputCh) + wg.Wait() + + if err != nil { + assert.True(t, tt.expectedErr, "MapStreamFn() error = %v, expectedErr %v", err, tt.expectedErr) + return + } + + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("MapStreamFn() got = %v, want %v", result, tt.expected) + } + + }) + } +} diff --git a/pkg/batchmapper/types.go b/pkg/batchmapper/types.go new file mode 100644 index 00000000..1facd290 --- /dev/null +++ b/pkg/batchmapper/types.go @@ -0,0 +1,48 @@ +package batchmapper + +import "time" + +// handlerDatum implements the Datum interface and is used in the batch map function. +type handlerDatum struct { + value []byte + eventTime time.Time + watermark time.Time + headers map[string]string + id string + keys []string +} + +func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, id string, keys []string) Datum { + return &handlerDatum{ + value: value, + eventTime: eventTime, + watermark: watermark, + headers: headers, + id: id, + keys: keys, + } +} + +func (h *handlerDatum) Value() []byte { + return h.value +} + +func (h *handlerDatum) EventTime() time.Time { + return h.eventTime +} + +func (h *handlerDatum) Watermark() time.Time { + return h.watermark +} + +func (h *handlerDatum) Headers() map[string]string { + return h.headers +} + +func (h *handlerDatum) Id() string { + return h.id +} + +func (h *handlerDatum) Keys() []string { + return h.keys +}