From 32a012063082850847c64f88d541f1aa9229ed76 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Mon, 23 Sep 2024 11:34:48 +0530 Subject: [PATCH] feat: bidirectional streaming sink Signed-off-by: Yashash H L --- pkg/apis/proto/sink/v1/sink.pb.go | 425 +++++++++++++++----- pkg/apis/proto/sink/v1/sink.proto | 37 +- pkg/apis/proto/sink/v1/sink_grpc.pb.go | 12 +- pkg/apis/proto/sink/v1/sinkmock/sinkmock.go | 30 +- pkg/sinker/service.go | 160 +++++--- pkg/sinker/service_test.go | 183 ++++++--- 6 files changed, 602 insertions(+), 245 deletions(-) diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index c4b59369..e08dfd50 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -79,12 +79,13 @@ type SinkRequest struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Id string `protobuf:"bytes,5,opt,name=id,proto3" json:"id,omitempty"` - Headers map[string]string `protobuf:"bytes,6,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // Required field indicating the request. + Request *SinkRequest_Request `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + // Required field indicating the status of the request. + // If eot is set to true, it indicates the end of transmission. + Status *SinkRequest_Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + // optional field indicating the handshake message. + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *SinkRequest) Reset() { @@ -119,46 +120,74 @@ func (*SinkRequest) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{0} } -func (x *SinkRequest) GetKeys() []string { +func (x *SinkRequest) GetRequest() *SinkRequest_Request { if x != nil { - return x.Keys + return x.Request } return nil } -func (x *SinkRequest) GetValue() []byte { +func (x *SinkRequest) GetStatus() *SinkRequest_Status { if x != nil { - return x.Value + return x.Status } return nil } -func (x *SinkRequest) GetEventTime() *timestamppb.Timestamp { +func (x *SinkRequest) GetHandshake() *Handshake { if x != nil { - return x.EventTime + return x.Handshake } return nil } -func (x *SinkRequest) GetWatermark() *timestamppb.Timestamp { - if x != nil { - return x.Watermark +// Handshake message between client and server to indicate the start of transmission. +type Handshake struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Required field indicating the start of transmission. + Sot bool `protobuf:"varint,1,opt,name=sot,proto3" json:"sot,omitempty"` +} + +func (x *Handshake) Reset() { + *x = Handshake{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return nil } -func (x *SinkRequest) GetId() string { - if x != nil { - return x.Id +func (x *Handshake) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Handshake) ProtoMessage() {} + +func (x *Handshake) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms } - return "" + return mi.MessageOf(x) } -func (x *SinkRequest) GetHeaders() map[string]string { +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{1} +} + +func (x *Handshake) GetSot() bool { if x != nil { - return x.Headers + return x.Sot } - return nil + return false } // * @@ -174,7 +203,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[1] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -187,7 +216,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[1] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -200,7 +229,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{1} + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{2} } func (x *ReadyResponse) GetReady() bool { @@ -217,13 +246,14 @@ type SinkResponse struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Results []*SinkResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + Result *SinkResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` + Handshake *Handshake `protobuf:"bytes,2,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *SinkResponse) Reset() { *x = SinkResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[2] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -236,7 +266,7 @@ func (x *SinkResponse) String() string { func (*SinkResponse) ProtoMessage() {} func (x *SinkResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[2] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -249,16 +279,157 @@ func (x *SinkResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SinkResponse.ProtoReflect.Descriptor instead. func (*SinkResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{2} + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{3} } -func (x *SinkResponse) GetResults() []*SinkResponse_Result { +func (x *SinkResponse) GetResult() *SinkResponse_Result { if x != nil { - return x.Results + return x.Result } return nil } +func (x *SinkResponse) GetHandshake() *Handshake { + if x != nil { + return x.Handshake + } + return nil +} + +type SinkRequest_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Id string `protobuf:"bytes,5,opt,name=id,proto3" json:"id,omitempty"` + Headers map[string]string `protobuf:"bytes,6,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *SinkRequest_Request) Reset() { + *x = SinkRequest_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SinkRequest_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SinkRequest_Request) ProtoMessage() {} + +func (x *SinkRequest_Request) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SinkRequest_Request.ProtoReflect.Descriptor instead. +func (*SinkRequest_Request) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{0, 0} +} + +func (x *SinkRequest_Request) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *SinkRequest_Request) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *SinkRequest_Request) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *SinkRequest_Request) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +func (x *SinkRequest_Request) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *SinkRequest_Request) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +type SinkRequest_Status struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` +} + +func (x *SinkRequest_Status) Reset() { + *x = SinkRequest_Status{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SinkRequest_Status) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SinkRequest_Status) ProtoMessage() {} + +func (x *SinkRequest_Status) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SinkRequest_Status.ProtoReflect.Descriptor instead. +func (*SinkRequest_Status) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{0, 1} +} + +func (x *SinkRequest_Status) GetEot() bool { + if x != nil { + return x.Eot + } + return false +} + type SinkResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -275,7 +446,7 @@ type SinkResponse_Result struct { func (x *SinkResponse_Result) Reset() { *x = SinkResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[7] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -288,7 +459,7 @@ func (x *SinkResponse_Result) String() string { func (*SinkResponse_Result) ProtoMessage() {} func (x *SinkResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[7] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -301,7 +472,7 @@ func (x *SinkResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use SinkResponse_Result.ProtoReflect.Descriptor instead. func (*SinkResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{2, 0} + return file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP(), []int{3, 0} } func (x *SinkResponse_Result) GetId() string { @@ -334,8 +505,19 @@ var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, - 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb5, 0x02, 0x0a, 0x0b, 0x53, - 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, + 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x97, 0x04, 0x0a, 0x0b, 0x53, + 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x07, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, + 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x33, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1b, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, + 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x35, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x73, 0x69, 0x6e, + 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, + 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0xb9, + 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, @@ -346,42 +528,52 @@ var file_pkg_apis_proto_sink_v1_sink_proto_rawDesc = []byte{ 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, - 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x3b, 0x0a, 0x07, 0x68, 0x65, 0x61, - 0x64, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x73, 0x69, 0x6e, + 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x43, 0x0a, 0x07, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x73, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0xa2, 0x01, 0x0a, 0x0c, 0x53, 0x69, - 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x07, 0x72, 0x65, - 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, + 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, + 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x1a, 0x0a, 0x06, 0x53, 0x74, + 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, + 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, + 0x73, 0x6f, 0x74, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x22, 0xe5, 0x01, 0x0a, 0x0c, 0x53, + 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x34, 0x0a, 0x06, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x73, 0x1a, 0x5a, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x27, 0x0a, 0x06, - 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x73, - 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, - 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x65, 0x72, 0x72, 0x5f, 0x6d, 0x73, 0x67, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, 0x72, 0x4d, 0x73, 0x67, 0x2a, 0x30, - 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, - 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, - 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x41, 0x4c, 0x4c, 0x42, 0x41, 0x43, 0x4b, 0x10, 0x02, - 0x32, 0x7a, 0x0a, 0x04, 0x53, 0x69, 0x6e, 0x6b, 0x12, 0x37, 0x0a, 0x06, 0x53, 0x69, 0x6e, 0x6b, - 0x46, 0x6e, 0x12, 0x14, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, - 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, - 0x01, 0x12, 0x39, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, - 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x52, - 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x38, 0x5a, 0x36, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, - 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, - 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, - 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, + 0x74, 0x12, 0x35, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x48, + 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, + 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x5a, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x12, 0x27, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x74, 0x61, + 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x65, + 0x72, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x65, 0x72, + 0x72, 0x4d, 0x73, 0x67, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, + 0x6b, 0x65, 0x2a, 0x30, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, + 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, + 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x46, 0x41, 0x4c, 0x4c, 0x42, 0x41, + 0x43, 0x4b, 0x10, 0x02, 0x32, 0x7c, 0x0a, 0x04, 0x53, 0x69, 0x6e, 0x6b, 0x12, 0x39, 0x0a, 0x06, + 0x53, 0x69, 0x6e, 0x6b, 0x46, 0x6e, 0x12, 0x14, 0x2e, 0x73, 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, + 0x69, 0x6e, 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x69, 0x6e, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x39, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x16, 0x2e, 0x73, 0x69, 0x6e, + 0x6b, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x42, 0x38, 0x5a, 0x36, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, + 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x69, 0x6e, 0x6b, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -397,32 +589,39 @@ func file_pkg_apis_proto_sink_v1_sink_proto_rawDescGZIP() []byte { } var file_pkg_apis_proto_sink_v1_sink_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_pkg_apis_proto_sink_v1_sink_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_apis_proto_sink_v1_sink_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_pkg_apis_proto_sink_v1_sink_proto_goTypes = []any{ (Status)(0), // 0: sink.v1.Status (*SinkRequest)(nil), // 1: sink.v1.SinkRequest - (*ReadyResponse)(nil), // 2: sink.v1.ReadyResponse - (*SinkResponse)(nil), // 3: sink.v1.SinkResponse - nil, // 4: sink.v1.SinkRequest.HeadersEntry - (*SinkResponse_Result)(nil), // 5: sink.v1.SinkResponse.Result - (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 7: google.protobuf.Empty + (*Handshake)(nil), // 2: sink.v1.Handshake + (*ReadyResponse)(nil), // 3: sink.v1.ReadyResponse + (*SinkResponse)(nil), // 4: sink.v1.SinkResponse + (*SinkRequest_Request)(nil), // 5: sink.v1.SinkRequest.Request + (*SinkRequest_Status)(nil), // 6: sink.v1.SinkRequest.Status + nil, // 7: sink.v1.SinkRequest.Request.HeadersEntry + (*SinkResponse_Result)(nil), // 8: sink.v1.SinkResponse.Result + (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 10: google.protobuf.Empty } var file_pkg_apis_proto_sink_v1_sink_proto_depIdxs = []int32{ - 6, // 0: sink.v1.SinkRequest.event_time:type_name -> google.protobuf.Timestamp - 6, // 1: sink.v1.SinkRequest.watermark:type_name -> google.protobuf.Timestamp - 4, // 2: sink.v1.SinkRequest.headers:type_name -> sink.v1.SinkRequest.HeadersEntry - 5, // 3: sink.v1.SinkResponse.results:type_name -> sink.v1.SinkResponse.Result - 0, // 4: sink.v1.SinkResponse.Result.status:type_name -> sink.v1.Status - 1, // 5: sink.v1.Sink.SinkFn:input_type -> sink.v1.SinkRequest - 7, // 6: sink.v1.Sink.IsReady:input_type -> google.protobuf.Empty - 3, // 7: sink.v1.Sink.SinkFn:output_type -> sink.v1.SinkResponse - 2, // 8: sink.v1.Sink.IsReady:output_type -> sink.v1.ReadyResponse - 7, // [7:9] is the sub-list for method output_type - 5, // [5:7] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 5, // 0: sink.v1.SinkRequest.request:type_name -> sink.v1.SinkRequest.Request + 6, // 1: sink.v1.SinkRequest.status:type_name -> sink.v1.SinkRequest.Status + 2, // 2: sink.v1.SinkRequest.handshake:type_name -> sink.v1.Handshake + 8, // 3: sink.v1.SinkResponse.result:type_name -> sink.v1.SinkResponse.Result + 2, // 4: sink.v1.SinkResponse.handshake:type_name -> sink.v1.Handshake + 9, // 5: sink.v1.SinkRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 9, // 6: sink.v1.SinkRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 7, // 7: sink.v1.SinkRequest.Request.headers:type_name -> sink.v1.SinkRequest.Request.HeadersEntry + 0, // 8: sink.v1.SinkResponse.Result.status:type_name -> sink.v1.Status + 1, // 9: sink.v1.Sink.SinkFn:input_type -> sink.v1.SinkRequest + 10, // 10: sink.v1.Sink.IsReady:input_type -> google.protobuf.Empty + 4, // 11: sink.v1.Sink.SinkFn:output_type -> sink.v1.SinkResponse + 3, // 12: sink.v1.Sink.IsReady:output_type -> sink.v1.ReadyResponse + 11, // [11:13] is the sub-list for method output_type + 9, // [9:11] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_pkg_apis_proto_sink_v1_sink_proto_init() } @@ -444,7 +643,7 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*ReadyResponse); i { + switch v := v.(*Handshake); i { case 0: return &v.state case 1: @@ -456,6 +655,18 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*ReadyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*SinkResponse); i { case 0: return &v.state @@ -468,6 +679,30 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*SinkRequest_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[5].Exporter = func(v any, i int) any { + switch v := v.(*SinkRequest_Status); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[7].Exporter = func(v any, i int) any { switch v := v.(*SinkResponse_Result); i { case 0: return &v.state @@ -480,13 +715,15 @@ func file_pkg_apis_proto_sink_v1_sink_proto_init() { } } } + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[0].OneofWrappers = []any{} + file_pkg_apis_proto_sink_v1_sink_proto_msgTypes[3].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_sink_v1_sink_proto_rawDesc, NumEnums: 1, - NumMessages: 5, + NumMessages: 8, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/sink/v1/sink.proto b/pkg/apis/proto/sink/v1/sink.proto index 0bdd5a27..37775673 100644 --- a/pkg/apis/proto/sink/v1/sink.proto +++ b/pkg/apis/proto/sink/v1/sink.proto @@ -10,7 +10,7 @@ package sink.v1; service Sink { // SinkFn writes the request to a user defined sink. - rpc SinkFn(stream SinkRequest) returns (SinkResponse); + rpc SinkFn(stream SinkRequest) returns (stream SinkResponse); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); @@ -20,12 +20,32 @@ service Sink { * SinkRequest represents a request element. */ message SinkRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - string id = 5; - map headers = 6; + message Request { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + string id = 5; + map headers = 6; + } + message Status { + bool eot = 1; + } + // Required field indicating the request. + Request request = 1; + // Required field indicating the status of the request. + // If eot is set to true, it indicates the end of transmission. + Status status = 2; + // optional field indicating the handshake message. + optional Handshake handshake = 3; +} + +/* + * Handshake message between client and server to indicate the start of transmission. + */ +message Handshake { + // Required field indicating the start of transmission. + bool sot = 1; } /** @@ -56,5 +76,6 @@ message SinkResponse { // err_msg is the error message, set it if success is set to false. string err_msg = 3; } - repeated Result results = 1; + Result result = 1; + optional Handshake handshake = 2; } \ No newline at end of file diff --git a/pkg/apis/proto/sink/v1/sink_grpc.pb.go b/pkg/apis/proto/sink/v1/sink_grpc.pb.go index 6c6dce9e..7a975673 100644 --- a/pkg/apis/proto/sink/v1/sink_grpc.pb.go +++ b/pkg/apis/proto/sink/v1/sink_grpc.pb.go @@ -54,7 +54,7 @@ func (c *sinkClient) SinkFn(ctx context.Context, opts ...grpc.CallOption) (Sink_ type Sink_SinkFnClient interface { Send(*SinkRequest) error - CloseAndRecv() (*SinkResponse, error) + Recv() (*SinkResponse, error) grpc.ClientStream } @@ -66,10 +66,7 @@ func (x *sinkSinkFnClient) Send(m *SinkRequest) error { return x.ClientStream.SendMsg(m) } -func (x *sinkSinkFnClient) CloseAndRecv() (*SinkResponse, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } +func (x *sinkSinkFnClient) Recv() (*SinkResponse, error) { m := new(SinkResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -126,7 +123,7 @@ func _Sink_SinkFn_Handler(srv interface{}, stream grpc.ServerStream) error { } type Sink_SinkFnServer interface { - SendAndClose(*SinkResponse) error + Send(*SinkResponse) error Recv() (*SinkRequest, error) grpc.ServerStream } @@ -135,7 +132,7 @@ type sinkSinkFnServer struct { grpc.ServerStream } -func (x *sinkSinkFnServer) SendAndClose(m *SinkResponse) error { +func (x *sinkSinkFnServer) Send(m *SinkResponse) error { return x.ServerStream.SendMsg(m) } @@ -181,6 +178,7 @@ var Sink_ServiceDesc = grpc.ServiceDesc{ { StreamName: "SinkFn", Handler: _Sink_SinkFn_Handler, + ServerStreams: true, ClientStreams: true, }, }, diff --git a/pkg/apis/proto/sink/v1/sinkmock/sinkmock.go b/pkg/apis/proto/sink/v1/sinkmock/sinkmock.go index 6ee22b6e..442fc9ca 100644 --- a/pkg/apis/proto/sink/v1/sinkmock/sinkmock.go +++ b/pkg/apis/proto/sink/v1/sinkmock/sinkmock.go @@ -101,21 +101,6 @@ func (m *MockSink_SinkFnClient) EXPECT() *MockSink_SinkFnClientMockRecorder { return m.recorder } -// CloseAndRecv mocks base method. -func (m *MockSink_SinkFnClient) CloseAndRecv() (*v1.SinkResponse, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CloseAndRecv") - ret0, _ := ret[0].(*v1.SinkResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// CloseAndRecv indicates an expected call of CloseAndRecv. -func (mr *MockSink_SinkFnClientMockRecorder) CloseAndRecv() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseAndRecv", reflect.TypeOf((*MockSink_SinkFnClient)(nil).CloseAndRecv)) -} - // CloseSend mocks base method. func (m *MockSink_SinkFnClient) CloseSend() error { m.ctrl.T.Helper() @@ -159,6 +144,21 @@ func (mr *MockSink_SinkFnClientMockRecorder) Header() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockSink_SinkFnClient)(nil).Header)) } +// Recv mocks base method. +func (m *MockSink_SinkFnClient) Recv() (*v1.SinkResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*v1.SinkResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockSink_SinkFnClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockSink_SinkFnClient)(nil).Recv)) +} + // RecvMsg mocks base method. func (m *MockSink_SinkFnClient) RecvMsg(arg0 interface{}) error { m.ctrl.T.Helper() diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index bb99dab7..dc76b583 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -2,12 +2,14 @@ package sinker import ( "context" + "errors" + "fmt" "io" "log" "runtime/debug" - "sync" "time" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/types/known/emptypb" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" @@ -72,69 +74,111 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sinkpb.ReadyRespon // SinkFn applies a sink function to a every element. func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { - var ( - resultList []*sinkpb.SinkResponse_Result - wg sync.WaitGroup - datumStreamCh = make(chan Datum) - ctx = stream.Context() - ) - - wg.Add(1) - go func() { - defer wg.Done() - // handle panic - defer func() { - if r := recover(); r != nil { - log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) - fs.shutdownCh <- struct{}{} + ctx := stream.Context() + // Perform handshake before entering the main loop + req, err := stream.Recv() + if err != nil { + log.Printf("error receiving handshake from stream: %v", err) + return err + } + + if req.Handshake == nil || !req.Handshake.Sot { + return fmt.Errorf("expected handshake message") + } + + // Send handshake response + handshakeResponse := &sinkpb.SinkResponse{ + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_SUCCESS, + }, + Handshake: &sinkpb.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return err + } + + for { + datumStreamCh := make(chan Datum) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + defer close(datumStreamCh) + defer func() { + if r := recover(); r != nil { + log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack())) + fs.shutdownCh <- struct{}{} + } + }() + + for { + // Receive sink requests from the stream + req, err := stream.Recv() + if err == io.EOF { + log.Printf("end of sink stream") + return err + } + if err != nil { + log.Printf("error receiving from sink stream: %v", err) + return err + } + + if req.Status != nil && req.Status.Eot { + // End of transmission, break to start a new sink invocation + break + } + + datum := &handlerDatum{ + id: req.GetRequest().GetId(), + value: req.GetRequest().GetValue(), + keys: req.GetRequest().GetKeys(), + eventTime: req.GetRequest().GetEventTime().AsTime(), + watermark: req.GetRequest().GetWatermark().AsTime(), + headers: req.GetRequest().GetHeaders(), + } + + // Send datum to the channel + datumStreamCh <- datum } - }() - messages := fs.Sinker.Sink(ctx, datumStreamCh) - for _, msg := range messages { - if msg.Fallback { - resultList = append(resultList, &sinkpb.SinkResponse_Result{ - Id: msg.ID, - Status: sinkpb.Status_FALLBACK, - }) - } else if msg.Success { - resultList = append(resultList, &sinkpb.SinkResponse_Result{ - Id: msg.ID, - Status: sinkpb.Status_SUCCESS, - }) - } else { - resultList = append(resultList, &sinkpb.SinkResponse_Result{ - Id: msg.ID, - Status: sinkpb.Status_FAILURE, - ErrMsg: msg.Err, - }) + return nil + }) + + // invoke the sink function, and send the responses back to the client + g.Go(func() error { + responses := fs.Sinker.Sink(ctx, datumStreamCh) + for _, response := range responses { + var status sinkpb.Status + if response.Fallback { + status = sinkpb.Status_FALLBACK + } else if response.Success { + status = sinkpb.Status_SUCCESS + } else { + status = sinkpb.Status_FAILURE + } + + sinkResponse := &sinkpb.SinkResponse{ + Result: &sinkpb.SinkResponse_Result{ + Id: response.ID, + Status: status, + ErrMsg: response.Err, + }, + } + if err := stream.Send(sinkResponse); err != nil { + log.Printf("error sending sink response: %v", err) + return err + } } - } - }() + return nil + }) - for { - d, err := stream.Recv() - if err == io.EOF { - close(datumStreamCh) - break + // Wait for the goroutines to finish + err := g.Wait() + if errors.Is(err, io.EOF) { + return nil } if err != nil { - close(datumStreamCh) - // TODO: research on gRPC errors and revisit the error handler return err } - var hd = &handlerDatum{ - id: d.GetId(), - value: d.GetValue(), - keys: d.GetKeys(), - eventTime: d.GetEventTime().AsTime(), - watermark: d.GetWatermark().AsTime(), - headers: d.GetHeaders(), - } - datumStreamCh <- hd } - - wg.Wait() - return stream.SendAndClose(&sinkpb.SinkResponse{ - Results: resultList, - }) } diff --git a/pkg/sinker/service_test.go b/pkg/sinker/service_test.go index d6ad1b5b..4a66c36c 100644 --- a/pkg/sinker/service_test.go +++ b/pkg/sinker/service_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -19,12 +20,12 @@ import ( type SinkFnServerTest struct { ctx context.Context inputCh chan *sinkpb.SinkRequest - rl *sinkpb.SinkResponse + rl []*sinkpb.SinkResponse grpc.ServerStream } -func (t *SinkFnServerTest) SendAndClose(list *sinkpb.SinkResponse) error { - t.rl = list +func (t *SinkFnServerTest) Send(response *sinkpb.SinkResponse) error { + t.rl = append(t.rl, response) return nil } @@ -45,35 +46,51 @@ func TestService_SinkFn(t *testing.T) { name string sh Sinker input []*sinkpb.SinkRequest - expected []*sinkpb.SinkResponse_Result + expected []*sinkpb.SinkResponse }{ { name: "sink_fn_test_success", input: []*sinkpb.SinkRequest{ { - Id: "one-processed", - Keys: []string{"sink-test"}, - Value: []byte(strconv.Itoa(10)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Headers: map[string]string{"x-txn-id": "test-txn-1"}, + Request: &sinkpb.SinkRequest_Request{}, + Handshake: &sinkpb.Handshake{ + Sot: true, + }, }, { - Id: "two-processed", - Keys: []string{"sink-test"}, - Value: []byte(strconv.Itoa(20)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Headers: map[string]string{"x-txn-id": "test-txn-2"}, + Request: &sinkpb.SinkRequest_Request{ + Id: "one-processed", + Keys: []string{"sink-test"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Headers: map[string]string{"x-txn-id": "test-txn-1"}, + }, }, { - Id: "three-processed", - Keys: []string{"sink-test"}, - Value: []byte(strconv.Itoa(30)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Headers: map[string]string{"x-txn-id": "test-txn-3"}, + Request: &sinkpb.SinkRequest_Request{ + Id: "two-processed", + Keys: []string{"sink-test"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Headers: map[string]string{"x-txn-id": "test-txn-2"}, + }, + }, + { + Request: &sinkpb.SinkRequest_Request{ + Id: "three-processed", + Keys: []string{"sink-test"}, + Value: []byte(strconv.Itoa(30)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Headers: map[string]string{"x-txn-id": "test-txn-3"}, + }, + }, + { + Request: &sinkpb.SinkRequest_Request{}, + Status: &sinkpb.SinkRequest_Status{Eot: true}, }, }, sh: SinkerFunc(func(ctx context.Context, rch <-chan Datum) Responses { @@ -84,50 +101,77 @@ func TestService_SinkFn(t *testing.T) { } return result }), - expected: []*sinkpb.SinkResponse_Result{ + expected: []*sinkpb.SinkResponse{ { - Status: sinkpb.Status_SUCCESS, - Id: "one-processed", - ErrMsg: "", + Result: &sinkpb.SinkResponse_Result{}, + Handshake: &sinkpb.Handshake{ + Sot: true, + }, }, { - Status: sinkpb.Status_SUCCESS, - Id: "two-processed", - ErrMsg: "", + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_SUCCESS, + Id: "one-processed", + ErrMsg: "", + }, }, { - Status: sinkpb.Status_SUCCESS, - Id: "three-processed", - ErrMsg: "", + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_SUCCESS, + Id: "two-processed", + ErrMsg: "", + }, + }, + { + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_SUCCESS, + Id: "three-processed", + ErrMsg: "", + }, }, }, }, { name: "sink_fn_test_failure", - input: []*sinkpb.SinkRequest{ { - Id: "one-processed", - Keys: []string{"sink-test-1", "sink-test-2"}, - Value: []byte(strconv.Itoa(10)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Headers: map[string]string{"x-txn-id": "test-txn-1"}, + Request: &sinkpb.SinkRequest_Request{}, + Handshake: &sinkpb.Handshake{ + Sot: true, + }, }, { - Id: "two-processed", - Keys: []string{"sink-test-1", "sink-test-2"}, - Value: []byte(strconv.Itoa(20)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + Request: &sinkpb.SinkRequest_Request{ + Id: "one-processed", + Keys: []string{"sink-test-1", "sink-test-2"}, + Value: []byte(strconv.Itoa(10)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Headers: map[string]string{"x-txn-id": "test-txn-1"}, + }, }, { - Id: "three-processed", - Keys: []string{"sink-test-1", "sink-test-2"}, - Value: []byte(strconv.Itoa(30)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), - Headers: map[string]string{"x-txn-id": "test-txn-2"}, + Request: &sinkpb.SinkRequest_Request{ + Id: "two-processed", + Keys: []string{"sink-test-1", "sink-test-2"}, + Value: []byte(strconv.Itoa(20)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, + }, + { + Request: &sinkpb.SinkRequest_Request{ + Id: "three-processed", + Keys: []string{"sink-test-1", "sink-test-2"}, + Value: []byte(strconv.Itoa(30)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + Headers: map[string]string{"x-txn-id": "test-txn-2"}, + }, + }, + { + Request: &sinkpb.SinkRequest_Request{}, + Status: &sinkpb.SinkRequest_Status{Eot: true}, }, }, sh: SinkerFunc(func(ctx context.Context, rch <-chan Datum) Responses { @@ -138,21 +182,33 @@ func TestService_SinkFn(t *testing.T) { } return result }), - expected: []*sinkpb.SinkResponse_Result{ + expected: []*sinkpb.SinkResponse{ + { + Result: &sinkpb.SinkResponse_Result{}, + Handshake: &sinkpb.Handshake{ + Sot: true, + }, + }, { - Status: sinkpb.Status_FAILURE, - Id: "one-processed", - ErrMsg: "unknown error", + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_FAILURE, + Id: "one-processed", + ErrMsg: "unknown error", + }, }, { - Status: sinkpb.Status_FAILURE, - Id: "two-processed", - ErrMsg: "unknown error", + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_FAILURE, + Id: "two-processed", + ErrMsg: "unknown error", + }, }, { - Status: sinkpb.Status_FAILURE, - Id: "three-processed", - ErrMsg: "unknown error", + Result: &sinkpb.SinkResponse_Result{ + Status: sinkpb.Status_FAILURE, + Id: "three-processed", + ErrMsg: "unknown error", + }, }, }, }, @@ -173,7 +229,8 @@ func TestService_SinkFn(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - _ = ss.SinkFn(udfReduceFnStream) + err := ss.SinkFn(udfReduceFnStream) + assert.NoError(t, err) }() for _, val := range tt.input { @@ -183,8 +240,8 @@ func TestService_SinkFn(t *testing.T) { wg.Wait() - if !reflect.DeepEqual(udfReduceFnStream.rl.Results, tt.expected) { - t.Errorf("ReduceFn() got = %v, want %v", udfReduceFnStream.rl.Results, tt.expected) + if !reflect.DeepEqual(udfReduceFnStream.rl, tt.expected) { + t.Errorf("ReduceFn() got = %v, want %v", udfReduceFnStream.rl, tt.expected) } }) }