From bd836d080848ee3ab94bb98349eac635904da78c Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 8 Dec 2023 22:43:01 +0530 Subject: [PATCH] chore: adding partitions for user defined source (#91) Signed-off-by: Yashash H L Signed-off-by: Vigith Maurice Co-authored-by: Vigith Maurice --- pkg/apis/proto/map/v1/map.pb.go | 2 +- pkg/apis/proto/map/v1/map_grpc.pb.go | 2 +- pkg/apis/proto/mapstream/v1/mapstream.pb.go | 2 +- .../proto/mapstream/v1/mapstream_grpc.pb.go | 2 +- pkg/apis/proto/reduce/v1/reduce.pb.go | 2 +- pkg/apis/proto/reduce/v1/reduce_grpc.pb.go | 2 +- pkg/apis/proto/sideinput/v1/sideinput.pb.go | 2 +- .../proto/sideinput/v1/sideinput_grpc.pb.go | 2 +- pkg/apis/proto/sink/v1/sink.pb.go | 2 +- pkg/apis/proto/sink/v1/sink_grpc.pb.go | 2 +- pkg/apis/proto/source/v1/source.pb.go | 309 +++++++++++++----- pkg/apis/proto/source/v1/source.proto | 17 +- pkg/apis/proto/source/v1/source_grpc.pb.go | 40 ++- .../proto/source/v1/sourcemock/sourcemock.go | 20 ++ .../proto/sourcetransform/v1/transform.pb.go | 2 +- .../sourcetransform/v1/transform_grpc.pb.go | 2 +- pkg/sourcer/examples/simple_source/go.mod | 2 +- pkg/sourcer/examples/simple_source/go.sum | 6 +- .../simple_source/impl/simple_source.go | 6 +- pkg/sourcer/interface.go | 6 + pkg/sourcer/message.go | 31 +- pkg/sourcer/server_test.go | 4 + pkg/sourcer/service.go | 9 + pkg/sourcer/service_test.go | 41 ++- 24 files changed, 396 insertions(+), 119 deletions(-) diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index 6dcf613c..29d0b37d 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index 9026727d..d3844348 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/mapstream/v1/mapstream.pb.go b/pkg/apis/proto/mapstream/v1/mapstream.pb.go index 5681354a..23112488 100644 --- a/pkg/apis/proto/mapstream/v1/mapstream.pb.go +++ b/pkg/apis/proto/mapstream/v1/mapstream.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/mapstream/v1/mapstream.proto package v1 diff --git a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go index ad88a113..e8927dc5 100644 --- a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go +++ b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/mapstream/v1/mapstream.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce.pb.go b/pkg/apis/proto/reduce/v1/reduce.pb.go index e952a775..a64affa1 100644 --- a/pkg/apis/proto/reduce/v1/reduce.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go index 11919bac..afe493d1 100644 --- a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput.pb.go b/pkg/apis/proto/sideinput/v1/sideinput.pb.go index 7f8b7280..5fd5b7cd 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go index 9d38558e..2562314d 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index 50658427..3da1634b 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink_grpc.pb.go b/pkg/apis/proto/sink/v1/sink_grpc.pb.go index 9af7b4e4..bc50da8e 100644 --- a/pkg/apis/proto/sink/v1/sink_grpc.pb.go +++ b/pkg/apis/proto/sink/v1/sink_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index 1c8dcba4..60903e31 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/source/v1/source.proto package v1 @@ -324,6 +324,55 @@ func (x *PendingResponse) GetResult() *PendingResponse_Result { return nil } +// PartitionsResponse is the response for the partitions request. +type PartitionsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required field holding the result. + Result *PartitionsResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` +} + +func (x *PartitionsResponse) Reset() { + *x = PartitionsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PartitionsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PartitionsResponse) ProtoMessage() {} + +func (x *PartitionsResponse) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PartitionsResponse.ProtoReflect.Descriptor instead. +func (*PartitionsResponse) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{6} +} + +func (x *PartitionsResponse) GetResult() *PartitionsResponse_Result { + if x != nil { + return x.Result + } + return nil +} + // Offset is the offset of the datum. type Offset struct { state protoimpl.MessageState @@ -338,13 +387,13 @@ type Offset struct { // Optional partition_id indicates which partition of the source the datum belongs to. // It is useful for sources that have multiple partitions. e.g. Kafka. // If the partition_id is not specified, it is assumed that the source has a single partition. - PartitionId string `protobuf:"bytes,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + PartitionId int32 `protobuf:"varint,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` } func (x *Offset) Reset() { *x = Offset{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[6] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -357,7 +406,7 @@ func (x *Offset) String() string { func (*Offset) ProtoMessage() {} func (x *Offset) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[6] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -370,7 +419,7 @@ func (x *Offset) ProtoReflect() protoreflect.Message { // Deprecated: Use Offset.ProtoReflect.Descriptor instead. func (*Offset) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{6} + return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{7} } func (x *Offset) GetOffset() []byte { @@ -380,11 +429,11 @@ func (x *Offset) GetOffset() []byte { return nil } -func (x *Offset) GetPartitionId() string { +func (x *Offset) GetPartitionId() int32 { if x != nil { return x.PartitionId } - return "" + return 0 } type ReadRequest_Request struct { @@ -403,7 +452,7 @@ type ReadRequest_Request struct { func (x *ReadRequest_Request) Reset() { *x = ReadRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[7] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -416,7 +465,7 @@ func (x *ReadRequest_Request) String() string { func (*ReadRequest_Request) ProtoMessage() {} func (x *ReadRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[7] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -467,7 +516,7 @@ type ReadResponse_Result struct { func (x *ReadResponse_Result) Reset() { *x = ReadResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[8] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -480,7 +529,7 @@ func (x *ReadResponse_Result) String() string { func (*ReadResponse_Result) ProtoMessage() {} func (x *ReadResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[8] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -539,7 +588,7 @@ type AckRequest_Request struct { func (x *AckRequest_Request) Reset() { *x = AckRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[9] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -552,7 +601,7 @@ func (x *AckRequest_Request) String() string { func (*AckRequest_Request) ProtoMessage() {} func (x *AckRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[9] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -587,7 +636,7 @@ type AckResponse_Result struct { func (x *AckResponse_Result) Reset() { *x = AckResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[10] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -600,7 +649,7 @@ func (x *AckResponse_Result) String() string { func (*AckResponse_Result) ProtoMessage() {} func (x *AckResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[10] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -636,7 +685,7 @@ type PendingResponse_Result struct { func (x *PendingResponse_Result) Reset() { *x = PendingResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[11] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -649,7 +698,7 @@ func (x *PendingResponse_Result) String() string { func (*PendingResponse_Result) ProtoMessage() {} func (x *PendingResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[11] + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -672,6 +721,54 @@ func (x *PendingResponse_Result) GetCount() int64 { return 0 } +type PartitionsResponse_Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required field holding the list of partitions. + Partitions []int32 `protobuf:"varint,1,rep,packed,name=partitions,proto3" json:"partitions,omitempty"` +} + +func (x *PartitionsResponse_Result) Reset() { + *x = PartitionsResponse_Result{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PartitionsResponse_Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PartitionsResponse_Result) ProtoMessage() {} + +func (x *PartitionsResponse_Result) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_source_v1_source_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PartitionsResponse_Result.ProtoReflect.Descriptor instead. +func (*PartitionsResponse_Result) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP(), []int{6, 0} +} + +func (x *PartitionsResponse_Result) GetPartitions() []int32 { + if x != nil { + return x.Partitions + } + return nil +} + var File_pkg_apis_proto_source_v1_source_proto protoreflect.FileDescriptor var file_pkg_apis_proto_source_v1_source_proto_rawDesc = []byte{ @@ -731,31 +828,44 @@ var file_pkg_apis_proto_source_v1_source_proto_rawDesc = []byte{ 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x1e, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, - 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xfb, - 0x01, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x52, 0x65, 0x61, - 0x64, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, - 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, 0x12, - 0x15, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, - 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, - 0x0a, 0x09, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, - 0x70, 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, - 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, - 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3a, 0x5a, 0x38, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, - 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, - 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x7c, 0x0a, 0x12, 0x50, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x3c, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x24, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, + 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x28, 0x0a, + 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, + 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x05, 0x52, 0x0a, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, 0x73, 0x65, + 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, + 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, + 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xc2, 0x02, 0x0a, + 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3b, 0x0a, 0x06, 0x52, 0x65, 0x61, 0x64, 0x46, + 0x6e, 0x12, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, 0x12, 0x15, 0x2e, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x09, + 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, + 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, + 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, 0x6e, 0x12, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, + 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -770,46 +880,51 @@ func file_pkg_apis_proto_source_v1_source_proto_rawDescGZIP() []byte { return file_pkg_apis_proto_source_v1_source_proto_rawDescData } -var file_pkg_apis_proto_source_v1_source_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_pkg_apis_proto_source_v1_source_proto_msgTypes = make([]protoimpl.MessageInfo, 14) var file_pkg_apis_proto_source_v1_source_proto_goTypes = []interface{}{ - (*ReadRequest)(nil), // 0: source.v1.ReadRequest - (*ReadResponse)(nil), // 1: source.v1.ReadResponse - (*AckRequest)(nil), // 2: source.v1.AckRequest - (*AckResponse)(nil), // 3: source.v1.AckResponse - (*ReadyResponse)(nil), // 4: source.v1.ReadyResponse - (*PendingResponse)(nil), // 5: source.v1.PendingResponse - (*Offset)(nil), // 6: source.v1.Offset - (*ReadRequest_Request)(nil), // 7: source.v1.ReadRequest.Request - (*ReadResponse_Result)(nil), // 8: source.v1.ReadResponse.Result - (*AckRequest_Request)(nil), // 9: source.v1.AckRequest.Request - (*AckResponse_Result)(nil), // 10: source.v1.AckResponse.Result - (*PendingResponse_Result)(nil), // 11: source.v1.PendingResponse.Result - (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 13: google.protobuf.Empty + (*ReadRequest)(nil), // 0: source.v1.ReadRequest + (*ReadResponse)(nil), // 1: source.v1.ReadResponse + (*AckRequest)(nil), // 2: source.v1.AckRequest + (*AckResponse)(nil), // 3: source.v1.AckResponse + (*ReadyResponse)(nil), // 4: source.v1.ReadyResponse + (*PendingResponse)(nil), // 5: source.v1.PendingResponse + (*PartitionsResponse)(nil), // 6: source.v1.PartitionsResponse + (*Offset)(nil), // 7: source.v1.Offset + (*ReadRequest_Request)(nil), // 8: source.v1.ReadRequest.Request + (*ReadResponse_Result)(nil), // 9: source.v1.ReadResponse.Result + (*AckRequest_Request)(nil), // 10: source.v1.AckRequest.Request + (*AckResponse_Result)(nil), // 11: source.v1.AckResponse.Result + (*PendingResponse_Result)(nil), // 12: source.v1.PendingResponse.Result + (*PartitionsResponse_Result)(nil), // 13: source.v1.PartitionsResponse.Result + (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 15: google.protobuf.Empty } var file_pkg_apis_proto_source_v1_source_proto_depIdxs = []int32{ - 7, // 0: source.v1.ReadRequest.request:type_name -> source.v1.ReadRequest.Request - 8, // 1: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result - 9, // 2: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request - 10, // 3: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result - 11, // 4: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result - 6, // 5: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset - 12, // 6: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp - 6, // 7: source.v1.AckRequest.Request.offsets:type_name -> source.v1.Offset - 13, // 8: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty - 0, // 9: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest - 2, // 10: source.v1.Source.AckFn:input_type -> source.v1.AckRequest - 13, // 11: source.v1.Source.PendingFn:input_type -> google.protobuf.Empty - 13, // 12: source.v1.Source.IsReady:input_type -> google.protobuf.Empty - 1, // 13: source.v1.Source.ReadFn:output_type -> source.v1.ReadResponse - 3, // 14: source.v1.Source.AckFn:output_type -> source.v1.AckResponse - 5, // 15: source.v1.Source.PendingFn:output_type -> source.v1.PendingResponse - 4, // 16: source.v1.Source.IsReady:output_type -> source.v1.ReadyResponse - 13, // [13:17] is the sub-list for method output_type - 9, // [9:13] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 8, // 0: source.v1.ReadRequest.request:type_name -> source.v1.ReadRequest.Request + 9, // 1: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result + 10, // 2: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request + 11, // 3: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result + 12, // 4: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result + 13, // 5: source.v1.PartitionsResponse.result:type_name -> source.v1.PartitionsResponse.Result + 7, // 6: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset + 14, // 7: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp + 7, // 8: source.v1.AckRequest.Request.offsets:type_name -> source.v1.Offset + 15, // 9: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty + 0, // 10: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest + 2, // 11: source.v1.Source.AckFn:input_type -> source.v1.AckRequest + 15, // 12: source.v1.Source.PendingFn:input_type -> google.protobuf.Empty + 15, // 13: source.v1.Source.PartitionsFn:input_type -> google.protobuf.Empty + 15, // 14: source.v1.Source.IsReady:input_type -> google.protobuf.Empty + 1, // 15: source.v1.Source.ReadFn:output_type -> source.v1.ReadResponse + 3, // 16: source.v1.Source.AckFn:output_type -> source.v1.AckResponse + 5, // 17: source.v1.Source.PendingFn:output_type -> source.v1.PendingResponse + 6, // 18: source.v1.Source.PartitionsFn:output_type -> source.v1.PartitionsResponse + 4, // 19: source.v1.Source.IsReady:output_type -> source.v1.ReadyResponse + 15, // [15:20] is the sub-list for method output_type + 10, // [10:15] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_pkg_apis_proto_source_v1_source_proto_init() } @@ -891,7 +1006,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Offset); i { + switch v := v.(*PartitionsResponse); i { case 0: return &v.state case 1: @@ -903,7 +1018,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadRequest_Request); i { + switch v := v.(*Offset); i { case 0: return &v.state case 1: @@ -915,7 +1030,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadResponse_Result); i { + switch v := v.(*ReadRequest_Request); i { case 0: return &v.state case 1: @@ -927,7 +1042,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AckRequest_Request); i { + switch v := v.(*ReadResponse_Result); i { case 0: return &v.state case 1: @@ -939,7 +1054,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AckResponse_Result); i { + switch v := v.(*AckRequest_Request); i { case 0: return &v.state case 1: @@ -951,6 +1066,18 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AckResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_source_v1_source_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PendingResponse_Result); i { case 0: return &v.state @@ -962,6 +1089,18 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { return nil } } + file_pkg_apis_proto_source_v1_source_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PartitionsResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -969,7 +1108,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_source_v1_source_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 14, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index ab21a5cb..a0778dfe 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -23,6 +23,9 @@ service Source { // PendingFn returns the number of pending records at the user defined source. rpc PendingFn(google.protobuf.Empty) returns (PendingResponse); + // PartitionsFn returns the list of partitions for the user defined source. + rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse); + // IsReady is the heartbeat endpoint for user defined source gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); } @@ -120,6 +123,18 @@ message PendingResponse { Result result = 1; } +/* + * PartitionsResponse is the response for the partitions request. + */ +message PartitionsResponse { + message Result { + // Required field holding the list of partitions. + repeated int32 partitions = 1; + } + // Required field holding the result. + Result result = 1; +} + /* * Offset is the offset of the datum. */ @@ -132,5 +147,5 @@ message Offset { // Optional partition_id indicates which partition of the source the datum belongs to. // It is useful for sources that have multiple partitions. e.g. Kafka. // If the partition_id is not specified, it is assumed that the source has a single partition. - string partition_id = 2; + int32 partition_id = 2; } \ No newline at end of file diff --git a/pkg/apis/proto/source/v1/source_grpc.pb.go b/pkg/apis/proto/source/v1/source_grpc.pb.go index 79c769d6..ebb38e85 100644 --- a/pkg/apis/proto/source/v1/source_grpc.pb.go +++ b/pkg/apis/proto/source/v1/source_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/source/v1/source.proto package v1 @@ -35,6 +35,8 @@ type SourceClient interface { AckFn(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) // PendingFn returns the number of pending records at the user defined source. PendingFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PendingResponse, error) + // PartitionsFn returns the list of partitions for the user defined source. + PartitionsFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PartitionsResponse, error) // IsReady is the heartbeat endpoint for user defined source gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) } @@ -97,6 +99,15 @@ func (c *sourceClient) PendingFn(ctx context.Context, in *emptypb.Empty, opts .. return out, nil } +func (c *sourceClient) PartitionsFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PartitionsResponse, error) { + out := new(PartitionsResponse) + err := c.cc.Invoke(ctx, "/source.v1.Source/PartitionsFn", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *sourceClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) err := c.cc.Invoke(ctx, "/source.v1.Source/IsReady", in, out, opts...) @@ -122,6 +133,8 @@ type SourceServer interface { AckFn(context.Context, *AckRequest) (*AckResponse, error) // PendingFn returns the number of pending records at the user defined source. PendingFn(context.Context, *emptypb.Empty) (*PendingResponse, error) + // PartitionsFn returns the list of partitions for the user defined source. + PartitionsFn(context.Context, *emptypb.Empty) (*PartitionsResponse, error) // IsReady is the heartbeat endpoint for user defined source gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) mustEmbedUnimplementedSourceServer() @@ -140,6 +153,9 @@ func (UnimplementedSourceServer) AckFn(context.Context, *AckRequest) (*AckRespon func (UnimplementedSourceServer) PendingFn(context.Context, *emptypb.Empty) (*PendingResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PendingFn not implemented") } +func (UnimplementedSourceServer) PartitionsFn(context.Context, *emptypb.Empty) (*PartitionsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PartitionsFn not implemented") +} func (UnimplementedSourceServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") } @@ -213,6 +229,24 @@ func _Source_PendingFn_Handler(srv interface{}, ctx context.Context, dec func(in return interceptor(ctx, in, info, handler) } +func _Source_PartitionsFn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SourceServer).PartitionsFn(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/source.v1.Source/PartitionsFn", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SourceServer).PartitionsFn(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + func _Source_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(emptypb.Empty) if err := dec(in); err != nil { @@ -246,6 +280,10 @@ var Source_ServiceDesc = grpc.ServiceDesc{ MethodName: "PendingFn", Handler: _Source_PendingFn_Handler, }, + { + MethodName: "PartitionsFn", + Handler: _Source_PartitionsFn_Handler, + }, { MethodName: "IsReady", Handler: _Source_IsReady_Handler, diff --git a/pkg/apis/proto/source/v1/sourcemock/sourcemock.go b/pkg/apis/proto/source/v1/sourcemock/sourcemock.go index 2975b862..6d23e3d1 100644 --- a/pkg/apis/proto/source/v1/sourcemock/sourcemock.go +++ b/pkg/apis/proto/source/v1/sourcemock/sourcemock.go @@ -78,6 +78,26 @@ func (mr *MockSourceClientMockRecorder) IsReady(arg0, arg1 interface{}, arg2 ... return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockSourceClient)(nil).IsReady), varargs...) } +// PartitionsFn mocks base method. +func (m *MockSourceClient) PartitionsFn(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.PartitionsResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PartitionsFn", varargs...) + ret0, _ := ret[0].(*v1.PartitionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PartitionsFn indicates an expected call of PartitionsFn. +func (mr *MockSourceClientMockRecorder) PartitionsFn(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PartitionsFn", reflect.TypeOf((*MockSourceClient)(nil).PartitionsFn), varargs...) +} + // PendingFn mocks base method. func (m *MockSourceClient) PendingFn(arg0 context.Context, arg1 *emptypb.Empty, arg2 ...grpc.CallOption) (*v1.PendingResponse, error) { m.ctrl.T.Helper() diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index f20a8d88..6aff9fb6 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.28.1 -// protoc v4.23.4 +// protoc v4.25.1 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 diff --git a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go index 5aac6e78..a3f4c206 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.2.0 -// - protoc v4.23.4 +// - protoc v4.25.1 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 diff --git a/pkg/sourcer/examples/simple_source/go.mod b/pkg/sourcer/examples/simple_source/go.mod index a0550a49..f36309b2 100644 --- a/pkg/sourcer/examples/simple_source/go.mod +++ b/pkg/sourcer/examples/simple_source/go.mod @@ -3,7 +3,7 @@ module github.com/numaproj/numaflow-go/pkg/sourcer/examples/simple_source go 1.20 require ( - github.com/numaproj/numaflow-go v0.5.1 + github.com/numaproj/numaflow-go v0.5.3-0.20231208052532-80d6321d6ee7 github.com/stretchr/testify v1.8.1 ) diff --git a/pkg/sourcer/examples/simple_source/go.sum b/pkg/sourcer/examples/simple_source/go.sum index 79cbb35d..6f296c15 100644 --- a/pkg/sourcer/examples/simple_source/go.sum +++ b/pkg/sourcer/examples/simple_source/go.sum @@ -10,8 +10,10 @@ github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/numaproj/numaflow-go v0.5.1 h1:mvala+EmlrRtI20cr1y928zR7dO/HKUJsLai7vISHEA= -github.com/numaproj/numaflow-go v0.5.1/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= +github.com/numaproj/numaflow-go v0.5.3-0.20231207034843-0efe53f69ea4 h1:1/kDlvNFl1V0hki4NA4OwgzXRRMCNzYcuaTUYXXzfmU= +github.com/numaproj/numaflow-go v0.5.3-0.20231207034843-0efe53f69ea4/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= +github.com/numaproj/numaflow-go v0.5.3-0.20231208052532-80d6321d6ee7 h1:ZD4bQ41CHYKx1LIDkx1E1nM69YE7T0BFFEJ+nrICwA4= +github.com/numaproj/numaflow-go v0.5.3-0.20231208052532-80d6321d6ee7/go.mod h1:5zwvvREIbqaCPCKsNE1MVjVToD0kvkCh2Z90Izlhw5U= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= diff --git a/pkg/sourcer/examples/simple_source/impl/simple_source.go b/pkg/sourcer/examples/simple_source/impl/simple_source.go index a7ce0c75..bd8c783c 100644 --- a/pkg/sourcer/examples/simple_source/impl/simple_source.go +++ b/pkg/sourcer/examples/simple_source/impl/simple_source.go @@ -56,7 +56,7 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest offsetValue := serializeOffset(s.readIdx) messageCh <- sourcesdk.NewMessage( []byte(strconv.FormatInt(s.readIdx, 10)), - sourcesdk.NewOffset(offsetValue, "0"), + sourcesdk.NewOffsetWithDefaultPartitionId(offsetValue), time.Now()) // Mark the offset as to be acked, and increment the read index. s.toAckSet[s.readIdx] = struct{}{} @@ -72,6 +72,10 @@ func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) { } } +func (s *SimpleSource) Partitions(_ context.Context) []int32 { + return sourcesdk.DefaultPartitions() +} + func serializeOffset(idx int64) []byte { return []byte(strconv.FormatInt(idx, 10)) } diff --git a/pkg/sourcer/interface.go b/pkg/sourcer/interface.go index 9e7a8540..01f78515 100644 --- a/pkg/sourcer/interface.go +++ b/pkg/sourcer/interface.go @@ -18,6 +18,12 @@ type Sourcer interface { // When the return value is negative, it indicates the pending information is not available. // With pending information being not available, the Numaflow platform doesn't auto-scale the source. Pending(ctx context.Context) int64 + // Partitions returns the partitions associated with the source, will be used by the platform to determine + // the partitions to which the watermark should be published. If the source doesn't have partitions, + // DefaultPartitions() can be used to return the default partitions. + // In most cases, the DefaultPartitions() should be enough; the cases where we need to implement custom Partitions() + // is in a case like Kafka, where a reader can read from multiple Kafka partitions. + Partitions(ctx context.Context) []int32 } // ReadRequest is the interface of read request. diff --git a/pkg/sourcer/message.go b/pkg/sourcer/message.go index 82bf955e..7bdb26a7 100644 --- a/pkg/sourcer/message.go +++ b/pkg/sourcer/message.go @@ -1,6 +1,13 @@ package sourcer -import "time" +import ( + "os" + "strconv" + "time" +) + +// create default partition id from the environment variable "NUMAFLOW_REPLICA" +var defaultPartitionId, _ = strconv.Atoi(os.Getenv("NUMAFLOW_REPLICA")) // Message is used to wrap the data return by UDSource type Message struct { @@ -43,20 +50,36 @@ func (m Message) EventTime() time.Time { type Offset struct { value []byte - partitionId string + partitionId int32 } // NewOffset creates an Offset with value and partition id -func NewOffset(value []byte, partitionId string) Offset { +func NewOffset(value []byte, partitionId int32) Offset { return Offset{value: value, partitionId: partitionId} } +// NewOffsetWithDefaultPartitionId creates an Offset with value and default partition id. This +// function can be used if you use DefaultPartitions() to implement the Sourcer interface. +// For most cases, this function can be used as long as the source does not have a concept of partitions. +// If you need to implement a custom partition, use `NewOffset`. +func NewOffsetWithDefaultPartitionId(value []byte) Offset { + return Offset{value: value, partitionId: DefaultPartitions()[0]} +} + +// DefaultPartitions returns default partitions for the source. +// It can be used in the Partitions() function of the Sourcer interface only +// if the source doesn't have partitions. DefaultPartition will be the pod replica +// index of the source. +func DefaultPartitions() []int32 { + return []int32{int32(defaultPartitionId)} +} + // Value returns value of the offset func (o Offset) Value() []byte { return o.value } // PartitionId returns partition id of the offset -func (o Offset) PartitionId() string { +func (o Offset) PartitionId() int32 { return o.partitionId } diff --git a/pkg/sourcer/server_test.go b/pkg/sourcer/server_test.go index 42067ea6..6ace5d5f 100644 --- a/pkg/sourcer/server_test.go +++ b/pkg/sourcer/server_test.go @@ -23,6 +23,10 @@ func (ts TestNoopSource) Pending(ctx context.Context) int64 { return 0 } +func (ts TestNoopSource) Partitions(ctx context.Context) []int32 { + return []int32{0} +} + func TestServer_Start(t *testing.T) { socketFile, _ := os.CreateTemp("/tmp", "numaflow-test.sock") defer func() { diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 848a5b69..962691b1 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -111,3 +111,12 @@ func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb Result: &sourcepb.AckResponse_Result{}, }, nil } + +func (fs *Service) PartitionsFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PartitionsResponse, error) { + partitions := fs.Source.Partitions(ctx) + return &sourcepb.PartitionsResponse{ + Result: &sourcepb.PartitionsResponse_Result{ + Partitions: partitions, + }, + }, nil +} diff --git a/pkg/sourcer/service_test.go b/pkg/sourcer/service_test.go index 29f6bb2c..1f340777 100644 --- a/pkg/sourcer/service_test.go +++ b/pkg/sourcer/service_test.go @@ -16,15 +16,16 @@ import ( sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" ) -var TestEventTime = time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) -var TestKey = "test-key" -var TestPendingNumber int64 = 123 +var testEventTime = time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) +var testKey = "test-key" +var testPendingNumber int64 = 123 +var testPartitions = []int32{1, 3, 5} type TestSource struct{} func (ts TestSource) Read(_ context.Context, _ ReadRequest, messageCh chan<- Message) { - msg := NewMessage([]byte(`test`), Offset{}, TestEventTime) - messageCh <- msg.WithKeys([]string{TestKey}) + msg := NewMessage([]byte(`test`), Offset{}, testEventTime) + messageCh <- msg.WithKeys([]string{testKey}) } func (ts TestSource) Ack(_ context.Context, _ AckRequest) { @@ -33,7 +34,11 @@ func (ts TestSource) Ack(_ context.Context, _ AckRequest) { } func (ts TestSource) Pending(_ context.Context) int64 { - return TestPendingNumber + return testPendingNumber +} + +func (ts TestSource) Partitions(_ context.Context) []int32 { + return testPartitions } func TestService_IsReady(t *testing.T) { @@ -113,8 +118,8 @@ func TestService_ReadFn(t *testing.T) { Result: &sourcepb.ReadResponse_Result{ Payload: []byte(`test`), Offset: &sourcepb.Offset{}, - EventTime: timestamppb.New(TestEventTime), - Keys: []string{TestKey}, + EventTime: timestamppb.New(testEventTime), + Keys: []string{testKey}, }, }, }, @@ -133,8 +138,8 @@ func TestService_ReadFn(t *testing.T) { Result: &sourcepb.ReadResponse_Result{ Payload: []byte(`test`), Offset: &sourcepb.Offset{}, - EventTime: timestamppb.New(TestEventTime), - Keys: []string{TestKey}, + EventTime: timestamppb.New(testEventTime), + Keys: []string{testKey}, }, }, }, @@ -193,7 +198,7 @@ func TestService_AckFn(t *testing.T) { Request: &sourcepb.AckRequest_Request{ Offsets: []*sourcepb.Offset{ { - PartitionId: "0", + PartitionId: 0, Offset: []byte("test"), }, }, @@ -211,7 +216,19 @@ func TestService_PendingFn(t *testing.T) { got, err := fs.PendingFn(ctx, &emptypb.Empty{}) assert.Equal(t, got, &sourcepb.PendingResponse{ Result: &sourcepb.PendingResponse_Result{ - Count: TestPendingNumber, + Count: testPendingNumber, + }, + }) + assert.NoError(t, err) +} + +func TestService_PartitionsFn(t *testing.T) { + fs := &Service{Source: TestSource{}} + ctx := context.Background() + got, err := fs.PartitionsFn(ctx, &emptypb.Empty{}) + assert.EqualValues(t, got, &sourcepb.PartitionsResponse{ + Result: &sourcepb.PartitionsResponse_Result{ + Partitions: testPartitions, }, }) assert.NoError(t, err)