From ee2f3086d64e0111b27ca374d9155c8288703b0e Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Tue, 17 Sep 2024 10:59:11 +0530 Subject: [PATCH] add handshake at top level Signed-off-by: Yashash H L --- pkg/apis/proto/source/v1/source.pb.go | 111 +++++++++++++------------- pkg/apis/proto/source/v1/source.proto | 2 +- pkg/sourcer/service.go | 6 +- pkg/sourcer/service_test.go | 6 +- 4 files changed, 63 insertions(+), 62 deletions(-) diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index e17d3b37..8e68737e 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -231,7 +231,8 @@ type ReadResponse struct { // Required field holding the result. Result *ReadResponse_Result `protobuf:"bytes,1,opt,name=result,proto3" json:"result,omitempty"` // Status of the response. Holds the end of transmission flag and the status code. - Status *ReadResponse_Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + Status *ReadResponse_Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *ReadResponse) Reset() { @@ -280,6 +281,13 @@ func (x *ReadResponse) GetStatus() *ReadResponse_Status { return nil } +func (x *ReadResponse) GetHandshake() *Handshake { + if x != nil { + return x.Handshake + } + return nil +} + // AckRequest is the request for acknowledging datum. // It takes a list of offsets to be acknowledged. type AckRequest struct { @@ -750,11 +758,10 @@ type ReadResponse_Status struct { unknownFields protoimpl.UnknownFields // End of transmission flag. - Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` - Code ReadResponse_Status_Code `protobuf:"varint,2,opt,name=code,proto3,enum=source.v1.ReadResponse_Status_Code" json:"code,omitempty"` - Error *ReadResponse_Status_Error `protobuf:"varint,3,opt,name=error,proto3,enum=source.v1.ReadResponse_Status_Error,oneof" json:"error,omitempty"` - Msg *string `protobuf:"bytes,4,opt,name=msg,proto3,oneof" json:"msg,omitempty"` - Handshake *Handshake `protobuf:"bytes,5,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` + Eot bool `protobuf:"varint,1,opt,name=eot,proto3" json:"eot,omitempty"` + Code ReadResponse_Status_Code `protobuf:"varint,2,opt,name=code,proto3,enum=source.v1.ReadResponse_Status_Code" json:"code,omitempty"` + Error *ReadResponse_Status_Error `protobuf:"varint,3,opt,name=error,proto3,enum=source.v1.ReadResponse_Status_Error,oneof" json:"error,omitempty"` + Msg *string `protobuf:"bytes,4,opt,name=msg,proto3,oneof" json:"msg,omitempty"` } func (x *ReadResponse_Status) Reset() { @@ -817,13 +824,6 @@ func (x *ReadResponse_Status) GetMsg() string { return "" } -func (x *ReadResponse_Status) GetHandshake() *Handshake { - if x != nil { - return x.Handshake - } - return nil -} - type AckRequest_Request struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1051,39 +1051,39 @@ var file_pkg_apis_proto_source_v1_source_proto_rawDesc = []byte{ 0x74, 0x75, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, - 0x73, 0x1a, 0x9f, 0x02, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x18, 0x0a, 0x07, - 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, - 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x29, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, - 0x76, 0x31, 0x2e, 0x4f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, - 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, - 0x6b, 0x65, 0x79, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, - 0x12, 0x45, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, - 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, - 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, - 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, - 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, - 0x02, 0x38, 0x01, 0x1a, 0xc7, 0x02, 0x0a, 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, - 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, - 0x12, 0x37, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x23, - 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x43, - 0x6f, 0x64, 0x65, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x3f, 0x0a, 0x05, 0x65, 0x72, 0x72, - 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x24, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x48, 0x00, - 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x88, 0x01, 0x01, 0x12, 0x15, 0x0a, 0x03, 0x6d, 0x73, - 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x88, 0x01, - 0x01, 0x12, 0x37, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x05, + 0x73, 0x12, 0x37, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x02, 0x52, 0x09, 0x68, 0x61, - 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x22, 0x20, 0x0a, 0x04, 0x43, 0x6f, + 0x2e, 0x48, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x9f, 0x02, 0x0a, 0x06, 0x52, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, + 0x29, 0x0a, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x11, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x4f, 0x66, 0x66, 0x73, + 0x65, 0x74, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, + 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x04, 0x20, + 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x45, 0x0a, 0x07, 0x68, 0x65, 0x61, + 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, + 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, + 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x80, 0x02, 0x0a, + 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6f, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x03, 0x65, 0x6f, 0x74, 0x12, 0x37, 0x0a, 0x04, 0x63, 0x6f, 0x64, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2e, 0x43, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x63, 0x6f, + 0x64, 0x65, 0x12, 0x3f, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x0e, 0x32, 0x24, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x75, + 0x73, 0x2e, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, + 0x88, 0x01, 0x01, 0x12, 0x15, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x48, 0x01, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x88, 0x01, 0x01, 0x22, 0x20, 0x0a, 0x04, 0x43, 0x6f, 0x64, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x55, 0x43, 0x43, 0x45, 0x53, 0x53, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x46, 0x41, 0x49, 0x4c, 0x55, 0x52, 0x45, 0x10, 0x01, 0x22, 0x1f, 0x0a, 0x05, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x41, 0x43, 0x4b, 0x45, 0x44, @@ -1197,16 +1197,16 @@ var file_pkg_apis_proto_source_v1_source_proto_depIdxs = []int32{ 2, // 1: source.v1.ReadRequest.handshake:type_name -> source.v1.Handshake 12, // 2: source.v1.ReadResponse.result:type_name -> source.v1.ReadResponse.Result 13, // 3: source.v1.ReadResponse.status:type_name -> source.v1.ReadResponse.Status - 15, // 4: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request - 16, // 5: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result - 17, // 6: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result - 18, // 7: source.v1.PartitionsResponse.result:type_name -> source.v1.PartitionsResponse.Result - 10, // 8: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset - 19, // 9: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp - 14, // 10: source.v1.ReadResponse.Result.headers:type_name -> source.v1.ReadResponse.Result.HeadersEntry - 0, // 11: source.v1.ReadResponse.Status.code:type_name -> source.v1.ReadResponse.Status.Code - 1, // 12: source.v1.ReadResponse.Status.error:type_name -> source.v1.ReadResponse.Status.Error - 2, // 13: source.v1.ReadResponse.Status.handshake:type_name -> source.v1.Handshake + 2, // 4: source.v1.ReadResponse.handshake:type_name -> source.v1.Handshake + 15, // 5: source.v1.AckRequest.request:type_name -> source.v1.AckRequest.Request + 16, // 6: source.v1.AckResponse.result:type_name -> source.v1.AckResponse.Result + 17, // 7: source.v1.PendingResponse.result:type_name -> source.v1.PendingResponse.Result + 18, // 8: source.v1.PartitionsResponse.result:type_name -> source.v1.PartitionsResponse.Result + 10, // 9: source.v1.ReadResponse.Result.offset:type_name -> source.v1.Offset + 19, // 10: source.v1.ReadResponse.Result.event_time:type_name -> google.protobuf.Timestamp + 14, // 11: source.v1.ReadResponse.Result.headers:type_name -> source.v1.ReadResponse.Result.HeadersEntry + 0, // 12: source.v1.ReadResponse.Status.code:type_name -> source.v1.ReadResponse.Status.Code + 1, // 13: source.v1.ReadResponse.Status.error:type_name -> source.v1.ReadResponse.Status.Error 10, // 14: source.v1.AckRequest.Request.offset:type_name -> source.v1.Offset 20, // 15: source.v1.AckResponse.Result.success:type_name -> google.protobuf.Empty 3, // 16: source.v1.Source.ReadFn:input_type -> source.v1.ReadRequest @@ -1426,6 +1426,7 @@ func file_pkg_apis_proto_source_v1_source_proto_init() { } } file_pkg_apis_proto_source_v1_source_proto_msgTypes[1].OneofWrappers = []any{} + file_pkg_apis_proto_source_v1_source_proto_msgTypes[2].OneofWrappers = []any{} file_pkg_apis_proto_source_v1_source_proto_msgTypes[11].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index cc49d307..91e86ead 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -95,13 +95,13 @@ message ReadResponse { Code code = 2; optional Error error = 3; optional string msg = 4; - optional Handshake handshake = 5; } // Required field holding the result. Result result = 1; // Status of the response. Holds the end of transmission flag and the status code. // Status status = 2; + optional Handshake handshake = 3; } /* diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index ff8177df..6dd6a05d 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -65,9 +65,9 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { Status: &sourcepb.ReadResponse_Status{ Eot: false, Code: sourcepb.ReadResponse_Status_SUCCESS, - Handshake: &sourcepb.Handshake{ - Sot: true, - }, + }, + Handshake: &sourcepb.Handshake{ + Sot: true, }, } if err := stream.Send(handshakeResponse); err != nil { diff --git a/pkg/sourcer/service_test.go b/pkg/sourcer/service_test.go index be88368e..cee11cc8 100644 --- a/pkg/sourcer/service_test.go +++ b/pkg/sourcer/service_test.go @@ -167,9 +167,9 @@ func TestService_ReadFn(t *testing.T) { Status: &sourcepb.ReadResponse_Status{ Eot: false, Code: sourcepb.ReadResponse_Status_SUCCESS, - Handshake: &sourcepb.Handshake{ - Sot: true, - }, + }, + Handshake: &sourcepb.Handshake{ + Sot: true, }, }, {