From a15002304b6d55ab3c40f5467fb3f4005d49c8a5 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Sun, 13 Oct 2024 07:49:57 +0530 Subject: [PATCH] send eot from response as well Signed-off-by: Yashash H L --- pkg/apis/proto/map/v1/map.pb.go | 293 +++++++++--------- pkg/apis/proto/map/v1/map.proto | 11 +- .../examples/batchmap_flatmap/go.mod | 1 - .../examples/batchmap_flatmap/go.sum | 2 - pkg/batchmapper/service.go | 24 +- pkg/batchmapper/service_test.go | 18 +- 6 files changed, 200 insertions(+), 149 deletions(-) diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index 137a402c..f638b4e7 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -31,9 +31,9 @@ type MapRequest struct { Request *MapRequest_Request `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` // This ID is used to uniquely identify a map request - Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` - Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` - Status *MapRequest_Status `protobuf:"bytes,4,opt,name=status,proto3,oneof" json:"status,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` + Status *Status `protobuf:"bytes,4,opt,name=status,proto3,oneof" json:"status,omitempty"` } func (x *MapRequest) Reset() { @@ -89,7 +89,7 @@ func (x *MapRequest) GetHandshake() *Handshake { return nil } -func (x *MapRequest) GetStatus() *MapRequest_Status { +func (x *MapRequest) GetStatus() *Status { if x != nil { return x.Status } @@ -145,6 +145,54 @@ func (x *Handshake) GetSot() bool { return false } +// Status message to indicate the status of the message. +type Status struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` +} + +func (x *Status) Reset() { + *x = Status{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Status) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Status) ProtoMessage() {} + +func (x *Status) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Status.ProtoReflect.Descriptor instead. +func (*Status) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2} +} + +func (x *Status) GetEot() bool { + if x != nil { + return x.Eot + } + return false +} + // * // MapResponse represents a response element. type MapResponse struct { @@ -156,12 +204,13 @@ type MapResponse struct { // This ID is used to refer the responses to the request it corresponds to. Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` + Status *Status `protobuf:"bytes,4,opt,name=status,proto3,oneof" json:"status,omitempty"` } func (x *MapResponse) Reset() { *x = MapResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -174,7 +223,7 @@ func (x *MapResponse) String() string { func (*MapResponse) ProtoMessage() {} func (x *MapResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[2] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -187,7 +236,7 @@ func (x *MapResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use MapResponse.ProtoReflect.Descriptor instead. func (*MapResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3} } func (x *MapResponse) GetResults() []*MapResponse_Result { @@ -211,6 +260,13 @@ func (x *MapResponse) GetHandshake() *Handshake { return nil } +func (x *MapResponse) GetStatus() *Status { + if x != nil { + return x.Status + } + return nil +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -224,7 +280,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -237,7 +293,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[3] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -250,7 +306,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{4} } func (x *ReadyResponse) GetReady() bool { @@ -275,7 +331,7 @@ type MapRequest_Request struct { func (x *MapRequest_Request) Reset() { *x = MapRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -288,7 +344,7 @@ func (x *MapRequest_Request) String() string { func (*MapRequest_Request) ProtoMessage() {} func (x *MapRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[4] + mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -339,53 +395,6 @@ func (x *MapRequest_Request) GetHeaders() map[string]string { return nil } -type MapRequest_Status struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` -} - -func (x *MapRequest_Status) Reset() { - *x = MapRequest_Status{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MapRequest_Status) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MapRequest_Status) ProtoMessage() {} - -func (x *MapRequest_Status) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_map_v1_map_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MapRequest_Status.ProtoReflect.Descriptor instead. -func (*MapRequest_Status) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{0, 1} -} - -func (x *MapRequest_Status) GetEot() bool { - if x != nil { - return x.Eot - } - return false -} - type MapResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -425,7 +434,7 @@ func (x *MapResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use MapResponse_Result.ProtoReflect.Descriptor instead. func (*MapResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{2, 0} + return file_pkg_apis_proto_map_v1_map_proto_rawDescGZIP(), []int{3, 0} } func (x *MapResponse_Result) GetKeys() []string { @@ -458,7 +467,7 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x9f, 0x04, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xf8, 0x03, 0x0a, 0x0a, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, @@ -467,62 +476,65 @@ var file_pkg_apis_proto_map_v1_map_proto_rawDesc = []byte{ 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, - 0x01, 0x01, 0x12, 0x36, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x19, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x01, 0x52, - 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, 0x1a, 0xa7, 0x02, 0x0a, 0x07, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, - 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, - 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, - 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x41, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, - 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, - 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, - 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, - 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x1a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, - 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, - 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, - 0x0a, 0x07, 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, - 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x03, 0x73, 0x6f, 0x74, 0x22, 0xdf, 0x01, 0x0a, 0x0b, 0x4d, 0x61, 0x70, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x6d, 0x61, 0x70, 0x2e, - 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, 0x0e, - 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x34, - 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, - 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, - 0x65, 0x88, 0x01, 0x01, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, - 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, - 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, - 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, 0x0a, 0x0a, - 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, - 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, - 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, - 0x79, 0x32, 0x75, 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x34, 0x0a, 0x05, 0x4d, 0x61, 0x70, 0x46, - 0x6e, 0x12, 0x12, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, - 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x38, - 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, - 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, - 0x79, 0x1a, 0x15, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, - 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, - 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, - 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x01, 0x01, 0x12, 0x2b, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x48, 0x01, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, 0x1a, + 0xa7, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, + 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, + 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x41, 0x0a, 0x07, 0x68, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x6d, 0x61, + 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, + 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, + 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x73, 0x6f, + 0x74, 0x22, 0x1a, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, + 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x22, 0x97, 0x02, + 0x0a, 0x0b, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, + 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x34, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, + 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, + 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x12, 0x2b, 0x0a, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x6d, 0x61, 0x70, 0x2e, + 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x48, 0x01, 0x52, 0x06, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, + 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, + 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, + 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, 0x0a, 0x07, + 0x5f, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x75, + 0x0a, 0x03, 0x4d, 0x61, 0x70, 0x12, 0x34, 0x0a, 0x05, 0x4d, 0x61, 0x70, 0x46, 0x6e, 0x12, 0x12, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x4d, 0x61, 0x70, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x38, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x15, + 0x2e, 0x6d, 0x61, 0x70, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x37, 0x5a, 0x35, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, + 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, + 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x6d, 0x61, 0x70, 0x2f, 0x76, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -541,33 +553,34 @@ var file_pkg_apis_proto_map_v1_map_proto_msgTypes = make([]protoimpl.MessageInfo var file_pkg_apis_proto_map_v1_map_proto_goTypes = []any{ (*MapRequest)(nil), // 0: map.v1.MapRequest (*Handshake)(nil), // 1: map.v1.Handshake - (*MapResponse)(nil), // 2: map.v1.MapResponse - (*ReadyResponse)(nil), // 3: map.v1.ReadyResponse - (*MapRequest_Request)(nil), // 4: map.v1.MapRequest.Request - (*MapRequest_Status)(nil), // 5: map.v1.MapRequest.Status + (*Status)(nil), // 2: map.v1.Status + (*MapResponse)(nil), // 3: map.v1.MapResponse + (*ReadyResponse)(nil), // 4: map.v1.ReadyResponse + (*MapRequest_Request)(nil), // 5: map.v1.MapRequest.Request nil, // 6: map.v1.MapRequest.Request.HeadersEntry (*MapResponse_Result)(nil), // 7: map.v1.MapResponse.Result (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp (*emptypb.Empty)(nil), // 9: google.protobuf.Empty } var file_pkg_apis_proto_map_v1_map_proto_depIdxs = []int32{ - 4, // 0: map.v1.MapRequest.request:type_name -> map.v1.MapRequest.Request + 5, // 0: map.v1.MapRequest.request:type_name -> map.v1.MapRequest.Request 1, // 1: map.v1.MapRequest.handshake:type_name -> map.v1.Handshake - 5, // 2: map.v1.MapRequest.status:type_name -> map.v1.MapRequest.Status + 2, // 2: map.v1.MapRequest.status:type_name -> map.v1.Status 7, // 3: map.v1.MapResponse.results:type_name -> map.v1.MapResponse.Result 1, // 4: map.v1.MapResponse.handshake:type_name -> map.v1.Handshake - 8, // 5: map.v1.MapRequest.Request.event_time:type_name -> google.protobuf.Timestamp - 8, // 6: map.v1.MapRequest.Request.watermark:type_name -> google.protobuf.Timestamp - 6, // 7: map.v1.MapRequest.Request.headers:type_name -> map.v1.MapRequest.Request.HeadersEntry - 0, // 8: map.v1.Map.MapFn:input_type -> map.v1.MapRequest - 9, // 9: map.v1.Map.IsReady:input_type -> google.protobuf.Empty - 2, // 10: map.v1.Map.MapFn:output_type -> map.v1.MapResponse - 3, // 11: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse - 10, // [10:12] is the sub-list for method output_type - 8, // [8:10] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 2, // 5: map.v1.MapResponse.status:type_name -> map.v1.Status + 8, // 6: map.v1.MapRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 8, // 7: map.v1.MapRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 6, // 8: map.v1.MapRequest.Request.headers:type_name -> map.v1.MapRequest.Request.HeadersEntry + 0, // 9: map.v1.Map.MapFn:input_type -> map.v1.MapRequest + 9, // 10: map.v1.Map.IsReady:input_type -> google.protobuf.Empty + 3, // 11: map.v1.Map.MapFn:output_type -> map.v1.MapResponse + 4, // 12: map.v1.Map.IsReady:output_type -> map.v1.ReadyResponse + 11, // [11:13] is the sub-list for method output_type + 9, // [9:11] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_pkg_apis_proto_map_v1_map_proto_init() } @@ -601,7 +614,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*MapResponse); i { + switch v := v.(*Status); i { case 0: return &v.state case 1: @@ -613,7 +626,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*ReadyResponse); i { + switch v := v.(*MapResponse); i { case 0: return &v.state case 1: @@ -625,7 +638,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*MapRequest_Request); i { + switch v := v.(*ReadyResponse); i { case 0: return &v.state case 1: @@ -637,7 +650,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*MapRequest_Status); i { + switch v := v.(*MapRequest_Request); i { case 0: return &v.state case 1: @@ -662,7 +675,7 @@ func file_pkg_apis_proto_map_v1_map_proto_init() { } } file_pkg_apis_proto_map_v1_map_proto_msgTypes[0].OneofWrappers = []any{} - file_pkg_apis_proto_map_v1_map_proto_msgTypes[2].OneofWrappers = []any{} + file_pkg_apis_proto_map_v1_map_proto_msgTypes[3].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/pkg/apis/proto/map/v1/map.proto b/pkg/apis/proto/map/v1/map.proto index 4442de80..f34cc4e9 100644 --- a/pkg/apis/proto/map/v1/map.proto +++ b/pkg/apis/proto/map/v1/map.proto @@ -26,9 +26,6 @@ message MapRequest { google.protobuf.Timestamp watermark = 4; map headers = 5; } - message Status { - bool eot = 1; - } Request request = 1; // This ID is used to uniquely identify a map request string id = 2; @@ -44,6 +41,13 @@ message Handshake { bool sot = 1; } +/* + * Status message to indicate the status of the message. + */ +message Status { + bool eot = 1; +} + /** * MapResponse represents a response element. */ @@ -57,6 +61,7 @@ message MapResponse { // This ID is used to refer the responses to the request it corresponds to. string id = 2; optional Handshake handshake = 3; + optional Status status = 4; } /** diff --git a/pkg/batchmapper/examples/batchmap_flatmap/go.mod b/pkg/batchmapper/examples/batchmap_flatmap/go.mod index 91c38a70..fa00e50d 100644 --- a/pkg/batchmapper/examples/batchmap_flatmap/go.mod +++ b/pkg/batchmapper/examples/batchmap_flatmap/go.mod @@ -7,7 +7,6 @@ replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.1 require ( - go.uber.org/atomic v1.11.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect diff --git a/pkg/batchmapper/examples/batchmap_flatmap/go.sum b/pkg/batchmapper/examples/batchmap_flatmap/go.sum index 38feccd7..36997a49 100644 --- a/pkg/batchmapper/examples/batchmap_flatmap/go.sum +++ b/pkg/batchmapper/examples/batchmap_flatmap/go.sum @@ -6,8 +6,6 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= -go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index f3a8829d..b81cc217 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -33,8 +33,8 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons return &mappb.ReadyResponse{Ready: true}, nil } -// BatchMapFn applies a user defined function to a stream of request elements and streams back the responses for them. -func (fs *Service) BatchMapFn(stream mappb.Map_MapFnServer) error { +// MapFn applies a user defined function to a stream of request elements and streams back the responses for them. +func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { ctx := stream.Context() // Perform handshake before entering the main loop @@ -153,10 +153,30 @@ func (fs *Service) processData(ctx context.Context, stream mappb.Map_MapFnServer Results: elements, Id: batchResp.Id(), } + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if err := stream.Send(singleRequestResp); err != nil { log.Println("BatchMapFn: Got an error while Send() on stream", err) return err } } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + // send the end of transmission message + eot := &mappb.MapResponse{ + Status: &mappb.Status{ + Eot: true, + }, + } + if err := stream.Send(eot); err != nil { + log.Println("BatchMapFn: Got an error while Send() on stream", err) + } + } return nil } diff --git a/pkg/batchmapper/service_test.go b/pkg/batchmapper/service_test.go index 2671c362..a0278e11 100644 --- a/pkg/batchmapper/service_test.go +++ b/pkg/batchmapper/service_test.go @@ -131,6 +131,12 @@ func TestService_BatchMapFn(t *testing.T) { }, Id: "test2", }, + { + Request: &mappb.MapRequest_Request{}, + Status: &mappb.Status{ + Eot: true, + }, + }, }, expected: []*mappb.MapResponse{ { @@ -156,6 +162,11 @@ func TestService_BatchMapFn(t *testing.T) { }, Id: "test2", }, + { + Status: &mappb.Status{ + Eot: true, + }, + }, }, expectedErr: false, }, @@ -219,6 +230,11 @@ func TestService_BatchMapFn(t *testing.T) { }, Id: "test2", }, + { + Status: &mappb.Status{ + Eot: true, + }, + }, }, expectedErr: true, }, @@ -250,7 +266,7 @@ func TestService_BatchMapFn(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - err = fs.BatchMapFn(udfBatchMapFnStream) + err = fs.MapFn(udfBatchMapFnStream) close(outputCh) }()