diff --git a/go.mod b/go.mod index e5cf9d54..77f26309 100644 --- a/go.mod +++ b/go.mod @@ -2,8 +2,6 @@ module github.com/numaproj/numaflow-go go 1.21 -toolchain go1.22.4 - require ( github.com/golang/mock v1.6.0 github.com/stretchr/testify v1.8.1 diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index 9fd77179..dba248ac 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -22,6 +22,100 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +// Code to indicate the status of the response. +type ReadResponse_Status_Code int32 + +const ( + ReadResponse_Status_SUCCESS ReadResponse_Status_Code = 0 + ReadResponse_Status_FAILURE ReadResponse_Status_Code = 1 +) + +// Enum value maps for ReadResponse_Status_Code. +var ( + ReadResponse_Status_Code_name = map[int32]string{ + 0: "SUCCESS", + 1: "FAILURE", + } + ReadResponse_Status_Code_value = map[string]int32{ + "SUCCESS": 0, + "FAILURE": 1, + } +) + +func (x ReadResponse_Status_Code) Enum() *ReadResponse_Status_Code { + p := new(ReadResponse_Status_Code) + *p = x + return p +} + +func (x ReadResponse_Status_Code) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ReadResponse_Status_Code) Descriptor() protoreflect.EnumDescriptor { + return file_pkg_apis_proto_source_v1_source_proto_enumTypes[0].Descriptor() +} + +func (ReadResponse_Status_Code) Type() protoreflect.EnumType { + return &file_pkg_apis_proto_source_v1_source_proto_enumTypes[0] +} + +func (x ReadResponse_Status_Code) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ReadResponse_Status_Code.Descriptor instead. +func (ReadResponse_Status_Code) EnumDescriptor() ([]byte, []int) { + return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{1, 1, 0} +} + +// Error to indicate the error type. If the code is FAILURE, then the error field will be populated. +type ReadResponse_Status_Error int32 + +const ( + ReadResponse_Status_UNACKED ReadResponse_Status_Error = 0 + ReadResponse_Status_OTHER ReadResponse_Status_Error = 1 +) + +// Enum value maps for ReadResponse_Status_Error. +var ( + ReadResponse_Status_Error_name = map[int32]string{ + 0: "UNACKED", + 1: "OTHER", + } + ReadResponse_Status_Error_value = map[string]int32{ + "UNACKED": 0, + "OTHER": 1, + } +) + +func (x ReadResponse_Status_Error) Enum() *ReadResponse_Status_Error { + p := new(ReadResponse_Status_Error) + *p = x + return p +} + +func (x ReadResponse_Status_Error) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ReadResponse_Status_Error) Descriptor() protoreflect.EnumDescriptor { + return file_pkg_apis_proto_source_v1_source_proto_enumTypes[1].Descriptor() +} + +func (ReadResponse_Status_Error) Type() protoreflect.EnumType { + return &file_pkg_apis_proto_source_v1_source_proto_enumTypes[1] +} + +func (x ReadResponse_Status_Error) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ReadResponse_Status_Error.Descriptor instead. +func (ReadResponse_Status_Error) EnumDescriptor() ([]byte, []int) { + return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{1, 1, 1} +} + // ReadRequest is the request for reading datum stream from user defined source. type ReadRequest struct { state protoimpl.MessageState @@ -79,6 +173,8 @@ type ReadResponse struct { // Required field holding the result. Result *ReadResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` + // Status of the response. Holds the end of transmission flag and the status code. + Status *ReadResponse_Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` } func (x *ReadResponse) Reset() { @@ -120,6 +216,13 @@ func (x *ReadResponse) GetResult() *ReadResponse_Result { return nil } +func (x *ReadResponse) GetStatus() *ReadResponse_Status { + if x != nil { + return x.Status + } + return nil +} + // AckRequest is the request for acknowledging datum. // It takes a list of offsets to be acknowledged. type AckRequest struct { @@ -584,6 +687,78 @@ func (x *ReadResponse_Result) GetHeaders() map[string]string { return nil } +type ReadResponse_Status struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // End of transmission flag. + Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` + Code ReadResponse_Status_Code `protobuf:"varint,2,opt,name=code,proto3,enum=source.v1.ReadResponse_Status_Code" json:"code,omitempty"` + Error ReadResponse_Status_Error `protobuf:"varint,3,opt,name=error,proto3,enum=source.v1.ReadResponse_Status_Error" json:"error,omitempty"` + Msg *string `protobuf:"bytes,4,opt,name=msg,proto3,oneof" json:"msg,omitempty"` +} + +func (x *ReadResponse_Status) Reset() { + *x = ReadResponse_Status{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadResponse_Status) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadResponse_Status) ProtoMessage() {} + +func (x *ReadResponse_Status) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[10] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadResponse_Status.ProtoReflect.Descriptor instead. +func (*ReadResponse_Status) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{1, 1} +} + +func (x *ReadResponse_Status) GetEot() bool { + if x != nil { + return x.Eot + } + return false +} + +func (x *ReadResponse_Status) GetCode() ReadResponse_Status_Code { + if x != nil { + return x.Code + } + return ReadResponse_Status_SUCCESS +} + +func (x *ReadResponse_Status) GetError() ReadResponse_Status_Error { + if x != nil { + return x.Error + } + return ReadResponse_Status_UNACKED +} + +func (x *ReadResponse_Status) GetMsg() string { + if x != nil && x.Msg != nil { + return *x.Msg + } + return "" +} + type AckRequest_Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -596,7 +771,7 @@ type AckRequest_Request struct { func (x *AckRequest_Request) Reset() { *x = AckRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[11] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -609,7 +784,7 @@ func (x *AckRequest_Request) String() string { func (*AckRequest_Request) ProtoMessage() {} func (x *AckRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[11] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -644,7 +819,7 @@ type AckResponse_Result struct { func (x *AckResponse_Result) Reset() { *x = AckResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[12] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -657,7 +832,7 @@ func (x *AckResponse_Result) String() string { func (*AckResponse_Result) ProtoMessage() {} func (x *AckResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[12] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -693,7 +868,7 @@ type PendingResponse_Result struct { func (x *PendingResponse_Result) Reset() { *x = PendingResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[13] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -706,7 +881,7 @@ func (x *PendingResponse_Result) String() string { func (*PendingResponse_Result) ProtoMessage() {} func (x *PendingResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[13] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -741,7 +916,7 @@ type PartitionsResponse_Result struct { func (x *PartitionsResponse_Result) Reset() { *x = PartitionsResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[14] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -754,7 +929,7 @@ func (x *PartitionsResponse_Result) String() string { func (*PartitionsResponse_Result) ProtoMessage() {} func (x *PartitionsResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[14] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -796,92 +971,111 @@ var file_pkg_apis_proto_source_v1_source_proto_rawDesc = []byte{ 0x6f, 0x72, 0x64, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x52, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x73, 0x12, 0x22, 0x0a, 0x0d, 0x74, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x5f, 0x69, 0x6e, 0x5f, 0x6d, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x74, - 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x49, 0x6e, 0x4d, 0x73, 0x22, 0xe8, 0x02, 0x0a, 0x0c, 0x52, + 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x49, 0x6e, 0x4d, 0x73, 0x22, 0x94, 0x05, 0x0a, 0x0c, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x1a, 0x9f, 0x02, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x18, - 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, - 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x29, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x06, 0x6f, 0x66, 0x66, - 0x73, 0x65, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, - 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, - 0x79, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, - 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, - 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, - 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, 0x7b, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x34, 0x0a, 0x07, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, - 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x22, 0x80, 0x01, 0x0a, 0x0b, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x35, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, - 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x3a, 0x0a, 0x06, 0x52, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x12, 0x30, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x07, 0x73, 0x75, - 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0x6c, 0x0a, 0x0f, - 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x39, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x21, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, 0x64, - 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x1e, 0x0a, 0x06, 0x52, 0x65, - 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x7c, 0x0a, 0x12, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x3c, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x24, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, - 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x28, - 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, - 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0a, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, - 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xc6, 0x02, - 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3d, 0x0a, 0x06, 0x52, 0x65, 0x61, 0x64, - 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, - 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x38, 0x0a, 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, - 0x12, 0x15, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, - 0x01, 0x12, 0x3f, 0x0a, 0x09, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, 0x12, 0x16, + 0x75, 0x6c, 0x74, 0x12, 0x36, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x1a, 0x9f, 0x02, 0x0a, 0x06, + 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x29, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x11, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x66, 0x66, + 0x73, 0x65, 0x74, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x04, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x68, 0x65, + 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, + 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0xf1, 0x01, + 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x12, 0x37, 0x0a, 0x04, 0x63, 0x6f, + 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x43, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x63, + 0x6f, 0x64, 0x65, 0x12, 0x3a, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, + 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, + 0x15, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x03, + 0x6d, 0x73, 0x67, 0x88, 0x01, 0x01, 0x22, 0x20, 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x0b, + 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, + 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x22, 0x1f, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x41, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x00, 0x12, 0x09, + 0x0a, 0x05, 0x4f, 0x54, 0x48, 0x45, 0x52, 0x10, 0x01, 0x42, 0x06, 0x0a, 0x04, 0x5f, 0x6d, 0x73, + 0x67, 0x22, 0x7b, 0x0a, 0x0a, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x37, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, + 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x34, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x29, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x22, 0x80, + 0x01, 0x0a, 0x0b, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x35, + 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, + 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x3a, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, + 0x30, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, + 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0x6c, 0x0a, 0x0f, 0x50, 0x65, 0x6e, 0x64, + 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x39, 0x0a, 0x06, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, + 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x1e, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, + 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x7c, 0x0a, 0x12, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x06, + 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x28, 0x0a, 0x06, 0x52, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, + 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, + 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, 0x61, + 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xc6, 0x02, 0x0a, 0x06, 0x53, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x12, 0x3d, 0x0a, 0x06, 0x52, 0x65, 0x61, 0x64, 0x46, 0x6e, 0x12, 0x16, + 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x38, 0x0a, 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, 0x12, 0x15, 0x2e, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x3f, 0x0a, + 0x09, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, + 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, + 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, + 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, - 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1d, 0x2e, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, - 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, - 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, - 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, - 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, + 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -896,53 +1090,60 @@ func file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_source_v1_source_proto_rawDescData } -var file_pkg_apis_proto_source_v1_source_proto_msgTypes = make([]protoimpl.MessageInfo, 15) +var file_pkg_apis_proto_source_v1_source_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_pkg_apis_proto_source_v1_source_proto_msgTypes = make([]protoimpl.MessageInfo, 16) var file_pkg_apis_proto_source_v1_source_proto_goTypes = []any{ - (*ReadRequest)(nil), // 0: source.v1.ReadRequest - (*ReadResponse)(nil), // 1: source.v1.ReadResponse - (*AckRequest)(nil), // 2: source.v1.AckRequest - (*AckResponse)(nil), // 3: source.v1.AckResponse - (*ReadyResponse)(nil), // 4: source.v1.ReadyResponse - (*PendingResponse)(nil), // 5: source.v1.PendingResponse - (*PartitionsResponse)(nil), // 6: source.v1.PartitionsResponse - (*Offset)(nil), // 7: source.v1.Offset - (*ReadRequest_Request)(nil), // 8: source.v1.ReadRequest.Request - (*ReadResponse_Result)(nil), // 9: source.v1.ReadResponse.Result - nil, // 10: source.v1.ReadResponse.Result.HeadersEntry - (*AckRequest_Request)(nil), // 11: source.v1.AckRequest.Request - (*AckResponse_Result)(nil), // 12: source.v1.AckResponse.Result - (*PendingResponse_Result)(nil), // 13: source.v1.PendingResponse.Result - (*PartitionsResponse_Result)(nil), // 14: source.v1.PartitionsResponse.Result - (*timestamppb.Timestamp)(nil), // 15: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 16: google.protobuf.Empty + (ReadResponse_Status_Code)(0), // 0: source.v1.ReadResponse.Status.Code + (ReadResponse_Status_Error)(0), // 1: source.v1.ReadResponse.Status.Error + (*ReadRequest)(nil), // 2: source.v1.ReadRequest + (*ReadResponse)(nil), // 3: source.v1.ReadResponse + (*AckRequest)(nil), // 4: source.v1.AckRequest + (*AckResponse)(nil), // 5: source.v1.AckResponse + (*ReadyResponse)(nil), // 6: source.v1.ReadyResponse + (*PendingResponse)(nil), // 7: source.v1.PendingResponse + (*PartitionsResponse)(nil), // 8: source.v1.PartitionsResponse + (*Offset)(nil), // 9: source.v1.Offset + (*ReadRequest_Request)(nil), // 10: source.v1.ReadRequest.Request + (*ReadResponse_Result)(nil), // 11: source.v1.ReadResponse.Result + (*ReadResponse_Status)(nil), // 12: source.v1.ReadResponse.Status + nil, // 13: source.v1.ReadResponse.Result.HeadersEntry + (*AckRequest_Request)(nil), // 14: source.v1.AckRequest.Request + (*AckResponse_Result)(nil), // 15: source.v1.AckResponse.Result + (*PendingResponse_Result)(nil), // 16: source.v1.PendingResponse.Result + (*PartitionsResponse_Result)(nil), // 17: source.v1.PartitionsResponse.Result + (*timestamppb.Timestamp)(nil), // 18: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 19: google.protobuf.Empty } var file_pkg_apis_proto_source_v1_source_proto_depIdxs = []int32{ - 8, // 0: source.v1.ReadRequest.request:type_name -> source.v1.ReadRequest.Request - 9, // 1: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result - 11, // 2: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request - 12, // 3: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result - 13, // 4: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result - 14, // 5: source.v1.PartitionsResponse.result:type_name -> source.v1.PartitionsResponse.Result - 7, // 6: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset - 15, // 7: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp - 10, // 8: source.v1.ReadResponse.Result.headers:type_name -> source.v1.ReadResponse.Result.HeadersEntry - 7, // 9: source.v1.AckRequest.Request.offset:type_name -> source.v1.Offset - 16, // 10: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty - 0, // 11: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest - 2, // 12: source.v1.Source.AckFn:input_type -> source.v1.AckRequest - 16, // 13: source.v1.Source.PendingFn:input_type -> google.protobuf.Empty - 16, // 14: source.v1.Source.PartitionsFn:input_type -> google.protobuf.Empty - 16, // 15: source.v1.Source.IsReady:input_type -> google.protobuf.Empty - 1, // 16: source.v1.Source.ReadFn:output_type -> source.v1.ReadResponse - 3, // 17: source.v1.Source.AckFn:output_type -> source.v1.AckResponse - 5, // 18: source.v1.Source.PendingFn:output_type -> source.v1.PendingResponse - 6, // 19: source.v1.Source.PartitionsFn:output_type -> source.v1.PartitionsResponse - 4, // 20: source.v1.Source.IsReady:output_type -> source.v1.ReadyResponse - 16, // [16:21] is the sub-list for method output_type - 11, // [11:16] is the sub-list for method input_type - 11, // [11:11] is the sub-list for extension type_name - 11, // [11:11] is the sub-list for extension extendee - 0, // [0:11] is the sub-list for field type_name + 10, // 0: source.v1.ReadRequest.request:type_name -> source.v1.ReadRequest.Request + 11, // 1: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result + 12, // 2: source.v1.ReadResponse.status:type_name -> source.v1.ReadResponse.Status + 14, // 3: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request + 15, // 4: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result + 16, // 5: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result + 17, // 6: source.v1.PartitionsResponse.result:type_name -> source.v1.PartitionsResponse.Result + 9, // 7: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset + 18, // 8: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp + 13, // 9: source.v1.ReadResponse.Result.headers:type_name -> source.v1.ReadResponse.Result.HeadersEntry + 0, // 10: source.v1.ReadResponse.Status.code:type_name -> source.v1.ReadResponse.Status.Code + 1, // 11: source.v1.ReadResponse.Status.error:type_name -> source.v1.ReadResponse.Status.Error + 9, // 12: source.v1.AckRequest.Request.offset:type_name -> source.v1.Offset + 19, // 13: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty + 2, // 14: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest + 4, // 15: source.v1.Source.AckFn:input_type -> source.v1.AckRequest + 19, // 16: source.v1.Source.PendingFn:input_type -> google.protobuf.Empty + 19, // 17: source.v1.Source.PartitionsFn:input_type -> google.protobuf.Empty + 19, // 18: source.v1.Source.IsReady:input_type -> google.protobuf.Empty + 3, // 19: source.v1.Source.ReadFn:output_type -> source.v1.ReadResponse + 5, // 20: source.v1.Source.AckFn:output_type -> source.v1.AckResponse + 7, // 21: source.v1.Source.PendingFn:output_type -> source.v1.PendingResponse + 8, // 22: source.v1.Source.PartitionsFn:output_type -> source.v1.PartitionsResponse + 6, // 23: source.v1.Source.IsReady:output_type -> source.v1.ReadyResponse + 19, // [19:24] is the sub-list for method output_type + 14, // [14:19] is the sub-list for method input_type + 14, // [14:14] is the sub-list for extension type_name + 14, // [14:14] is the sub-list for extension extendee + 0, // [0:14] is the sub-list for field type_name } func init() { file_pkg_apis_proto_source_v1_source_proto_init() } @@ -1071,8 +1272,8 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { return nil } } - file_pkg_apis_proto_source_v1_source_proto_msgTypes[11].Exporter = func(v any, i int) any { - switch v := v.(*AckRequest_Request); i { + file_pkg_apis_proto_source_v1_source_proto_msgTypes[10].Exporter = func(v any, i int) any { + switch v := v.(*ReadResponse_Status); i { case 0: return &v.state case 1: @@ -1084,7 +1285,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[12].Exporter = func(v any, i int) any { - switch v := v.(*AckResponse_Result); i { + switch v := v.(*AckRequest_Request); i { case 0: return &v.state case 1: @@ -1096,7 +1297,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[13].Exporter = func(v any, i int) any { - switch v := v.(*PendingResponse_Result); i { + switch v := v.(*AckResponse_Result); i { case 0: return &v.state case 1: @@ -1108,6 +1309,18 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[14].Exporter = func(v any, i int) any { + switch v := v.(*PendingResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_source_v1_source_proto_msgTypes[15].Exporter = func(v any, i int) any { switch v := v.(*PartitionsResponse_Result); i { case 0: return &v.state @@ -1120,18 +1333,20 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } } + file_pkg_apis_proto_source_v1_source_proto_msgTypes[10].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_source_v1_source_proto_rawDesc, - NumEnums: 0, - NumMessages: 15, + NumEnums: 2, + NumMessages: 16, NumExtensions: 0, NumServices: 1, }, GoTypes: file_pkg_apis_proto_source_v1_source_proto_goTypes, DependencyIndexes: file_pkg_apis_proto_source_v1_source_proto_depIdxs, + EnumInfos: file_pkg_apis_proto_source_v1_source_proto_enumTypes, MessageInfos: file_pkg_apis_proto_source_v1_source_proto_msgTypes, }.Build() File_pkg_apis_proto_source_v1_source_proto = out.File diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index 1f251b3c..314f2905 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -68,8 +68,30 @@ message ReadResponse { // e.g. Kafka and Redis Stream message usually include information about the headers. map headers = 5; } + message Status { + // Code to indicate the status of the response. + enum Code { + SUCCESS = 0; + FAILURE = 1; + } + + // Error to indicate the error type. If the code is FAILURE, then the error field will be populated. + enum Error { + UNACKED = 0; + OTHER = 1; + } + + // End of transmission flag. + bool eot = 1; + Code code = 2; + Error error = 3; + optional string msg = 4; + } // Required field holding the result. Result result = 1; + // Status of the response. Holds the end of transmission flag and the status code. + // + Status status = 2; } /* diff --git a/pkg/batchmapper/examples/batchmap_flatmap/go.mod b/pkg/batchmapper/examples/batchmap_flatmap/go.mod index 2a4f2180..955ee4ec 100644 --- a/pkg/batchmapper/examples/batchmap_flatmap/go.mod +++ b/pkg/batchmapper/examples/batchmap_flatmap/go.mod @@ -2,8 +2,6 @@ module batchmap-flatmap go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/mapper/examples/even_odd/go.mod b/pkg/mapper/examples/even_odd/go.mod index 043ac8f4..9ae5e962 100644 --- a/pkg/mapper/examples/even_odd/go.mod +++ b/pkg/mapper/examples/even_odd/go.mod @@ -2,8 +2,6 @@ module even_odd go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/mapper/examples/flatmap/go.mod b/pkg/mapper/examples/flatmap/go.mod index 4a0f8b38..e0d09895 100644 --- a/pkg/mapper/examples/flatmap/go.mod +++ b/pkg/mapper/examples/flatmap/go.mod @@ -2,8 +2,6 @@ module flatmap go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/mapper/examples/forward_message/go.mod b/pkg/mapper/examples/forward_message/go.mod index ec58f147..105bb757 100644 --- a/pkg/mapper/examples/forward_message/go.mod +++ b/pkg/mapper/examples/forward_message/go.mod @@ -2,8 +2,6 @@ module forward_message go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/mapper/examples/retry/go.mod b/pkg/mapper/examples/retry/go.mod index 539300e3..7b99a191 100644 --- a/pkg/mapper/examples/retry/go.mod +++ b/pkg/mapper/examples/retry/go.mod @@ -2,8 +2,6 @@ module retry go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/mapper/examples/tickgen/go.mod b/pkg/mapper/examples/tickgen/go.mod index 15fc53cc..ae01041e 100644 --- a/pkg/mapper/examples/tickgen/go.mod +++ b/pkg/mapper/examples/tickgen/go.mod @@ -2,8 +2,6 @@ module tickgen go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/mapstreamer/examples/flatmap_stream/go.mod b/pkg/mapstreamer/examples/flatmap_stream/go.mod index 85fa7d0d..7ecda24b 100644 --- a/pkg/mapstreamer/examples/flatmap_stream/go.mod +++ b/pkg/mapstreamer/examples/flatmap_stream/go.mod @@ -2,8 +2,6 @@ module flatmap_stream go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/reducer/examples/counter/go.mod b/pkg/reducer/examples/counter/go.mod index f1bd9353..f0feabbe 100644 --- a/pkg/reducer/examples/counter/go.mod +++ b/pkg/reducer/examples/counter/go.mod @@ -2,8 +2,6 @@ module counter go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/reducer/examples/sum/go.mod b/pkg/reducer/examples/sum/go.mod index 5a04da23..5425ca28 100644 --- a/pkg/reducer/examples/sum/go.mod +++ b/pkg/reducer/examples/sum/go.mod @@ -2,8 +2,6 @@ module sum go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/reducestreamer/examples/counter/go.mod b/pkg/reducestreamer/examples/counter/go.mod index f1bd9353..f0feabbe 100644 --- a/pkg/reducestreamer/examples/counter/go.mod +++ b/pkg/reducestreamer/examples/counter/go.mod @@ -2,8 +2,6 @@ module counter go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/reducestreamer/examples/sum/go.mod b/pkg/reducestreamer/examples/sum/go.mod index 5a04da23..5425ca28 100644 --- a/pkg/reducestreamer/examples/sum/go.mod +++ b/pkg/reducestreamer/examples/sum/go.mod @@ -2,8 +2,6 @@ module sum go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/sessionreducer/examples/counter/go.mod b/pkg/sessionreducer/examples/counter/go.mod index a255e821..17aa3337 100644 --- a/pkg/sessionreducer/examples/counter/go.mod +++ b/pkg/sessionreducer/examples/counter/go.mod @@ -2,8 +2,6 @@ module counter go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require ( diff --git a/pkg/sessionreducer/examples/sum/go.mod b/pkg/sessionreducer/examples/sum/go.mod index aff635c8..c02d9747 100644 --- a/pkg/sessionreducer/examples/sum/go.mod +++ b/pkg/sessionreducer/examples/sum/go.mod @@ -2,8 +2,6 @@ module sum go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require ( diff --git a/pkg/sideinput/examples/simple_sideinput/go.mod b/pkg/sideinput/examples/simple_sideinput/go.mod index 4cb237ce..bf7eabef 100644 --- a/pkg/sideinput/examples/simple_sideinput/go.mod +++ b/pkg/sideinput/examples/simple_sideinput/go.mod @@ -2,8 +2,6 @@ module simple_sideinput go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/sideinput/examples/simple_sideinput/udf/go.mod b/pkg/sideinput/examples/simple_sideinput/udf/go.mod index e94f8c53..f9d7bd73 100644 --- a/pkg/sideinput/examples/simple_sideinput/udf/go.mod +++ b/pkg/sideinput/examples/simple_sideinput/udf/go.mod @@ -2,8 +2,6 @@ module udf go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../../.. require ( diff --git a/pkg/sinker/examples/fallback/go.mod b/pkg/sinker/examples/fallback/go.mod index 58181144..2c901d73 100644 --- a/pkg/sinker/examples/fallback/go.mod +++ b/pkg/sinker/examples/fallback/go.mod @@ -2,8 +2,6 @@ module fallback go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/sinker/examples/log/go.mod b/pkg/sinker/examples/log/go.mod index 2699aa86..63639be0 100644 --- a/pkg/sinker/examples/log/go.mod +++ b/pkg/sinker/examples/log/go.mod @@ -2,8 +2,6 @@ module log_sink go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/sinker/examples/redis_sink/go.mod b/pkg/sinker/examples/redis_sink/go.mod index 1cad373c..9427cbca 100644 --- a/pkg/sinker/examples/redis_sink/go.mod +++ b/pkg/sinker/examples/redis_sink/go.mod @@ -2,8 +2,6 @@ module redis-e2e-test-sink go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require ( diff --git a/pkg/sourcer/examples/simple_source/go.mod b/pkg/sourcer/examples/simple_source/go.mod index 841ad3a7..02e15749 100644 --- a/pkg/sourcer/examples/simple_source/go.mod +++ b/pkg/sourcer/examples/simple_source/go.mod @@ -2,8 +2,6 @@ module simple_source go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require ( @@ -14,11 +12,10 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/protobuf v1.5.3 // indirect github.com/kr/text v0.2.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect - golang.org/x/net v0.28.0 // indirect + golang.org/x/net v0.29.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/sourcer/examples/simple_source/go.sum b/pkg/sourcer/examples/simple_source/go.sum index cfffb6f2..4646dbfd 100644 --- a/pkg/sourcer/examples/simple_source/go.sum +++ b/pkg/sourcer/examples/simple_source/go.sum @@ -2,15 +2,12 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= -github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= -github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -24,29 +21,23 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= -golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= -golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= +golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 h1:0nDDozoAU19Qb2HwhXadU8OcsiO/09cnTqhUtq2MEOM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1:pPJltXNxVzT4pK9yD8vR9X75DaWYYmLGMsEvBfFQZzQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw= -google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo= +google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= -google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/sourcer/examples/simple_source/impl/simple_source.go b/pkg/sourcer/examples/simple_source/impl/simple_source.go index 4ec7d9cc..9409fe25 100644 --- a/pkg/sourcer/examples/simple_source/impl/simple_source.go +++ b/pkg/sourcer/examples/simple_source/impl/simple_source.go @@ -2,6 +2,7 @@ package impl import ( "context" + "log" "strconv" "sync" "time" @@ -27,8 +28,11 @@ func NewSimpleSource() *SimpleSource { } func (s *SimpleSource) Pending(_ context.Context) int64 { + s.lock.Lock() + defer s.lock.Unlock() + log.Println("Number of pending records: ", len(s.toAckSet)) // The simple source always returns zero to indicate there is no pending record. - return 0 + return int64(len(s.toAckSet)) } func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest, messageCh chan<- sourcesdk.Message) { @@ -42,9 +46,9 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest // leaving the toAckSet not empty on the UDSource container side. // In this case, for the next batch read, we should read the data from the last acked offset instead of returning. // Our built-in Kafka source follows this logic. - if len(s.toAckSet) > 0 { - return - } + //if len(s.toAckSet) > 0 { + // return + //} // Read the data from the source and send the data to the message channel. for i := 0; uint64(i) < readRequest.Count(); i++ { @@ -72,7 +76,10 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest } func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) { - delete(s.toAckSet, deserializeOffset(request.Offset().Value())) + s.lock.Lock() + defer s.lock.Unlock() + offset := deserializeOffset(request.Offset().Value()) + delete(s.toAckSet, offset) } func (s *SimpleSource) Partitions(_ context.Context) []int32 { diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index eff56796..2d58de8c 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -32,7 +32,6 @@ type Service struct { // ReadFn reads the data from the source. func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { ctx := stream.Context() - messageCh := make(chan Message) errCh := make(chan error, 1) var wg sync.WaitGroup @@ -40,7 +39,6 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { wg.Add(1) go func() { defer wg.Done() - defer close(messageCh) // handle panic defer func() { if r := recover(); r != nil { @@ -62,35 +60,53 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { return } - request := readRequest{ - count: req.Request.GetNumRecords(), - timeout: time.Duration(req.Request.GetTimeoutInMs()) * time.Millisecond, + messageCh := make(chan Message) + go func() { + defer close(messageCh) + request := readRequest{ + count: req.Request.GetNumRecords(), + timeout: time.Duration(req.Request.GetTimeoutInMs()) * time.Millisecond, + } + fs.Source.Read(ctx, &request, messageCh) + }() + + // Read messages from the channel and send them to the stream. + for msg := range messageCh { + offset := &sourcepb.Offset{ + Offset: msg.Offset().Value(), + PartitionId: msg.Offset().PartitionId(), + } + element := &sourcepb.ReadResponse{ + Result: &sourcepb.ReadResponse_Result{ + Payload: msg.Value(), + Offset: offset, + EventTime: timestamppb.New(msg.EventTime()), + Keys: msg.Keys(), + Headers: msg.Headers(), + }, + Status: &sourcepb.ReadResponse_Status{ + Eot: false, + Code: 0, + }, + } + // The error here is returned by the stream, which is already a gRPC error + if err := stream.Send(element); err != nil { + errCh <- err + return + } + } + err = stream.Send(&sourcepb.ReadResponse{ + Status: &sourcepb.ReadResponse_Status{ + Eot: true, + Code: 0, + }, + }) + if err != nil { + errCh <- err + return } - - fs.Source.Read(ctx, &request, messageCh) } }() - // Read messages from the channel and send them to the stream, until the channel is closed - for msg := range messageCh { - offset := &sourcepb.Offset{ - Offset: msg.Offset().Value(), - PartitionId: msg.Offset().PartitionId(), - } - element := &sourcepb.ReadResponse{ - Result: &sourcepb.ReadResponse_Result{ - Payload: msg.Value(), - Offset: offset, - EventTime: timestamppb.New(msg.EventTime()), - Keys: msg.Keys(), - Headers: msg.Headers(), - }, - } - // The error here is returned by the stream, which is already a gRPC error - if err := stream.Send(element); err != nil { - // The channel may or may not be closed, as we are not sure, we leave it to GC. - return err - } - } // Wait for the goroutine to finish wg.Wait() diff --git a/pkg/sourcer/service_test.go b/pkg/sourcer/service_test.go index 3418ecab..ae484fec 100644 --- a/pkg/sourcer/service_test.go +++ b/pkg/sourcer/service_test.go @@ -171,23 +171,23 @@ func TestService_ReadFn(t *testing.T) { Keys: []string{testKey}, Headers: map[string]string{"x-txn-id": "test-txn-id"}, }, + Status: &sourcepb.ReadResponse_Status{ + Eot: false, + Code: sourcepb.ReadResponse_Status_SUCCESS, + }, }, - }, - expectedErr: false, - }, - { - name: "read_fn_err", - expected: []*sourcepb.ReadResponse{ { - Result: &sourcepb.ReadResponse_Result{ - Payload: []byte(`test`), - Offset: &sourcepb.Offset{}, - EventTime: timestamppb.New(testEventTime), - Keys: []string{testKey}, - Headers: map[string]string{"x-txn-id": "test-txn-id"}, + Status: &sourcepb.ReadResponse_Status{ + Eot: true, + Code: sourcepb.ReadResponse_Status_SUCCESS, }, }, }, + expectedErr: false, + }, + { + name: "read_fn_err", + expected: []*sourcepb.ReadResponse{}, expectedErr: true, }, } diff --git a/pkg/sourcetransformer/examples/assign_event_time/go.mod b/pkg/sourcetransformer/examples/assign_event_time/go.mod index ba9ba3bf..8e70063a 100644 --- a/pkg/sourcetransformer/examples/assign_event_time/go.mod +++ b/pkg/sourcetransformer/examples/assign_event_time/go.mod @@ -2,8 +2,6 @@ module assign_event_time go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require github.com/numaproj/numaflow-go v0.8.0 diff --git a/pkg/sourcetransformer/examples/event_time_filter/go.mod b/pkg/sourcetransformer/examples/event_time_filter/go.mod index 7682680b..73900b91 100644 --- a/pkg/sourcetransformer/examples/event_time_filter/go.mod +++ b/pkg/sourcetransformer/examples/event_time_filter/go.mod @@ -2,8 +2,6 @@ module event_time_filter go 1.21 -toolchain go1.22.4 - replace github.com/numaproj/numaflow-go => ../../../.. require (