diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index e08dfd50..e0ccb0cc 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -83,7 +83,7 @@ type SinkRequest struct { Request *SinkRequest_Request `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` // Required field indicating the status of the request. // If eot is set to true, it indicates the end of transmission. - Status *SinkRequest_Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + Status *TransmissionStatus `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` // optional field indicating the handshake message. Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } @@ -127,7 +127,7 @@ func (x *SinkRequest) GetRequest() *SinkRequest_Request { return nil } -func (x *SinkRequest) GetStatus() *SinkRequest_Status { +func (x *SinkRequest) GetStatus() *TransmissionStatus { if x != nil { return x.Status } @@ -239,6 +239,55 @@ func (x *ReadyResponse) GetReady() bool { return false } +// * +// TransmissionStatus is the status of the transmission. +type TransmissionStatus struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` +} + +func (x *TransmissionStatus) Reset() { + *x = TransmissionStatus{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TransmissionStatus) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TransmissionStatus) ProtoMessage() {} + +func (x *TransmissionStatus) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TransmissionStatus.ProtoReflect.Descriptor instead. +func (*TransmissionStatus) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{3} +} + +func (x *TransmissionStatus) GetEot() bool { + if x != nil { + return x.Eot + } + return false +} + // * // SinkResponse is the individual response of each message written to the sink. type SinkResponse struct { @@ -248,12 +297,13 @@ type SinkResponse struct { Result *SinkResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` Handshake *Handshake `protobuf:"bytes,2,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` + Status *TransmissionStatus `protobuf:"bytes,3,opt,name=status,proto3,oneof" json:"status,omitempty"` } func (x *SinkResponse) Reset() { *x = SinkResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -266,7 +316,7 @@ func (x *SinkResponse) String() string { func (*SinkResponse) ProtoMessage() {} func (x *SinkResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -279,7 +329,7 @@ func (x *SinkResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SinkResponse.ProtoReflect.Descriptor instead. func (*SinkResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{3} + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{4} } func (x *SinkResponse) GetResult() *SinkResponse_Result { @@ -296,6 +346,13 @@ func (x *SinkResponse) GetHandshake() *Handshake { return nil } +func (x *SinkResponse) GetStatus() *TransmissionStatus { + if x != nil { + return x.Status + } + return nil +} + type SinkRequest_Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -312,7 +369,7 @@ type SinkRequest_Request struct { func (x *SinkRequest_Request) Reset() { *x = SinkRequest_Request{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -325,7 +382,7 @@ func (x *SinkRequest_Request) String() string { func (*SinkRequest_Request) ProtoMessage() {} func (x *SinkRequest_Request) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -383,53 +440,6 @@ func (x *SinkRequest_Request) GetHeaders() map[string]string { return nil } -type SinkRequest_Status struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` -} - -func (x *SinkRequest_Status) Reset() { - *x = SinkRequest_Status{} - if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *SinkRequest_Status) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*SinkRequest_Status) ProtoMessage() {} - -func (x *SinkRequest_Status) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use SinkRequest_Status.ProtoReflect.Descriptor instead. -func (*SinkRequest_Status) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{0, 1} -} - -func (x *SinkRequest_Status) GetEot() bool { - if x != nil { - return x.Eot - } - return false -} - type SinkResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -472,7 +482,7 @@ func (x *SinkResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use SinkResponse_Result.ProtoReflect.Descriptor instead. func (*SinkResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{3, 0} + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{4, 0} } func (x *SinkResponse_Result) GetId() string { @@ -505,14 +515,14 @@ var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x97, 0x04, 0x0a, 0x0b, 0x53, + 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xfb, 0x03, 0x0a, 0x0b, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x33, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x35, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, @@ -536,44 +546,49 @@ var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x1a, 0x0a, 0x06, 0x53, 0x74, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, + 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, + 0x73, 0x68, 0x61, 0x6b, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x08, 0x52, 0x03, 0x73, 0x6f, 0x74, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0x26, + 0x0a, 0x12, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, - 0x68, 0x61, 0x6b, 0x65, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, - 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, - 0x73, 0x6f, 0x74, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0xe5, 0x01, 0x0a, 0x0c, 0x53, - 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x06, 0x72, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, - 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x12, 0x35, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x48, - 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, - 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x5a, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, - 0x69, 0x64, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, - 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x65, - 0x72, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, - 0x72, 0x4d, 0x73, 0x67, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, - 0x6b, 0x65, 0x2a, 0x30, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, - 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, - 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x41, 0x4c, 0x4c, 0x42, 0x41, - 0x43, 0x4b, 0x10, 0x02, 0x32, 0x7c, 0x0a, 0x04, 0x53, 0x69, 0x6e, 0x6b, 0x12, 0x39, 0x0a, 0x06, - 0x53, 0x69, 0x6e, 0x6b, 0x46, 0x6e, 0x12, 0x14, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, - 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, - 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, - 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x69, 0x6e, - 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, - 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x22, 0xaa, 0x02, 0x0a, 0x0c, 0x53, 0x69, 0x6e, 0x6b, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, + 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x35, 0x0a, + 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x12, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, + 0x65, 0x88, 0x01, 0x01, 0x12, 0x38, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x6d, 0x69, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x48, 0x01, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x88, 0x01, 0x01, 0x1a, 0x5a, + 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, + 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, + 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x12, 0x17, 0x0a, 0x07, 0x65, 0x72, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, + 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x73, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x2a, 0x30, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, + 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, + 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x41, 0x4c, 0x4c, 0x42, + 0x41, 0x43, 0x4b, 0x10, 0x02, 0x32, 0x7c, 0x0a, 0x04, 0x53, 0x69, 0x6e, 0x6b, 0x12, 0x39, 0x0a, + 0x06, 0x53, 0x69, 0x6e, 0x6b, 0x46, 0x6e, 0x12, 0x14, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, + 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, + 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x69, + 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, + 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -595,33 +610,34 @@ var file_pkg_apis_proto_sink_v1_sink_proto_goTypes = []any{ (*SinkRequest)(nil), // 1: sink.v1.SinkRequest (*Handshake)(nil), // 2: sink.v1.Handshake (*ReadyResponse)(nil), // 3: sink.v1.ReadyResponse - (*SinkResponse)(nil), // 4: sink.v1.SinkResponse - (*SinkRequest_Request)(nil), // 5: sink.v1.SinkRequest.Request - (*SinkRequest_Status)(nil), // 6: sink.v1.SinkRequest.Status + (*TransmissionStatus)(nil), // 4: sink.v1.TransmissionStatus + (*SinkResponse)(nil), // 5: sink.v1.SinkResponse + (*SinkRequest_Request)(nil), // 6: sink.v1.SinkRequest.Request nil, // 7: sink.v1.SinkRequest.Request.HeadersEntry (*SinkResponse_Result)(nil), // 8: sink.v1.SinkResponse.Result (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp (*emptypb.Empty)(nil), // 10: google.protobuf.Empty } var file_pkg_apis_proto_sink_v1_sink_proto_depIdxs = []int32{ - 5, // 0: sink.v1.SinkRequest.request:type_name -> sink.v1.SinkRequest.Request - 6, // 1: sink.v1.SinkRequest.status:type_name -> sink.v1.SinkRequest.Status + 6, // 0: sink.v1.SinkRequest.request:type_name -> sink.v1.SinkRequest.Request + 4, // 1: sink.v1.SinkRequest.status:type_name -> sink.v1.TransmissionStatus 2, // 2: sink.v1.SinkRequest.handshake:type_name -> sink.v1.Handshake 8, // 3: sink.v1.SinkResponse.result:type_name -> sink.v1.SinkResponse.Result 2, // 4: sink.v1.SinkResponse.handshake:type_name -> sink.v1.Handshake - 9, // 5: sink.v1.SinkRequest.Request.event_time:type_name -> google.protobuf.Timestamp - 9, // 6: sink.v1.SinkRequest.Request.watermark:type_name -> google.protobuf.Timestamp - 7, // 7: sink.v1.SinkRequest.Request.headers:type_name -> sink.v1.SinkRequest.Request.HeadersEntry - 0, // 8: sink.v1.SinkResponse.Result.status:type_name -> sink.v1.Status - 1, // 9: sink.v1.Sink.SinkFn:input_type -> sink.v1.SinkRequest - 10, // 10: sink.v1.Sink.IsReady:input_type -> google.protobuf.Empty - 4, // 11: sink.v1.Sink.SinkFn:output_type -> sink.v1.SinkResponse - 3, // 12: sink.v1.Sink.IsReady:output_type -> sink.v1.ReadyResponse - 11, // [11:13] is the sub-list for method output_type - 9, // [9:11] is the sub-list for method input_type - 9, // [9:9] is the sub-list for extension type_name - 9, // [9:9] is the sub-list for extension extendee - 0, // [0:9] is the sub-list for field type_name + 4, // 5: sink.v1.SinkResponse.status:type_name -> sink.v1.TransmissionStatus + 9, // 6: sink.v1.SinkRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 9, // 7: sink.v1.SinkRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 7, // 8: sink.v1.SinkRequest.Request.headers:type_name -> sink.v1.SinkRequest.Request.HeadersEntry + 0, // 9: sink.v1.SinkResponse.Result.status:type_name -> sink.v1.Status + 1, // 10: sink.v1.Sink.SinkFn:input_type -> sink.v1.SinkRequest + 10, // 11: sink.v1.Sink.IsReady:input_type -> google.protobuf.Empty + 5, // 12: sink.v1.Sink.SinkFn:output_type -> sink.v1.SinkResponse + 3, // 13: sink.v1.Sink.IsReady:output_type -> sink.v1.ReadyResponse + 12, // [12:14] is the sub-list for method output_type + 10, // [10:12] is the sub-list for method input_type + 10, // [10:10] is the sub-list for extension type_name + 10, // [10:10] is the sub-list for extension extendee + 0, // [0:10] is the sub-list for field type_name } func init() { file_pkg_apis_proto_sink_v1_sink_proto_init() } @@ -667,7 +683,7 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3].Exporter = func(v any, i int) any { - switch v := v.(*SinkResponse); i { + switch v := v.(*TransmissionStatus); i { case 0: return &v.state case 1: @@ -679,7 +695,7 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4].Exporter = func(v any, i int) any { - switch v := v.(*SinkRequest_Request); i { + switch v := v.(*SinkResponse); i { case 0: return &v.state case 1: @@ -691,7 +707,7 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5].Exporter = func(v any, i int) any { - switch v := v.(*SinkRequest_Status); i { + switch v := v.(*SinkRequest_Request); i { case 0: return &v.state case 1: @@ -716,7 +732,7 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[0].OneofWrappers = []any{} - file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3].OneofWrappers = []any{} + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ diff --git a/pkg/apis/proto/sink/v1/sink.proto b/pkg/apis/proto/sink/v1/sink.proto index 37775673..d51c55d1 100644 --- a/pkg/apis/proto/sink/v1/sink.proto +++ b/pkg/apis/proto/sink/v1/sink.proto @@ -28,14 +28,11 @@ message SinkRequest { string id = 5; map headers = 6; } - message Status { - bool eot = 1; - } // Required field indicating the request. Request request = 1; // Required field indicating the status of the request. // If eot is set to true, it indicates the end of transmission. - Status status = 2; + TransmissionStatus status = 2; // optional field indicating the handshake message. optional Handshake handshake = 3; } @@ -55,6 +52,13 @@ message ReadyResponse { bool ready = 1; } +/** + * TransmissionStatus is the status of the transmission. + */ +message TransmissionStatus { + bool eot = 1; +} + /* * Status is the status of the response. */ @@ -78,4 +82,5 @@ message SinkResponse { } Result result = 1; optional Handshake handshake = 2; + optional TransmissionStatus status = 3; } \ No newline at end of file diff --git a/pkg/mapstreamer/examples/flatmap_stream/go.mod b/pkg/mapstreamer/examples/flatmap_stream/go.mod index 5fe8ff39..1b14a67a 100644 --- a/pkg/mapstreamer/examples/flatmap_stream/go.mod +++ b/pkg/mapstreamer/examples/flatmap_stream/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapstreamer/examples/flatmap_stream/go.sum b/pkg/mapstreamer/examples/flatmap_stream/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapstreamer/examples/flatmap_stream/go.sum +++ b/pkg/mapstreamer/examples/flatmap_stream/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 5a19ab71..9ee71e09 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -87,8 +87,6 @@ func (fs *Service) invokeHandler(ctx context.Context, req *mappb.MapRequest, mes err = fmt.Errorf("panic inside mapStream handler: %v", r) return } - // close the message channel after the handler is done processing the request - close(messageCh) }() streamReq := req.GetRequest() hd := NewHandlerDatum(streamReq.GetValue(), streamReq.GetEventTime().AsTime(), streamReq.GetWatermark().AsTime(), streamReq.GetHeaders()) diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index a012afea..8a58ec6a 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -197,5 +197,19 @@ func (fs *Service) processData(ctx context.Context, stream sinkpb.Sink_SinkFnSer return err } } + + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + // send the end of transmission message + eotResponse := &sinkpb.SinkResponse{ + Status: &sinkpb.TransmissionStatus{Eot: true}, + } + if err := stream.Send(eotResponse); err != nil { + log.Printf("error sending end of transmission message: %v", err) + return err + } return nil } diff --git a/pkg/sinker/service_test.go b/pkg/sinker/service_test.go index 4a66c36c..796bc79c 100644 --- a/pkg/sinker/service_test.go +++ b/pkg/sinker/service_test.go @@ -90,7 +90,7 @@ func TestService_SinkFn(t *testing.T) { }, { Request: &sinkpb.SinkRequest_Request{}, - Status: &sinkpb.SinkRequest_Status{Eot: true}, + Status: &sinkpb.TransmissionStatus{Eot: true}, }, }, sh: SinkerFunc(func(ctx context.Context, rch <-chan Datum) Responses { @@ -129,6 +129,9 @@ func TestService_SinkFn(t *testing.T) { ErrMsg: "", }, }, + { + Status: &sinkpb.TransmissionStatus{Eot: true}, + }, }, }, { @@ -171,7 +174,7 @@ func TestService_SinkFn(t *testing.T) { }, { Request: &sinkpb.SinkRequest_Request{}, - Status: &sinkpb.SinkRequest_Status{Eot: true}, + Status: &sinkpb.TransmissionStatus{Eot: true}, }, }, sh: SinkerFunc(func(ctx context.Context, rch <-chan Datum) Responses { @@ -210,6 +213,9 @@ func TestService_SinkFn(t *testing.T) { ErrMsg: "unknown error", }, }, + { + Status: &sinkpb.TransmissionStatus{Eot: true}, + }, }, }, }