diff --git a/hack/protogen.sh b/hack/protogen.sh index 7b91548b..54275181 100755 --- a/hack/protogen.sh +++ b/hack/protogen.sh @@ -37,5 +37,6 @@ curl -Ls -o ${REPO_ROOT}/dist/sideinput.proto ${REMOTE_URL_PRE}/sideinput/v1/sid curl -Ls -o ${REPO_ROOT}/dist/sink.proto ${REMOTE_URL_PRE}/sink/v1/sink.proto curl -Ls -o ${REPO_ROOT}/dist/source.proto ${REMOTE_URL_PRE}/source/v1/source.proto curl -Ls -o ${REPO_ROOT}/dist/transform.proto ${REMOTE_URL_PRE}/sourcetransform/v1/transform.proto +curl -Ls -o ${REPO_ROOT}/dist/sessionreduce.proto ${REMOTE_URL_PRE}/sessionreduce/v1/sessionreduce.proto protoc --go_out=module=github.com/numaproj/numaflow-go:. --go-grpc_out=module=github.com/numaproj/numaflow-go:. -I ${REPO_ROOT}/dist $(find ${REPO_ROOT}/dist -name '*.proto') diff --git a/pkg/apis/proto/reduce/v1/reduce.pb.go b/pkg/apis/proto/reduce/v1/reduce.pb.go index 5068da54..9b78cb52 100644 --- a/pkg/apis/proto/reduce/v1/reduce.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce.pb.go @@ -37,6 +37,55 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) +type ReduceRequest_WindowOperation_Event int32 + +const ( + ReduceRequest_WindowOperation_OPEN ReduceRequest_WindowOperation_Event = 0 + ReduceRequest_WindowOperation_CLOSE ReduceRequest_WindowOperation_Event = 1 + ReduceRequest_WindowOperation_APPEND ReduceRequest_WindowOperation_Event = 4 +) + +// Enum value maps for ReduceRequest_WindowOperation_Event. +var ( + ReduceRequest_WindowOperation_Event_name = map[int32]string{ + 0: "OPEN", + 1: "CLOSE", + 4: "APPEND", + } + ReduceRequest_WindowOperation_Event_value = map[string]int32{ + "OPEN": 0, + "CLOSE": 1, + "APPEND": 4, + } +) + +func (x ReduceRequest_WindowOperation_Event) Enum() *ReduceRequest_WindowOperation_Event { + p := new(ReduceRequest_WindowOperation_Event) + *p = x + return p +} + +func (x ReduceRequest_WindowOperation_Event) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (ReduceRequest_WindowOperation_Event) Descriptor() protoreflect.EnumDescriptor { + return file_reduce_proto_enumTypes[0].Descriptor() +} + +func (ReduceRequest_WindowOperation_Event) Type() protoreflect.EnumType { + return &file_reduce_proto_enumTypes[0] +} + +func (x ReduceRequest_WindowOperation_Event) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use ReduceRequest_WindowOperation_Event.Descriptor instead. +func (ReduceRequest_WindowOperation_Event) EnumDescriptor() ([]byte, []int) { + return file_reduce_proto_rawDescGZIP(), []int{0, 0, 0} +} + // * // ReduceRequest represents a request element. type ReduceRequest struct { @@ -44,10 +93,8 @@ type ReduceRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Payload *ReduceRequest_Payload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Operation *ReduceRequest_WindowOperation `protobuf:"bytes,2,opt,name=operation,proto3" json:"operation,omitempty"` } func (x *ReduceRequest) Reset() { @@ -82,34 +129,85 @@ func (*ReduceRequest) Descriptor() ([]byte, []int) { return file_reduce_proto_rawDescGZIP(), []int{0} } -func (x *ReduceRequest) GetKeys() []string { +func (x *ReduceRequest) GetPayload() *ReduceRequest_Payload { if x != nil { - return x.Keys + return x.Payload } return nil } -func (x *ReduceRequest) GetValue() []byte { +func (x *ReduceRequest) GetOperation() *ReduceRequest_WindowOperation { if x != nil { - return x.Value + return x.Operation } return nil } -func (x *ReduceRequest) GetEventTime() *timestamppb.Timestamp { +// Window represents a window. +// Since the client doesn't track keys, window doesn't have a keys field. +type Window struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Start *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` + Slot string `protobuf:"bytes,3,opt,name=slot,proto3" json:"slot,omitempty"` +} + +func (x *Window) Reset() { + *x = Window{} + if protoimpl.UnsafeEnabled { + mi := &file_reduce_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Window) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Window) ProtoMessage() {} + +func (x *Window) ProtoReflect() protoreflect.Message { + mi := &file_reduce_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Window.ProtoReflect.Descriptor instead. +func (*Window) Descriptor() ([]byte, []int) { + return file_reduce_proto_rawDescGZIP(), []int{1} +} + +func (x *Window) GetStart() *timestamppb.Timestamp { if x != nil { - return x.EventTime + return x.Start } return nil } -func (x *ReduceRequest) GetWatermark() *timestamppb.Timestamp { +func (x *Window) GetEnd() *timestamppb.Timestamp { if x != nil { - return x.Watermark + return x.End } return nil } +func (x *Window) GetSlot() string { + if x != nil { + return x.Slot + } + return "" +} + // * // ReduceResponse represents a response element. type ReduceResponse struct { @@ -117,13 +215,17 @@ type ReduceResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Results []*ReduceResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + Result *ReduceResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` + // window represents a window to which the result belongs. + Window *Window `protobuf:"bytes,2,opt,name=window,proto3" json:"window,omitempty"` + // EOF represents the end of the response for a window. + EOF bool `protobuf:"varint,3,opt,name=EOF,proto3" json:"EOF,omitempty"` } func (x *ReduceResponse) Reset() { *x = ReduceResponse{} if protoimpl.UnsafeEnabled { - mi := &file_reduce_proto_msgTypes[1] + mi := &file_reduce_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -136,7 +238,7 @@ func (x *ReduceResponse) String() string { func (*ReduceResponse) ProtoMessage() {} func (x *ReduceResponse) ProtoReflect() protoreflect.Message { - mi := &file_reduce_proto_msgTypes[1] + mi := &file_reduce_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -149,16 +251,30 @@ func (x *ReduceResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReduceResponse.ProtoReflect.Descriptor instead. func (*ReduceResponse) Descriptor() ([]byte, []int) { - return file_reduce_proto_rawDescGZIP(), []int{1} + return file_reduce_proto_rawDescGZIP(), []int{2} +} + +func (x *ReduceResponse) GetResult() *ReduceResponse_Result { + if x != nil { + return x.Result + } + return nil } -func (x *ReduceResponse) GetResults() []*ReduceResponse_Result { +func (x *ReduceResponse) GetWindow() *Window { if x != nil { - return x.Results + return x.Window } return nil } +func (x *ReduceResponse) GetEOF() bool { + if x != nil { + return x.EOF + } + return false +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -172,7 +288,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_reduce_proto_msgTypes[2] + mi := &file_reduce_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -185,7 +301,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_reduce_proto_msgTypes[2] + mi := &file_reduce_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -198,7 +314,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_reduce_proto_rawDescGZIP(), []int{2} + return file_reduce_proto_rawDescGZIP(), []int{3} } func (x *ReadyResponse) GetReady() bool { @@ -208,6 +324,136 @@ func (x *ReadyResponse) GetReady() bool { return false } +// WindowOperation represents a window operation. +// For Aligned windows, OPEN, APPEND and CLOSE events are sent. +type ReduceRequest_WindowOperation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Event ReduceRequest_WindowOperation_Event `protobuf:"varint,1,opt,name=event,proto3,enum=reduce.v1.ReduceRequest_WindowOperation_Event" json:"event,omitempty"` + Windows []*Window `protobuf:"bytes,2,rep,name=windows,proto3" json:"windows,omitempty"` +} + +func (x *ReduceRequest_WindowOperation) Reset() { + *x = ReduceRequest_WindowOperation{} + if protoimpl.UnsafeEnabled { + mi := &file_reduce_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReduceRequest_WindowOperation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReduceRequest_WindowOperation) ProtoMessage() {} + +func (x *ReduceRequest_WindowOperation) ProtoReflect() protoreflect.Message { + mi := &file_reduce_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReduceRequest_WindowOperation.ProtoReflect.Descriptor instead. +func (*ReduceRequest_WindowOperation) Descriptor() ([]byte, []int) { + return file_reduce_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *ReduceRequest_WindowOperation) GetEvent() ReduceRequest_WindowOperation_Event { + if x != nil { + return x.Event + } + return ReduceRequest_WindowOperation_OPEN +} + +func (x *ReduceRequest_WindowOperation) GetWindows() []*Window { + if x != nil { + return x.Windows + } + return nil +} + +// Payload represents a payload element. +type ReduceRequest_Payload struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` +} + +func (x *ReduceRequest_Payload) Reset() { + *x = ReduceRequest_Payload{} + if protoimpl.UnsafeEnabled { + mi := &file_reduce_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReduceRequest_Payload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReduceRequest_Payload) ProtoMessage() {} + +func (x *ReduceRequest_Payload) ProtoReflect() protoreflect.Message { + mi := &file_reduce_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReduceRequest_Payload.ProtoReflect.Descriptor instead. +func (*ReduceRequest_Payload) Descriptor() ([]byte, []int) { + return file_reduce_proto_rawDescGZIP(), []int{0, 1} +} + +func (x *ReduceRequest_Payload) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *ReduceRequest_Payload) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *ReduceRequest_Payload) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *ReduceRequest_Payload) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +// Result represents a result element. It contains the result of the reduce function. type ReduceResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -221,7 +467,7 @@ type ReduceResponse_Result struct { func (x *ReduceResponse_Result) Reset() { *x = ReduceResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_reduce_proto_msgTypes[3] + mi := &file_reduce_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -234,7 +480,7 @@ func (x *ReduceResponse_Result) String() string { func (*ReduceResponse_Result) ProtoMessage() {} func (x *ReduceResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_reduce_proto_msgTypes[3] + mi := &file_reduce_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -247,7 +493,7 @@ func (x *ReduceResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use ReduceResponse_Result.ProtoReflect.Descriptor instead. func (*ReduceResponse_Result) Descriptor() ([]byte, []int) { - return file_reduce_proto_rawDescGZIP(), []int{1, 0} + return file_reduce_proto_rawDescGZIP(), []int{2, 0} } func (x *ReduceResponse_Result) GetKeys() []string { @@ -279,45 +525,77 @@ var file_reduce_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xae, 0x01, 0x0a, 0x0d, 0x52, 0x65, 0x64, 0x75, - 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, - 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, - 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0x94, 0x01, 0x0a, 0x0e, 0x52, 0x65, 0x64, - 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3a, 0x0a, 0x07, 0x72, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, - 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, - 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, - 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, - 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, - 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0x8a, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x64, 0x75, 0x63, - 0x65, 0x12, 0x43, 0x0a, 0x08, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x46, 0x6e, 0x12, 0x18, 0x2e, - 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, - 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, - 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x72, 0x65, 0x64, 0x75, - 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x5a, 0x0a, 0x1e, 0x69, 0x6f, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, - 0x6f, 0x6a, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x72, 0x65, 0x64, 0x75, - 0x63, 0x65, 0x2e, 0x76, 0x31, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, - 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xef, 0x03, 0x0a, 0x0d, 0x52, 0x65, 0x64, 0x75, + 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3a, 0x0a, 0x07, 0x70, 0x61, 0x79, + 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x65, 0x64, + 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x2e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x07, 0x70, 0x61, + 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x46, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x2e, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0xae, 0x01, + 0x0a, 0x0f, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x12, 0x44, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, + 0x32, 0x2e, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, + 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x57, 0x69, 0x6e, 0x64, 0x6f, + 0x77, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x2b, 0x0a, 0x07, 0x77, 0x69, 0x6e, 0x64, 0x6f, + 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x52, 0x07, 0x77, 0x69, 0x6e, + 0x64, 0x6f, 0x77, 0x73, 0x22, 0x28, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x08, 0x0a, + 0x04, 0x4f, 0x50, 0x45, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x43, 0x4c, 0x4f, 0x53, 0x45, + 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, 0x10, 0x04, 0x1a, 0xa8, + 0x01, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, + 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, + 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, + 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, + 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, + 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0x7c, 0x0a, 0x06, 0x57, 0x69, 0x6e, + 0x64, 0x6f, 0x77, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x03, + 0x65, 0x6e, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x22, 0xcf, 0x01, 0x0a, 0x0e, 0x52, 0x65, 0x64, 0x75, + 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x38, 0x0a, 0x06, 0x72, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x72, 0x65, 0x64, + 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x12, 0x29, 0x0a, 0x06, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x52, 0x06, 0x77, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x12, + 0x10, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x45, 0x4f, + 0x46, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, + 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, + 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, + 0x32, 0x8a, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x12, 0x43, 0x0a, 0x08, 0x52, + 0x65, 0x64, 0x75, 0x63, 0x65, 0x46, 0x6e, 0x12, 0x18, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x19, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, + 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, + 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, + 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x5a, 0x0a, + 0x1e, 0x69, 0x6f, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2e, 0x6e, 0x75, 0x6d, + 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x5a, + 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, + 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, + 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, + 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -332,28 +610,40 @@ func file_reduce_proto_rawDescGZIP() []byte { return file_reduce_proto_rawDescData } -var file_reduce_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_reduce_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_reduce_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_reduce_proto_goTypes = []interface{}{ - (*ReduceRequest)(nil), // 0: reduce.v1.ReduceRequest - (*ReduceResponse)(nil), // 1: reduce.v1.ReduceResponse - (*ReadyResponse)(nil), // 2: reduce.v1.ReadyResponse - (*ReduceResponse_Result)(nil), // 3: reduce.v1.ReduceResponse.Result - (*timestamppb.Timestamp)(nil), // 4: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 5: google.protobuf.Empty + (ReduceRequest_WindowOperation_Event)(0), // 0: reduce.v1.ReduceRequest.WindowOperation.Event + (*ReduceRequest)(nil), // 1: reduce.v1.ReduceRequest + (*Window)(nil), // 2: reduce.v1.Window + (*ReduceResponse)(nil), // 3: reduce.v1.ReduceResponse + (*ReadyResponse)(nil), // 4: reduce.v1.ReadyResponse + (*ReduceRequest_WindowOperation)(nil), // 5: reduce.v1.ReduceRequest.WindowOperation + (*ReduceRequest_Payload)(nil), // 6: reduce.v1.ReduceRequest.Payload + (*ReduceResponse_Result)(nil), // 7: reduce.v1.ReduceResponse.Result + (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty } var file_reduce_proto_depIdxs = []int32{ - 4, // 0: reduce.v1.ReduceRequest.event_time:type_name -> google.protobuf.Timestamp - 4, // 1: reduce.v1.ReduceRequest.watermark:type_name -> google.protobuf.Timestamp - 3, // 2: reduce.v1.ReduceResponse.results:type_name -> reduce.v1.ReduceResponse.Result - 0, // 3: reduce.v1.Reduce.ReduceFn:input_type -> reduce.v1.ReduceRequest - 5, // 4: reduce.v1.Reduce.IsReady:input_type -> google.protobuf.Empty - 1, // 5: reduce.v1.Reduce.ReduceFn:output_type -> reduce.v1.ReduceResponse - 2, // 6: reduce.v1.Reduce.IsReady:output_type -> reduce.v1.ReadyResponse - 5, // [5:7] is the sub-list for method output_type - 3, // [3:5] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 6, // 0: reduce.v1.ReduceRequest.payload:type_name -> reduce.v1.ReduceRequest.Payload + 5, // 1: reduce.v1.ReduceRequest.operation:type_name -> reduce.v1.ReduceRequest.WindowOperation + 8, // 2: reduce.v1.Window.start:type_name -> google.protobuf.Timestamp + 8, // 3: reduce.v1.Window.end:type_name -> google.protobuf.Timestamp + 7, // 4: reduce.v1.ReduceResponse.result:type_name -> reduce.v1.ReduceResponse.Result + 2, // 5: reduce.v1.ReduceResponse.window:type_name -> reduce.v1.Window + 0, // 6: reduce.v1.ReduceRequest.WindowOperation.event:type_name -> reduce.v1.ReduceRequest.WindowOperation.Event + 2, // 7: reduce.v1.ReduceRequest.WindowOperation.windows:type_name -> reduce.v1.Window + 8, // 8: reduce.v1.ReduceRequest.Payload.event_time:type_name -> google.protobuf.Timestamp + 8, // 9: reduce.v1.ReduceRequest.Payload.watermark:type_name -> google.protobuf.Timestamp + 1, // 10: reduce.v1.Reduce.ReduceFn:input_type -> reduce.v1.ReduceRequest + 9, // 11: reduce.v1.Reduce.IsReady:input_type -> google.protobuf.Empty + 3, // 12: reduce.v1.Reduce.ReduceFn:output_type -> reduce.v1.ReduceResponse + 4, // 13: reduce.v1.Reduce.IsReady:output_type -> reduce.v1.ReadyResponse + 12, // [12:14] is the sub-list for method output_type + 10, // [10:12] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_reduce_proto_init() } @@ -375,7 +665,7 @@ func file_reduce_proto_init() { } } file_reduce_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReduceResponse); i { + switch v := v.(*Window); i { case 0: return &v.state case 1: @@ -387,7 +677,7 @@ func file_reduce_proto_init() { } } file_reduce_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadyResponse); i { + switch v := v.(*ReduceResponse); i { case 0: return &v.state case 1: @@ -399,6 +689,42 @@ func file_reduce_proto_init() { } } file_reduce_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_reduce_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReduceRequest_WindowOperation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_reduce_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReduceRequest_Payload); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_reduce_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*ReduceResponse_Result); i { case 0: return &v.state @@ -416,13 +742,14 @@ func file_reduce_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_reduce_proto_rawDesc, - NumEnums: 0, - NumMessages: 4, + NumEnums: 1, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, GoTypes: file_reduce_proto_goTypes, DependencyIndexes: file_reduce_proto_depIdxs, + EnumInfos: file_reduce_proto_enumTypes, MessageInfos: file_reduce_proto_msgTypes, }.Build() File_reduce_proto = out.File diff --git a/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go b/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go new file mode 100644 index 00000000..c057e227 --- /dev/null +++ b/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go @@ -0,0 +1,788 @@ +// +//Copyright 2022 The Numaproj Authors. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.24.3 +// source: sessionreduce.proto + +package v1 + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + emptypb "google.golang.org/protobuf/types/known/emptypb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SessionReduceRequest_WindowOperation_Event int32 + +const ( + SessionReduceRequest_WindowOperation_OPEN SessionReduceRequest_WindowOperation_Event = 0 + SessionReduceRequest_WindowOperation_CLOSE SessionReduceRequest_WindowOperation_Event = 1 + SessionReduceRequest_WindowOperation_EXPAND SessionReduceRequest_WindowOperation_Event = 2 + SessionReduceRequest_WindowOperation_MERGE SessionReduceRequest_WindowOperation_Event = 3 + SessionReduceRequest_WindowOperation_APPEND SessionReduceRequest_WindowOperation_Event = 4 +) + +// Enum value maps for SessionReduceRequest_WindowOperation_Event. +var ( + SessionReduceRequest_WindowOperation_Event_name = map[int32]string{ + 0: "OPEN", + 1: "CLOSE", + 2: "EXPAND", + 3: "MERGE", + 4: "APPEND", + } + SessionReduceRequest_WindowOperation_Event_value = map[string]int32{ + "OPEN": 0, + "CLOSE": 1, + "EXPAND": 2, + "MERGE": 3, + "APPEND": 4, + } +) + +func (x SessionReduceRequest_WindowOperation_Event) Enum() *SessionReduceRequest_WindowOperation_Event { + p := new(SessionReduceRequest_WindowOperation_Event) + *p = x + return p +} + +func (x SessionReduceRequest_WindowOperation_Event) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (SessionReduceRequest_WindowOperation_Event) Descriptor() protoreflect.EnumDescriptor { + return file_sessionreduce_proto_enumTypes[0].Descriptor() +} + +func (SessionReduceRequest_WindowOperation_Event) Type() protoreflect.EnumType { + return &file_sessionreduce_proto_enumTypes[0] +} + +func (x SessionReduceRequest_WindowOperation_Event) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use SessionReduceRequest_WindowOperation_Event.Descriptor instead. +func (SessionReduceRequest_WindowOperation_Event) EnumDescriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{1, 0, 0} +} + +// KeyedWindow represents a window with keys. +// since the client track the keys, we use keyed window. +type KeyedWindow struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Start *timestamppb.Timestamp `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` + Slot string `protobuf:"bytes,3,opt,name=slot,proto3" json:"slot,omitempty"` + Keys []string `protobuf:"bytes,4,rep,name=keys,proto3" json:"keys,omitempty"` +} + +func (x *KeyedWindow) Reset() { + *x = KeyedWindow{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *KeyedWindow) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*KeyedWindow) ProtoMessage() {} + +func (x *KeyedWindow) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use KeyedWindow.ProtoReflect.Descriptor instead. +func (*KeyedWindow) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{0} +} + +func (x *KeyedWindow) GetStart() *timestamppb.Timestamp { + if x != nil { + return x.Start + } + return nil +} + +func (x *KeyedWindow) GetEnd() *timestamppb.Timestamp { + if x != nil { + return x.End + } + return nil +} + +func (x *KeyedWindow) GetSlot() string { + if x != nil { + return x.Slot + } + return "" +} + +func (x *KeyedWindow) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +// * +// SessionReduceRequest represents a request element. +type SessionReduceRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Payload *SessionReduceRequest_Payload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` + Operation *SessionReduceRequest_WindowOperation `protobuf:"bytes,2,opt,name=operation,proto3" json:"operation,omitempty"` +} + +func (x *SessionReduceRequest) Reset() { + *x = SessionReduceRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SessionReduceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SessionReduceRequest) ProtoMessage() {} + +func (x *SessionReduceRequest) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SessionReduceRequest.ProtoReflect.Descriptor instead. +func (*SessionReduceRequest) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{1} +} + +func (x *SessionReduceRequest) GetPayload() *SessionReduceRequest_Payload { + if x != nil { + return x.Payload + } + return nil +} + +func (x *SessionReduceRequest) GetOperation() *SessionReduceRequest_WindowOperation { + if x != nil { + return x.Operation + } + return nil +} + +// * +// SessionReduceResponse represents a response element. +type SessionReduceResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Result *SessionReduceResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` + // keyedWindow represents a window to which the result belongs. + KeyedWindow *KeyedWindow `protobuf:"bytes,2,opt,name=keyedWindow,proto3" json:"keyedWindow,omitempty"` + // EOF represents the end of the response for a window. + EOF bool `protobuf:"varint,3,opt,name=EOF,proto3" json:"EOF,omitempty"` +} + +func (x *SessionReduceResponse) Reset() { + *x = SessionReduceResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SessionReduceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SessionReduceResponse) ProtoMessage() {} + +func (x *SessionReduceResponse) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SessionReduceResponse.ProtoReflect.Descriptor instead. +func (*SessionReduceResponse) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{2} +} + +func (x *SessionReduceResponse) GetResult() *SessionReduceResponse_Result { + if x != nil { + return x.Result + } + return nil +} + +func (x *SessionReduceResponse) GetKeyedWindow() *KeyedWindow { + if x != nil { + return x.KeyedWindow + } + return nil +} + +func (x *SessionReduceResponse) GetEOF() bool { + if x != nil { + return x.EOF + } + return false +} + +// * +// ReadyResponse is the health check result. +type ReadyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Ready bool `protobuf:"varint,1,opt,name=ready,proto3" json:"ready,omitempty"` +} + +func (x *ReadyResponse) Reset() { + *x = ReadyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ReadyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ReadyResponse) ProtoMessage() {} + +func (x *ReadyResponse) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. +func (*ReadyResponse) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{3} +} + +func (x *ReadyResponse) GetReady() bool { + if x != nil { + return x.Ready + } + return false +} + +// WindowOperation represents a window operation. +// For Aligned window values can be one of OPEN, CLOSE, EXPAND, MERGE and APPEND. +type SessionReduceRequest_WindowOperation struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Event SessionReduceRequest_WindowOperation_Event `protobuf:"varint,1,opt,name=event,proto3,enum=sessionreduce.v1.SessionReduceRequest_WindowOperation_Event" json:"event,omitempty"` + KeyedWindows []*KeyedWindow `protobuf:"bytes,2,rep,name=keyedWindows,proto3" json:"keyedWindows,omitempty"` +} + +func (x *SessionReduceRequest_WindowOperation) Reset() { + *x = SessionReduceRequest_WindowOperation{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SessionReduceRequest_WindowOperation) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SessionReduceRequest_WindowOperation) ProtoMessage() {} + +func (x *SessionReduceRequest_WindowOperation) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SessionReduceRequest_WindowOperation.ProtoReflect.Descriptor instead. +func (*SessionReduceRequest_WindowOperation) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *SessionReduceRequest_WindowOperation) GetEvent() SessionReduceRequest_WindowOperation_Event { + if x != nil { + return x.Event + } + return SessionReduceRequest_WindowOperation_OPEN +} + +func (x *SessionReduceRequest_WindowOperation) GetKeyedWindows() []*KeyedWindow { + if x != nil { + return x.KeyedWindows + } + return nil +} + +// Payload represents a payload element. +type SessionReduceRequest_Payload struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` +} + +func (x *SessionReduceRequest_Payload) Reset() { + *x = SessionReduceRequest_Payload{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SessionReduceRequest_Payload) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SessionReduceRequest_Payload) ProtoMessage() {} + +func (x *SessionReduceRequest_Payload) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SessionReduceRequest_Payload.ProtoReflect.Descriptor instead. +func (*SessionReduceRequest_Payload) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{1, 1} +} + +func (x *SessionReduceRequest_Payload) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *SessionReduceRequest_Payload) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *SessionReduceRequest_Payload) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *SessionReduceRequest_Payload) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +// Result represents a result element. It contains the result of the reduce function. +type SessionReduceResponse_Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + Tags []string `protobuf:"bytes,3,rep,name=tags,proto3" json:"tags,omitempty"` +} + +func (x *SessionReduceResponse_Result) Reset() { + *x = SessionReduceResponse_Result{} + if protoimpl.UnsafeEnabled { + mi := &file_sessionreduce_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SessionReduceResponse_Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SessionReduceResponse_Result) ProtoMessage() {} + +func (x *SessionReduceResponse_Result) ProtoReflect() protoreflect.Message { + mi := &file_sessionreduce_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SessionReduceResponse_Result.ProtoReflect.Descriptor instead. +func (*SessionReduceResponse_Result) Descriptor() ([]byte, []int) { + return file_sessionreduce_proto_rawDescGZIP(), []int{2, 0} +} + +func (x *SessionReduceResponse_Result) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *SessionReduceResponse_Result) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *SessionReduceResponse_Result) GetTags() []string { + if x != nil { + return x.Tags + } + return nil +} + +var File_sessionreduce_proto protoreflect.FileDescriptor + +var file_sessionreduce_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x10, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, + 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x95, 0x01, 0x0a, 0x0b, 0x4b, 0x65, 0x79, 0x65, 0x64, 0x57, + 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x12, 0x30, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x2c, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x03, 0x65, 0x6e, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x6c, 0x6f, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, + 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x22, 0xcd, 0x04, + 0x0a, 0x14, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x48, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x12, 0x54, 0x0a, 0x09, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x36, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, 0x64, + 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x57, 0x69, 0x6e, 0x64, + 0x6f, 0x77, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x6f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0xe9, 0x01, 0x0a, 0x0f, 0x57, 0x69, 0x6e, 0x64, 0x6f, + 0x77, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x52, 0x0a, 0x05, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x3c, 0x2e, 0x73, 0x65, 0x73, 0x73, + 0x69, 0x6f, 0x6e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x73, + 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x2e, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x2e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x41, + 0x0a, 0x0c, 0x6b, 0x65, 0x79, 0x65, 0x64, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x73, 0x18, 0x02, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, + 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4b, 0x65, 0x79, 0x65, 0x64, 0x57, 0x69, 0x6e, + 0x64, 0x6f, 0x77, 0x52, 0x0c, 0x6b, 0x65, 0x79, 0x65, 0x64, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, + 0x73, 0x22, 0x3f, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x08, 0x0a, 0x04, 0x4f, 0x50, + 0x45, 0x4e, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x43, 0x4c, 0x4f, 0x53, 0x45, 0x10, 0x01, 0x12, + 0x0a, 0x0a, 0x06, 0x45, 0x58, 0x50, 0x41, 0x4e, 0x44, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x4d, + 0x45, 0x52, 0x47, 0x45, 0x10, 0x03, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x50, 0x50, 0x45, 0x4e, 0x44, + 0x10, 0x04, 0x1a, 0xa8, 0x01, 0x0a, 0x07, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x12, + 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, + 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, + 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, + 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, + 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x22, 0xfa, 0x01, + 0x0a, 0x15, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x46, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, + 0x6f, 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, + 0x3f, 0x0a, 0x0b, 0x6b, 0x65, 0x79, 0x65, 0x64, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, + 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4b, 0x65, 0x79, 0x65, 0x64, 0x57, 0x69, 0x6e, + 0x64, 0x6f, 0x77, 0x52, 0x0b, 0x6b, 0x65, 0x79, 0x65, 0x64, 0x57, 0x69, 0x6e, 0x64, 0x6f, 0x77, + 0x12, 0x10, 0x0a, 0x03, 0x45, 0x4f, 0x46, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x45, + 0x4f, 0x46, 0x1a, 0x46, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x03, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, + 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x32, 0xbb, 0x01, 0x0a, 0x0d, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x64, + 0x75, 0x63, 0x65, 0x12, 0x66, 0x0a, 0x0f, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, + 0x64, 0x75, 0x63, 0x65, 0x46, 0x6e, 0x12, 0x26, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x27, + 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x64, 0x75, 0x63, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x42, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1f, + 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, + 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, + 0x68, 0x0a, 0x25, 0x69, 0x6f, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2e, 0x6e, + 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x72, + 0x65, 0x64, 0x75, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x5a, 0x3f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, + 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, + 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x72, 0x65, 0x64, 0x75, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_sessionreduce_proto_rawDescOnce sync.Once + file_sessionreduce_proto_rawDescData = file_sessionreduce_proto_rawDesc +) + +func file_sessionreduce_proto_rawDescGZIP() []byte { + file_sessionreduce_proto_rawDescOnce.Do(func() { + file_sessionreduce_proto_rawDescData = protoimpl.X.CompressGZIP(file_sessionreduce_proto_rawDescData) + }) + return file_sessionreduce_proto_rawDescData +} + +var file_sessionreduce_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_sessionreduce_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_sessionreduce_proto_goTypes = []interface{}{ + (SessionReduceRequest_WindowOperation_Event)(0), // 0: sessionreduce.v1.SessionReduceRequest.WindowOperation.Event + (*KeyedWindow)(nil), // 1: sessionreduce.v1.KeyedWindow + (*SessionReduceRequest)(nil), // 2: sessionreduce.v1.SessionReduceRequest + (*SessionReduceResponse)(nil), // 3: sessionreduce.v1.SessionReduceResponse + (*ReadyResponse)(nil), // 4: sessionreduce.v1.ReadyResponse + (*SessionReduceRequest_WindowOperation)(nil), // 5: sessionreduce.v1.SessionReduceRequest.WindowOperation + (*SessionReduceRequest_Payload)(nil), // 6: sessionreduce.v1.SessionReduceRequest.Payload + (*SessionReduceResponse_Result)(nil), // 7: sessionreduce.v1.SessionReduceResponse.Result + (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 9: google.protobuf.Empty +} +var file_sessionreduce_proto_depIdxs = []int32{ + 8, // 0: sessionreduce.v1.KeyedWindow.start:type_name -> google.protobuf.Timestamp + 8, // 1: sessionreduce.v1.KeyedWindow.end:type_name -> google.protobuf.Timestamp + 6, // 2: sessionreduce.v1.SessionReduceRequest.payload:type_name -> sessionreduce.v1.SessionReduceRequest.Payload + 5, // 3: sessionreduce.v1.SessionReduceRequest.operation:type_name -> sessionreduce.v1.SessionReduceRequest.WindowOperation + 7, // 4: sessionreduce.v1.SessionReduceResponse.result:type_name -> sessionreduce.v1.SessionReduceResponse.Result + 1, // 5: sessionreduce.v1.SessionReduceResponse.keyedWindow:type_name -> sessionreduce.v1.KeyedWindow + 0, // 6: sessionreduce.v1.SessionReduceRequest.WindowOperation.event:type_name -> sessionreduce.v1.SessionReduceRequest.WindowOperation.Event + 1, // 7: sessionreduce.v1.SessionReduceRequest.WindowOperation.keyedWindows:type_name -> sessionreduce.v1.KeyedWindow + 8, // 8: sessionreduce.v1.SessionReduceRequest.Payload.event_time:type_name -> google.protobuf.Timestamp + 8, // 9: sessionreduce.v1.SessionReduceRequest.Payload.watermark:type_name -> google.protobuf.Timestamp + 2, // 10: sessionreduce.v1.SessionReduce.SessionReduceFn:input_type -> sessionreduce.v1.SessionReduceRequest + 9, // 11: sessionreduce.v1.SessionReduce.IsReady:input_type -> google.protobuf.Empty + 3, // 12: sessionreduce.v1.SessionReduce.SessionReduceFn:output_type -> sessionreduce.v1.SessionReduceResponse + 4, // 13: sessionreduce.v1.SessionReduce.IsReady:output_type -> sessionreduce.v1.ReadyResponse + 12, // [12:14] is the sub-list for method output_type + 10, // [10:12] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name +} + +func init() { file_sessionreduce_proto_init() } +func file_sessionreduce_proto_init() { + if File_sessionreduce_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_sessionreduce_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*KeyedWindow); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sessionreduce_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionReduceRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sessionreduce_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionReduceResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sessionreduce_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ReadyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sessionreduce_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionReduceRequest_WindowOperation); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sessionreduce_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionReduceRequest_Payload); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_sessionreduce_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SessionReduceResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_sessionreduce_proto_rawDesc, + NumEnums: 1, + NumMessages: 7, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_sessionreduce_proto_goTypes, + DependencyIndexes: file_sessionreduce_proto_depIdxs, + EnumInfos: file_sessionreduce_proto_enumTypes, + MessageInfos: file_sessionreduce_proto_msgTypes, + }.Build() + File_sessionreduce_proto = out.File + file_sessionreduce_proto_rawDesc = nil + file_sessionreduce_proto_goTypes = nil + file_sessionreduce_proto_depIdxs = nil +} diff --git a/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go b/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go new file mode 100644 index 00000000..2d505bbd --- /dev/null +++ b/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go @@ -0,0 +1,199 @@ +// +//Copyright 2022 The Numaproj Authors. +// +//Licensed under the Apache License, Version 2.0 (the "License"); +//you may not use this file except in compliance with the License. +//You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +//Unless required by applicable law or agreed to in writing, software +//distributed under the License is distributed on an "AS IS" BASIS, +//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//See the License for the specific language governing permissions and +//limitations under the License. + +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v4.24.3 +// source: sessionreduce.proto + +package v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + emptypb "google.golang.org/protobuf/types/known/emptypb" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + SessionReduce_SessionReduceFn_FullMethodName = "/sessionreduce.v1.SessionReduce/SessionReduceFn" + SessionReduce_IsReady_FullMethodName = "/sessionreduce.v1.SessionReduce/IsReady" +) + +// SessionReduceClient is the client API for SessionReduce service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SessionReduceClient interface { + // SessionReduceFn applies a reduce function to a request stream. + SessionReduceFn(ctx context.Context, opts ...grpc.CallOption) (SessionReduce_SessionReduceFnClient, error) + // IsReady is the heartbeat endpoint for gRPC. + IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) +} + +type sessionReduceClient struct { + cc grpc.ClientConnInterface +} + +func NewSessionReduceClient(cc grpc.ClientConnInterface) SessionReduceClient { + return &sessionReduceClient{cc} +} + +func (c *sessionReduceClient) SessionReduceFn(ctx context.Context, opts ...grpc.CallOption) (SessionReduce_SessionReduceFnClient, error) { + stream, err := c.cc.NewStream(ctx, &SessionReduce_ServiceDesc.Streams[0], SessionReduce_SessionReduceFn_FullMethodName, opts...) + if err != nil { + return nil, err + } + x := &sessionReduceSessionReduceFnClient{stream} + return x, nil +} + +type SessionReduce_SessionReduceFnClient interface { + Send(*SessionReduceRequest) error + Recv() (*SessionReduceResponse, error) + grpc.ClientStream +} + +type sessionReduceSessionReduceFnClient struct { + grpc.ClientStream +} + +func (x *sessionReduceSessionReduceFnClient) Send(m *SessionReduceRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sessionReduceSessionReduceFnClient) Recv() (*SessionReduceResponse, error) { + m := new(SessionReduceResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *sessionReduceClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { + out := new(ReadyResponse) + err := c.cc.Invoke(ctx, SessionReduce_IsReady_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SessionReduceServer is the server API for SessionReduce service. +// All implementations must embed UnimplementedSessionReduceServer +// for forward compatibility +type SessionReduceServer interface { + // SessionReduceFn applies a reduce function to a request stream. + SessionReduceFn(SessionReduce_SessionReduceFnServer) error + // IsReady is the heartbeat endpoint for gRPC. + IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) + mustEmbedUnimplementedSessionReduceServer() +} + +// UnimplementedSessionReduceServer must be embedded to have forward compatible implementations. +type UnimplementedSessionReduceServer struct { +} + +func (UnimplementedSessionReduceServer) SessionReduceFn(SessionReduce_SessionReduceFnServer) error { + return status.Errorf(codes.Unimplemented, "method SessionReduceFn not implemented") +} +func (UnimplementedSessionReduceServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") +} +func (UnimplementedSessionReduceServer) mustEmbedUnimplementedSessionReduceServer() {} + +// UnsafeSessionReduceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SessionReduceServer will +// result in compilation errors. +type UnsafeSessionReduceServer interface { + mustEmbedUnimplementedSessionReduceServer() +} + +func RegisterSessionReduceServer(s grpc.ServiceRegistrar, srv SessionReduceServer) { + s.RegisterService(&SessionReduce_ServiceDesc, srv) +} + +func _SessionReduce_SessionReduceFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SessionReduceServer).SessionReduceFn(&sessionReduceSessionReduceFnServer{stream}) +} + +type SessionReduce_SessionReduceFnServer interface { + Send(*SessionReduceResponse) error + Recv() (*SessionReduceRequest, error) + grpc.ServerStream +} + +type sessionReduceSessionReduceFnServer struct { + grpc.ServerStream +} + +func (x *sessionReduceSessionReduceFnServer) Send(m *SessionReduceResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sessionReduceSessionReduceFnServer) Recv() (*SessionReduceRequest, error) { + m := new(SessionReduceRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _SessionReduce_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SessionReduceServer).IsReady(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: SessionReduce_IsReady_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SessionReduceServer).IsReady(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +// SessionReduce_ServiceDesc is the grpc.ServiceDesc for SessionReduce service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SessionReduce_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "sessionreduce.v1.SessionReduce", + HandlerType: (*SessionReduceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "IsReady", + Handler: _SessionReduce_IsReady_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "SessionReduceFn", + Handler: _SessionReduce_SessionReduceFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "sessionreduce.proto", +} diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index 46ad0541..dd3fa430 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -339,6 +339,55 @@ func (x *PendingResponse) GetResult() *PendingResponse_Result { return nil } +// PartitionsResponse is the response for the partitions request. +type PartitionsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required field holding the result. + Result *PartitionsResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` +} + +func (x *PartitionsResponse) Reset() { + *x = PartitionsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_source_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PartitionsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PartitionsResponse) ProtoMessage() {} + +func (x *PartitionsResponse) ProtoReflect() protoreflect.Message { + mi := &file_source_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PartitionsResponse.ProtoReflect.Descriptor instead. +func (*PartitionsResponse) Descriptor() ([]byte, []int) { + return file_source_proto_rawDescGZIP(), []int{6} +} + +func (x *PartitionsResponse) GetResult() *PartitionsResponse_Result { + if x != nil { + return x.Result + } + return nil +} + // Offset is the offset of the datum. type Offset struct { state protoimpl.MessageState @@ -353,13 +402,13 @@ type Offset struct { // Optional partition_id indicates which partition of the source the datum belongs to. // It is useful for sources that have multiple partitions. e.g. Kafka. // If the partition_id is not specified, it is assumed that the source has a single partition. - PartitionId string `protobuf:"bytes,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` + PartitionId int32 `protobuf:"varint,2,opt,name=partition_id,json=partitionId,proto3" json:"partition_id,omitempty"` } func (x *Offset) Reset() { *x = Offset{} if protoimpl.UnsafeEnabled { - mi := &file_source_proto_msgTypes[6] + mi := &file_source_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -372,7 +421,7 @@ func (x *Offset) String() string { func (*Offset) ProtoMessage() {} func (x *Offset) ProtoReflect() protoreflect.Message { - mi := &file_source_proto_msgTypes[6] + mi := &file_source_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -385,7 +434,7 @@ func (x *Offset) ProtoReflect() protoreflect.Message { // Deprecated: Use Offset.ProtoReflect.Descriptor instead. func (*Offset) Descriptor() ([]byte, []int) { - return file_source_proto_rawDescGZIP(), []int{6} + return file_source_proto_rawDescGZIP(), []int{7} } func (x *Offset) GetOffset() []byte { @@ -395,11 +444,11 @@ func (x *Offset) GetOffset() []byte { return nil } -func (x *Offset) GetPartitionId() string { +func (x *Offset) GetPartitionId() int32 { if x != nil { return x.PartitionId } - return "" + return 0 } type ReadRequest_Request struct { @@ -418,7 +467,7 @@ type ReadRequest_Request struct { func (x *ReadRequest_Request) Reset() { *x = ReadRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_source_proto_msgTypes[7] + mi := &file_source_proto_msgTypes[8] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -431,7 +480,7 @@ func (x *ReadRequest_Request) String() string { func (*ReadRequest_Request) ProtoMessage() {} func (x *ReadRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_source_proto_msgTypes[7] + mi := &file_source_proto_msgTypes[8] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -482,7 +531,7 @@ type ReadResponse_Result struct { func (x *ReadResponse_Result) Reset() { *x = ReadResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_source_proto_msgTypes[8] + mi := &file_source_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -495,7 +544,7 @@ func (x *ReadResponse_Result) String() string { func (*ReadResponse_Result) ProtoMessage() {} func (x *ReadResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_source_proto_msgTypes[8] + mi := &file_source_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -554,7 +603,7 @@ type AckRequest_Request struct { func (x *AckRequest_Request) Reset() { *x = AckRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_source_proto_msgTypes[9] + mi := &file_source_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -567,7 +616,7 @@ func (x *AckRequest_Request) String() string { func (*AckRequest_Request) ProtoMessage() {} func (x *AckRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_source_proto_msgTypes[9] + mi := &file_source_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -602,7 +651,7 @@ type AckResponse_Result struct { func (x *AckResponse_Result) Reset() { *x = AckResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_source_proto_msgTypes[10] + mi := &file_source_proto_msgTypes[11] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -615,7 +664,7 @@ func (x *AckResponse_Result) String() string { func (*AckResponse_Result) ProtoMessage() {} func (x *AckResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_source_proto_msgTypes[10] + mi := &file_source_proto_msgTypes[11] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -651,7 +700,7 @@ type PendingResponse_Result struct { func (x *PendingResponse_Result) Reset() { *x = PendingResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_source_proto_msgTypes[11] + mi := &file_source_proto_msgTypes[12] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -664,7 +713,7 @@ func (x *PendingResponse_Result) String() string { func (*PendingResponse_Result) ProtoMessage() {} func (x *PendingResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_source_proto_msgTypes[11] + mi := &file_source_proto_msgTypes[12] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -687,6 +736,54 @@ func (x *PendingResponse_Result) GetCount() int64 { return 0 } +type PartitionsResponse_Result struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required field holding the list of partitions. + Partitions []int32 `protobuf:"varint,1,rep,packed,name=partitions,proto3" json:"partitions,omitempty"` +} + +func (x *PartitionsResponse_Result) Reset() { + *x = PartitionsResponse_Result{} + if protoimpl.UnsafeEnabled { + mi := &file_source_proto_msgTypes[13] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *PartitionsResponse_Result) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*PartitionsResponse_Result) ProtoMessage() {} + +func (x *PartitionsResponse_Result) ProtoReflect() protoreflect.Message { + mi := &file_source_proto_msgTypes[13] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use PartitionsResponse_Result.ProtoReflect.Descriptor instead. +func (*PartitionsResponse_Result) Descriptor() ([]byte, []int) { + return file_source_proto_rawDescGZIP(), []int{6, 0} +} + +func (x *PartitionsResponse_Result) GetPartitions() []int32 { + if x != nil { + return x.Partitions + } + return nil +} + var File_source_proto protoreflect.FileDescriptor var file_source_proto_rawDesc = []byte{ @@ -745,33 +842,45 @@ var file_source_proto_rawDesc = []byte{ 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x1e, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, - 0x22, 0x43, 0x0a, 0x06, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, - 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, - 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, - 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xfb, 0x01, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x12, 0x3b, 0x0a, 0x06, 0x52, 0x65, 0x61, 0x64, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, - 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x36, 0x0a, - 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, 0x12, 0x15, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, - 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x09, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, - 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, - 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x42, 0x5a, 0x0a, 0x1e, 0x69, 0x6f, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, - 0x6f, 0x6a, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x2e, 0x76, 0x31, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, - 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, - 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x7c, 0x0a, 0x12, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3c, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x24, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x1a, 0x28, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x1e, + 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x05, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x22, 0x43, + 0x0a, 0x06, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, + 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, + 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x32, 0xc2, 0x02, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3b, + 0x0a, 0x06, 0x52, 0x65, 0x61, 0x64, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, + 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x36, 0x0a, 0x05, 0x41, + 0x63, 0x6b, 0x46, 0x6e, 0x12, 0x15, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x3f, 0x0a, 0x09, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, + 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0c, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1d, 0x2e, 0x73, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, + 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, + 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x5a, 0x0a, 0x1e, 0x69, 0x6f, 0x2e, 0x6e, + 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2e, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, + 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, + 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, + 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -786,46 +895,51 @@ func file_source_proto_rawDescGZIP() []byte { return file_source_proto_rawDescData } -var file_source_proto_msgTypes = make([]protoimpl.MessageInfo, 12) +var file_source_proto_msgTypes = make([]protoimpl.MessageInfo, 14) var file_source_proto_goTypes = []interface{}{ - (*ReadRequest)(nil), // 0: source.v1.ReadRequest - (*ReadResponse)(nil), // 1: source.v1.ReadResponse - (*AckRequest)(nil), // 2: source.v1.AckRequest - (*AckResponse)(nil), // 3: source.v1.AckResponse - (*ReadyResponse)(nil), // 4: source.v1.ReadyResponse - (*PendingResponse)(nil), // 5: source.v1.PendingResponse - (*Offset)(nil), // 6: source.v1.Offset - (*ReadRequest_Request)(nil), // 7: source.v1.ReadRequest.Request - (*ReadResponse_Result)(nil), // 8: source.v1.ReadResponse.Result - (*AckRequest_Request)(nil), // 9: source.v1.AckRequest.Request - (*AckResponse_Result)(nil), // 10: source.v1.AckResponse.Result - (*PendingResponse_Result)(nil), // 11: source.v1.PendingResponse.Result - (*timestamppb.Timestamp)(nil), // 12: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 13: google.protobuf.Empty + (*ReadRequest)(nil), // 0: source.v1.ReadRequest + (*ReadResponse)(nil), // 1: source.v1.ReadResponse + (*AckRequest)(nil), // 2: source.v1.AckRequest + (*AckResponse)(nil), // 3: source.v1.AckResponse + (*ReadyResponse)(nil), // 4: source.v1.ReadyResponse + (*PendingResponse)(nil), // 5: source.v1.PendingResponse + (*PartitionsResponse)(nil), // 6: source.v1.PartitionsResponse + (*Offset)(nil), // 7: source.v1.Offset + (*ReadRequest_Request)(nil), // 8: source.v1.ReadRequest.Request + (*ReadResponse_Result)(nil), // 9: source.v1.ReadResponse.Result + (*AckRequest_Request)(nil), // 10: source.v1.AckRequest.Request + (*AckResponse_Result)(nil), // 11: source.v1.AckResponse.Result + (*PendingResponse_Result)(nil), // 12: source.v1.PendingResponse.Result + (*PartitionsResponse_Result)(nil), // 13: source.v1.PartitionsResponse.Result + (*timestamppb.Timestamp)(nil), // 14: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 15: google.protobuf.Empty } var file_source_proto_depIdxs = []int32{ - 7, // 0: source.v1.ReadRequest.request:type_name -> source.v1.ReadRequest.Request - 8, // 1: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result - 9, // 2: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request - 10, // 3: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result - 11, // 4: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result - 6, // 5: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset - 12, // 6: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp - 6, // 7: source.v1.AckRequest.Request.offsets:type_name -> source.v1.Offset - 13, // 8: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty - 0, // 9: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest - 2, // 10: source.v1.Source.AckFn:input_type -> source.v1.AckRequest - 13, // 11: source.v1.Source.PendingFn:input_type -> google.protobuf.Empty - 13, // 12: source.v1.Source.IsReady:input_type -> google.protobuf.Empty - 1, // 13: source.v1.Source.ReadFn:output_type -> source.v1.ReadResponse - 3, // 14: source.v1.Source.AckFn:output_type -> source.v1.AckResponse - 5, // 15: source.v1.Source.PendingFn:output_type -> source.v1.PendingResponse - 4, // 16: source.v1.Source.IsReady:output_type -> source.v1.ReadyResponse - 13, // [13:17] is the sub-list for method output_type - 9, // [9:13] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 8, // 0: source.v1.ReadRequest.request:type_name -> source.v1.ReadRequest.Request + 9, // 1: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result + 10, // 2: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request + 11, // 3: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result + 12, // 4: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result + 13, // 5: source.v1.PartitionsResponse.result:type_name -> source.v1.PartitionsResponse.Result + 7, // 6: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset + 14, // 7: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp + 7, // 8: source.v1.AckRequest.Request.offsets:type_name -> source.v1.Offset + 15, // 9: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty + 0, // 10: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest + 2, // 11: source.v1.Source.AckFn:input_type -> source.v1.AckRequest + 15, // 12: source.v1.Source.PendingFn:input_type -> google.protobuf.Empty + 15, // 13: source.v1.Source.PartitionsFn:input_type -> google.protobuf.Empty + 15, // 14: source.v1.Source.IsReady:input_type -> google.protobuf.Empty + 1, // 15: source.v1.Source.ReadFn:output_type -> source.v1.ReadResponse + 3, // 16: source.v1.Source.AckFn:output_type -> source.v1.AckResponse + 5, // 17: source.v1.Source.PendingFn:output_type -> source.v1.PendingResponse + 6, // 18: source.v1.Source.PartitionsFn:output_type -> source.v1.PartitionsResponse + 4, // 19: source.v1.Source.IsReady:output_type -> source.v1.ReadyResponse + 15, // [15:20] is the sub-list for method output_type + 10, // [10:15] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_source_proto_init() } @@ -907,7 +1021,7 @@ func file_source_proto_init() { } } file_source_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Offset); i { + switch v := v.(*PartitionsResponse); i { case 0: return &v.state case 1: @@ -919,7 +1033,7 @@ func file_source_proto_init() { } } file_source_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadRequest_Request); i { + switch v := v.(*Offset); i { case 0: return &v.state case 1: @@ -931,7 +1045,7 @@ func file_source_proto_init() { } } file_source_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ReadResponse_Result); i { + switch v := v.(*ReadRequest_Request); i { case 0: return &v.state case 1: @@ -943,7 +1057,7 @@ func file_source_proto_init() { } } file_source_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AckRequest_Request); i { + switch v := v.(*ReadResponse_Result); i { case 0: return &v.state case 1: @@ -955,7 +1069,7 @@ func file_source_proto_init() { } } file_source_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*AckResponse_Result); i { + switch v := v.(*AckRequest_Request); i { case 0: return &v.state case 1: @@ -967,6 +1081,18 @@ func file_source_proto_init() { } } file_source_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*AckResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_source_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*PendingResponse_Result); i { case 0: return &v.state @@ -978,6 +1104,18 @@ func file_source_proto_init() { return nil } } + file_source_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*PartitionsResponse_Result); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -985,7 +1123,7 @@ func file_source_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_source_proto_rawDesc, NumEnums: 0, - NumMessages: 12, + NumMessages: 14, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/source/v1/source_grpc.pb.go b/pkg/apis/proto/source/v1/source_grpc.pb.go index 2e85f96f..fa7ce62f 100644 --- a/pkg/apis/proto/source/v1/source_grpc.pb.go +++ b/pkg/apis/proto/source/v1/source_grpc.pb.go @@ -35,10 +35,11 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( - Source_ReadFn_FullMethodName = "/source.v1.Source/ReadFn" - Source_AckFn_FullMethodName = "/source.v1.Source/AckFn" - Source_PendingFn_FullMethodName = "/source.v1.Source/PendingFn" - Source_IsReady_FullMethodName = "/source.v1.Source/IsReady" + Source_ReadFn_FullMethodName = "/source.v1.Source/ReadFn" + Source_AckFn_FullMethodName = "/source.v1.Source/AckFn" + Source_PendingFn_FullMethodName = "/source.v1.Source/PendingFn" + Source_PartitionsFn_FullMethodName = "/source.v1.Source/PartitionsFn" + Source_IsReady_FullMethodName = "/source.v1.Source/IsReady" ) // SourceClient is the client API for Source service. @@ -57,6 +58,8 @@ type SourceClient interface { AckFn(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) // PendingFn returns the number of pending records at the user defined source. PendingFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PendingResponse, error) + // PartitionsFn returns the list of partitions for the user defined source. + PartitionsFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PartitionsResponse, error) // IsReady is the heartbeat endpoint for user defined source gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) } @@ -119,6 +122,15 @@ func (c *sourceClient) PendingFn(ctx context.Context, in *emptypb.Empty, opts .. return out, nil } +func (c *sourceClient) PartitionsFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PartitionsResponse, error) { + out := new(PartitionsResponse) + err := c.cc.Invoke(ctx, Source_PartitionsFn_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *sourceClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) err := c.cc.Invoke(ctx, Source_IsReady_FullMethodName, in, out, opts...) @@ -144,6 +156,8 @@ type SourceServer interface { AckFn(context.Context, *AckRequest) (*AckResponse, error) // PendingFn returns the number of pending records at the user defined source. PendingFn(context.Context, *emptypb.Empty) (*PendingResponse, error) + // PartitionsFn returns the list of partitions for the user defined source. + PartitionsFn(context.Context, *emptypb.Empty) (*PartitionsResponse, error) // IsReady is the heartbeat endpoint for user defined source gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) mustEmbedUnimplementedSourceServer() @@ -162,6 +176,9 @@ func (UnimplementedSourceServer) AckFn(context.Context, *AckRequest) (*AckRespon func (UnimplementedSourceServer) PendingFn(context.Context, *emptypb.Empty) (*PendingResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method PendingFn not implemented") } +func (UnimplementedSourceServer) PartitionsFn(context.Context, *emptypb.Empty) (*PartitionsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method PartitionsFn not implemented") +} func (UnimplementedSourceServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") } @@ -235,6 +252,24 @@ func _Source_PendingFn_Handler(srv interface{}, ctx context.Context, dec func(in return interceptor(ctx, in, info, handler) } +func _Source_PartitionsFn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(emptypb.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SourceServer).PartitionsFn(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Source_PartitionsFn_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SourceServer).PartitionsFn(ctx, req.(*emptypb.Empty)) + } + return interceptor(ctx, in, info, handler) +} + func _Source_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(emptypb.Empty) if err := dec(in); err != nil { @@ -268,6 +303,10 @@ var Source_ServiceDesc = grpc.ServiceDesc{ MethodName: "PendingFn", Handler: _Source_PendingFn_Handler, }, + { + MethodName: "PartitionsFn", + Handler: _Source_PartitionsFn_Handler, + }, { MethodName: "IsReady", Handler: _Source_IsReady_Handler,