From 301b46b3f1bbf6cc4eccb5b65e787abb0b62086c Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Thu, 26 Sep 2024 12:49:27 +0530 Subject: [PATCH] Handshake before handling messages Signed-off-by: Sreekanth --- go.mod | 2 +- go.sum | 4 +- .../proto/sourcetransform/v1/transform.pb.go | 414 ++++++++++++------ .../proto/sourcetransform/v1/transform.proto | 28 +- pkg/sourcetransformer/service.go | 37 +- pkg/sourcetransformer/service_test.go | 79 +++- 6 files changed, 398 insertions(+), 166 deletions(-) diff --git a/go.mod b/go.mod index 89f343aa..5d75e67a 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( golang.org/x/net v0.29.0 golang.org/x/sync v0.8.0 google.golang.org/grpc v1.66.0 - google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 + google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 google.golang.org/protobuf v1.34.2 ) diff --git a/go.sum b/go.sum index d75c5d4e..1a5ca3a6 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0 h1:rNBFJjBCOgVr9pWD7rs/knKL4FRTKgpZmsRfV214zcA= -google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.3.0/go.mod h1:Dk1tviKTvMCz5tvh7t+fh94dhmQVHuCt2OzJB3CTW9Y= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0 h1:9SxA29VM43MF5Z9dQu694wmY5t8E/Gxr7s+RSxiIDmc= +google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.4.0/go.mod h1:yZOK5zhQMiALmuweVdIVoQPa6eIJyXn2B9g5dJDhqX4= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index 2675f305..e851979e 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -22,24 +22,18 @@ const ( _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) ) -// * -// SourceTransformerRequest represents a request element. -type SourceTransformRequest struct { +// Handshake message between client and server to indicate the start of transmission. +type Handshake struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` - Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` - EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` - Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` - Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` - // This ID is used uniquely identify a transform request - Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` + // Required field indicating the start of transmission. + Sot bool `protobuf:"varint,1,opt,name=sot,proto3" json:"sot,omitempty"` } -func (x *SourceTransformRequest) Reset() { - *x = SourceTransformRequest{} +func (x *Handshake) Reset() { + *x = Handshake{} if protoimpl.UnsafeEnabled { mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[0] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -47,13 +41,13 @@ func (x *SourceTransformRequest) Reset() { } } -func (x *SourceTransformRequest) String() string { +func (x *Handshake) String() string { return protoimpl.X.MessageStringOf(x) } -func (*SourceTransformRequest) ProtoMessage() {} +func (*Handshake) ProtoMessage() {} -func (x *SourceTransformRequest) ProtoReflect() protoreflect.Message { +func (x *Handshake) ProtoReflect() protoreflect.Message { mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[0] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -65,51 +59,73 @@ func (x *SourceTransformRequest) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use SourceTransformRequest.ProtoReflect.Descriptor instead. -func (*SourceTransformRequest) Descriptor() ([]byte, []int) { +// Deprecated: Use Handshake.ProtoReflect.Descriptor instead. +func (*Handshake) Descriptor() ([]byte, []int) { return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{0} } -func (x *SourceTransformRequest) GetKeys() []string { +func (x *Handshake) GetSot() bool { if x != nil { - return x.Keys + return x.Sot } - return nil + return false } -func (x *SourceTransformRequest) GetValue() []byte { - if x != nil { - return x.Value - } - return nil +// * +// SourceTransformerRequest represents a request element. +type SourceTransformRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Request *SourceTransformRequest_Request `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + Handshake *Handshake `protobuf:"bytes,2,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } -func (x *SourceTransformRequest) GetEventTime() *timestamppb.Timestamp { - if x != nil { - return x.EventTime +func (x *SourceTransformRequest) Reset() { + *x = SourceTransformRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } - return nil } -func (x *SourceTransformRequest) GetWatermark() *timestamppb.Timestamp { - if x != nil { - return x.Watermark +func (x *SourceTransformRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SourceTransformRequest) ProtoMessage() {} + +func (x *SourceTransformRequest) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms } - return nil + return mi.MessageOf(x) } -func (x *SourceTransformRequest) GetHeaders() map[string]string { +// Deprecated: Use SourceTransformRequest.ProtoReflect.Descriptor instead. +func (*SourceTransformRequest) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1} +} + +func (x *SourceTransformRequest) GetRequest() *SourceTransformRequest_Request { if x != nil { - return x.Headers + return x.Request } return nil } -func (x *SourceTransformRequest) GetId() string { +func (x *SourceTransformRequest) GetHandshake() *Handshake { if x != nil { - return x.Id + return x.Handshake } - return "" + return nil } // * @@ -122,12 +138,14 @@ type SourceTransformResponse struct { Results []*SourceTransformResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` // This ID is used to refer the responses to the request it corresponds to. Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + // Handshake message between client and server to indicate the start of transmission. + Handshake *Handshake `protobuf:"bytes,3,opt,name=handshake,proto3,oneof" json:"handshake,omitempty"` } func (x *SourceTransformResponse) Reset() { *x = SourceTransformResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -140,7 +158,7 @@ func (x *SourceTransformResponse) String() string { func (*SourceTransformResponse) ProtoMessage() {} func (x *SourceTransformResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -153,7 +171,7 @@ func (x *SourceTransformResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use SourceTransformResponse.ProtoReflect.Descriptor instead. func (*SourceTransformResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1} + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{2} } func (x *SourceTransformResponse) GetResults() []*SourceTransformResponse_Result { @@ -170,6 +188,13 @@ func (x *SourceTransformResponse) GetId() string { return "" } +func (x *SourceTransformResponse) GetHandshake() *Handshake { + if x != nil { + return x.Handshake + } + return nil +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -183,7 +208,7 @@ type ReadyResponse struct { func (x *ReadyResponse) Reset() { *x = ReadyResponse{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[3] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -196,7 +221,7 @@ func (x *ReadyResponse) String() string { func (*ReadyResponse) ProtoMessage() {} func (x *ReadyResponse) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[3] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -209,7 +234,7 @@ func (x *ReadyResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use ReadyResponse.ProtoReflect.Descriptor instead. func (*ReadyResponse) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{2} + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{3} } func (x *ReadyResponse) GetReady() bool { @@ -219,6 +244,94 @@ func (x *ReadyResponse) GetReady() bool { return false } +type SourceTransformRequest_Request struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Keys []string `protobuf:"bytes,1,rep,name=keys,proto3" json:"keys,omitempty"` + Value []byte `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"` + EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` + Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` + Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // This ID is used to uniquely identify a transform request + Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *SourceTransformRequest_Request) Reset() { + *x = SourceTransformRequest_Request{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SourceTransformRequest_Request) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SourceTransformRequest_Request) ProtoMessage() {} + +func (x *SourceTransformRequest_Request) ProtoReflect() protoreflect.Message { + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SourceTransformRequest_Request.ProtoReflect.Descriptor instead. +func (*SourceTransformRequest_Request) Descriptor() ([]byte, []int) { + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *SourceTransformRequest_Request) GetKeys() []string { + if x != nil { + return x.Keys + } + return nil +} + +func (x *SourceTransformRequest_Request) GetValue() []byte { + if x != nil { + return x.Value + } + return nil +} + +func (x *SourceTransformRequest_Request) GetEventTime() *timestamppb.Timestamp { + if x != nil { + return x.EventTime + } + return nil +} + +func (x *SourceTransformRequest_Request) GetWatermark() *timestamppb.Timestamp { + if x != nil { + return x.Watermark + } + return nil +} + +func (x *SourceTransformRequest_Request) GetHeaders() map[string]string { + if x != nil { + return x.Headers + } + return nil +} + +func (x *SourceTransformRequest_Request) GetId() string { + if x != nil { + return x.Id + } + return "" +} + type SourceTransformResponse_Result struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -233,7 +346,7 @@ type SourceTransformResponse_Result struct { func (x *SourceTransformResponse_Result) Reset() { *x = SourceTransformResponse_Result{} if protoimpl.UnsafeEnabled { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[6] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -246,7 +359,7 @@ func (x *SourceTransformResponse_Result) String() string { func (*SourceTransformResponse_Result) ProtoMessage() {} func (x *SourceTransformResponse_Result) ProtoReflect() protoreflect.Message { - mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4] + mi := &file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[6] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -259,7 +372,7 @@ func (x *SourceTransformResponse_Result) ProtoReflect() protoreflect.Message { // Deprecated: Use SourceTransformResponse_Result.ProtoReflect.Descriptor instead. func (*SourceTransformResponse_Result) Descriptor() ([]byte, []int) { - return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{1, 0} + return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP(), []int{2, 0} } func (x *SourceTransformResponse_Result) GetKeys() []string { @@ -301,65 +414,83 @@ var file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, - 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd8, 0x02, 0x0a, 0x16, 0x53, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1d, 0x0a, 0x09, 0x48, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x73, 0x6f, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x03, 0x73, 0x6f, 0x74, 0x22, 0x8e, 0x04, 0x0a, 0x16, 0x53, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x4e, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x42, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, 0x6e, 0x64, + 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, + 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0xd1, 0x02, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, + 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, + 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, 0x6b, + 0x12, 0x5b, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, + 0x0b, 0x32, 0x41, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, + 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, 0x0a, + 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, + 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, + 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x68, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0xcf, 0x02, 0x0a, 0x17, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, + 0x20, 0x03, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, + 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x2e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x02, 0x69, 0x64, 0x12, 0x42, 0x0a, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x48, 0x61, + 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x48, 0x00, 0x52, 0x09, 0x68, 0x61, 0x6e, 0x64, 0x73, + 0x68, 0x61, 0x6b, 0x65, 0x88, 0x01, 0x01, 0x1a, 0x81, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x38, 0x0a, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, - 0x6d, 0x61, 0x72, 0x6b, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, - 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x61, 0x74, 0x65, 0x72, 0x6d, 0x61, 0x72, - 0x6b, 0x12, 0x53, 0x0a, 0x07, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x39, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x22, 0xfd, 0x01, 0x0a, 0x17, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, - 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, - 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, - 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, 0x0e, - 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x81, - 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, - 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, - 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, - 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, - 0x61, 0x6d, 0x70, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, - 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, - 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xcf, 0x01, 0x0a, 0x0f, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x74, 0x0a, - 0x11, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, - 0x46, 0x6e, 0x12, 0x2c, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x2d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, - 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, - 0x01, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, - 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, - 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, - 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, - 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, - 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, - 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, + 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, + 0x68, 0x61, 0x6e, 0x64, 0x73, 0x68, 0x61, 0x6b, 0x65, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, + 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, + 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, + 0x32, 0xcf, 0x01, 0x0a, 0x0f, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x74, 0x0a, 0x11, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, + 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x46, 0x6e, 0x12, 0x2c, 0x2e, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, + 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x07, 0x49, 0x73, + 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x23, 0x2e, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, + 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, + 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -374,31 +505,36 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescGZIP() []byte return file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDescData } -var file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes = make([]protoimpl.MessageInfo, 7) var file_pkg_apis_proto_sourcetransform_v1_transform_proto_goTypes = []any{ - (*SourceTransformRequest)(nil), // 0: sourcetransformer.v1.SourceTransformRequest - (*SourceTransformResponse)(nil), // 1: sourcetransformer.v1.SourceTransformResponse - (*ReadyResponse)(nil), // 2: sourcetransformer.v1.ReadyResponse - nil, // 3: sourcetransformer.v1.SourceTransformRequest.HeadersEntry - (*SourceTransformResponse_Result)(nil), // 4: sourcetransformer.v1.SourceTransformResponse.Result - (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 6: google.protobuf.Empty + (*Handshake)(nil), // 0: sourcetransformer.v1.Handshake + (*SourceTransformRequest)(nil), // 1: sourcetransformer.v1.SourceTransformRequest + (*SourceTransformResponse)(nil), // 2: sourcetransformer.v1.SourceTransformResponse + (*ReadyResponse)(nil), // 3: sourcetransformer.v1.ReadyResponse + (*SourceTransformRequest_Request)(nil), // 4: sourcetransformer.v1.SourceTransformRequest.Request + nil, // 5: sourcetransformer.v1.SourceTransformRequest.Request.HeadersEntry + (*SourceTransformResponse_Result)(nil), // 6: sourcetransformer.v1.SourceTransformResponse.Result + (*timestamppb.Timestamp)(nil), // 7: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 8: google.protobuf.Empty } var file_pkg_apis_proto_sourcetransform_v1_transform_proto_depIdxs = []int32{ - 5, // 0: sourcetransformer.v1.SourceTransformRequest.event_time:type_name -> google.protobuf.Timestamp - 5, // 1: sourcetransformer.v1.SourceTransformRequest.watermark:type_name -> google.protobuf.Timestamp - 3, // 2: sourcetransformer.v1.SourceTransformRequest.headers:type_name -> sourcetransformer.v1.SourceTransformRequest.HeadersEntry - 4, // 3: sourcetransformer.v1.SourceTransformResponse.results:type_name -> sourcetransformer.v1.SourceTransformResponse.Result - 5, // 4: sourcetransformer.v1.SourceTransformResponse.Result.event_time:type_name -> google.protobuf.Timestamp - 0, // 5: sourcetransformer.v1.SourceTransform.SourceTransformFn:input_type -> sourcetransformer.v1.SourceTransformRequest - 6, // 6: sourcetransformer.v1.SourceTransform.IsReady:input_type -> google.protobuf.Empty - 1, // 7: sourcetransformer.v1.SourceTransform.SourceTransformFn:output_type -> sourcetransformer.v1.SourceTransformResponse - 2, // 8: sourcetransformer.v1.SourceTransform.IsReady:output_type -> sourcetransformer.v1.ReadyResponse - 7, // [7:9] is the sub-list for method output_type - 5, // [5:7] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 4, // 0: sourcetransformer.v1.SourceTransformRequest.request:type_name -> sourcetransformer.v1.SourceTransformRequest.Request + 0, // 1: sourcetransformer.v1.SourceTransformRequest.handshake:type_name -> sourcetransformer.v1.Handshake + 6, // 2: sourcetransformer.v1.SourceTransformResponse.results:type_name -> sourcetransformer.v1.SourceTransformResponse.Result + 0, // 3: sourcetransformer.v1.SourceTransformResponse.handshake:type_name -> sourcetransformer.v1.Handshake + 7, // 4: sourcetransformer.v1.SourceTransformRequest.Request.event_time:type_name -> google.protobuf.Timestamp + 7, // 5: sourcetransformer.v1.SourceTransformRequest.Request.watermark:type_name -> google.protobuf.Timestamp + 5, // 6: sourcetransformer.v1.SourceTransformRequest.Request.headers:type_name -> sourcetransformer.v1.SourceTransformRequest.Request.HeadersEntry + 7, // 7: sourcetransformer.v1.SourceTransformResponse.Result.event_time:type_name -> google.protobuf.Timestamp + 1, // 8: sourcetransformer.v1.SourceTransform.SourceTransformFn:input_type -> sourcetransformer.v1.SourceTransformRequest + 8, // 9: sourcetransformer.v1.SourceTransform.IsReady:input_type -> google.protobuf.Empty + 2, // 10: sourcetransformer.v1.SourceTransform.SourceTransformFn:output_type -> sourcetransformer.v1.SourceTransformResponse + 3, // 11: sourcetransformer.v1.SourceTransform.IsReady:output_type -> sourcetransformer.v1.ReadyResponse + 10, // [10:12] is the sub-list for method output_type + 8, // [8:10] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() } @@ -408,7 +544,7 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } if !protoimpl.UnsafeEnabled { file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*SourceTransformRequest); i { + switch v := v.(*Handshake); i { case 0: return &v.state case 1: @@ -420,7 +556,7 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*SourceTransformResponse); i { + switch v := v.(*SourceTransformRequest); i { case 0: return &v.state case 1: @@ -432,6 +568,18 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*SourceTransformResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[3].Exporter = func(v any, i int) any { switch v := v.(*ReadyResponse); i { case 0: return &v.state @@ -444,6 +592,18 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*SourceTransformRequest_Request); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[6].Exporter = func(v any, i int) any { switch v := v.(*SourceTransformResponse_Result); i { case 0: return &v.state @@ -456,13 +616,15 @@ func file_pkg_apis_proto_sourcetransform_v1_transform_proto_init() { } } } + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[1].OneofWrappers = []any{} + file_pkg_apis_proto_sourcetransform_v1_transform_proto_msgTypes[2].OneofWrappers = []any{} type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc, NumEnums: 0, - NumMessages: 5, + NumMessages: 7, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/apis/proto/sourcetransform/v1/transform.proto b/pkg/apis/proto/sourcetransform/v1/transform.proto index 0ae09a7c..7eec50c9 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.proto +++ b/pkg/apis/proto/sourcetransform/v1/transform.proto @@ -17,17 +17,29 @@ service SourceTransform { rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); } +/* + * Handshake message between client and server to indicate the start of transmission. + */ + message Handshake { + // Required field indicating the start of transmission. + bool sot = 1; +} + /** * SourceTransformerRequest represents a request element. */ message SourceTransformRequest { - repeated string keys = 1; - bytes value = 2; - google.protobuf.Timestamp event_time = 3; - google.protobuf.Timestamp watermark = 4; - map headers = 5; - // This ID is used uniquely identify a transform request - string id = 6; + message Request { + repeated string keys = 1; + bytes value = 2; + google.protobuf.Timestamp event_time = 3; + google.protobuf.Timestamp watermark = 4; + map headers = 5; + // This ID is used to uniquely identify a transform request + string id = 6; + } + Request request = 1; + optional Handshake handshake = 2; } /** @@ -43,6 +55,8 @@ message SourceTransformResponse { repeated Result results = 1; // This ID is used to refer the responses to the request it corresponds to. string id = 2; + // Handshake message between client and server to indicate the start of transmission. + optional Handshake handshake = 3; } /** diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 5dc40479..86d945ea 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -3,13 +3,15 @@ package sourcetransformer import ( "context" "errors" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" + "fmt" "io" "log" "runtime/debug" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -48,6 +50,24 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn } }() + req, err := stream.Recv() + if err != nil { + return fmt.Errorf("reading handshake message from stream: %w", err) + } + + if req.Handshake == nil || !req.Handshake.Sot { + return fmt.Errorf("invalid handshake message: %+v", req) + } + + handshakeResponse := &v1.SourceTransformResponse{ + Handshake: &v1.Handshake{ + Sot: true, + }, + } + if err := stream.Send(handshakeResponse); err != nil { + return fmt.Errorf("sending handshake response to client over gRPC stream: %w", err) + } + ctx := stream.Context() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -77,9 +97,14 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn } return err } + if d.Handshake != nil { + return fmt.Errorf("expected source transform messages, received handshake message") + } + + req := d.Request grp.Go(func() error { - var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) - messageTs := fs.Transformer.Transform(grpCtx, d.GetKeys(), hd) + var hd = NewHandlerDatum(req.GetValue(), req.EventTime.AsTime(), req.Watermark.AsTime(), req.Headers) + messageTs := fs.Transformer.Transform(grpCtx, req.GetKeys(), hd) var results []*v1.SourceTransformResponse_Result for _, m := range messageTs.Items() { results = append(results, &v1.SourceTransformResponse_Result{ @@ -91,7 +116,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn } resp := &v1.SourceTransformResponse{ Results: results, - Id: d.GetId(), + Id: req.GetId(), } select { case senderCh <- resp: diff --git a/pkg/sourcetransformer/service_test.go b/pkg/sourcetransformer/service_test.go index 3e298875..b9f89167 100644 --- a/pkg/sourcetransformer/service_test.go +++ b/pkg/sourcetransformer/service_test.go @@ -4,13 +4,15 @@ import ( "context" "errors" "fmt" + "net" + "testing" + "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/test/bufconn" - "net" - "testing" - "time" proto "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" "google.golang.org/protobuf/types/known/timestamppb" @@ -90,10 +92,12 @@ func TestService_sourceTransformFn(t *testing.T) { args: args{ ctx: context.Background(), d: &proto.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, want: &proto.SourceTransformResponse{ @@ -115,10 +119,12 @@ func TestService_sourceTransformFn(t *testing.T) { args: args{ ctx: context.Background(), d: &proto.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, want: &proto.SourceTransformResponse{ @@ -138,10 +144,12 @@ func TestService_sourceTransformFn(t *testing.T) { args: args{ ctx: context.Background(), d: &proto.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(`test`), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(`test`), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, }, }, want: &proto.SourceTransformResponse{ @@ -170,6 +178,8 @@ func TestService_sourceTransformFn(t *testing.T) { stream, err := client.SourceTransformFn(context.Background()) assert.NoError(t, err, "Creating stream") + doHandshake(t, stream) + err = stream.Send(tt.args.d) assert.NoError(t, err, "Sending message over the stream") @@ -181,6 +191,23 @@ func TestService_sourceTransformFn(t *testing.T) { } } +func doHandshake(t *testing.T, stream proto.SourceTransform_SourceTransformFnClient) { + t.Helper() + handshakeReq := &proto.SourceTransformRequest{ + Handshake: &proto.Handshake{Sot: true}, + } + err := stream.Send(handshakeReq) + require.NoError(t, err, "Sending handshake request to the stream") + + handshakeResp, err := stream.Recv() + require.NoError(t, err, "Receiving handshake response") + + require.Empty(t, handshakeResp.Results, "Invalid handshake response") + require.Empty(t, handshakeResp.Id, "Invalid handshake response") + require.NotNil(t, handshakeResp.Handshake, "Invalid handshake response") + require.True(t, handshakeResp.Handshake.Sot, "Invalid handshake response") +} + func TestService_SourceTransformFn_Multiple_Messages(t *testing.T) { svc := &Service{ Transformer: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages { @@ -194,21 +221,25 @@ func TestService_SourceTransformFn_Multiple_Messages(t *testing.T) { client := proto.NewSourceTransformClient(conn) stream, err := client.SourceTransformFn(context.Background()) - assert.NoError(t, err, "Creating stream") + require.NoError(t, err, "Creating stream") + + doHandshake(t, stream) const msgCount = 10 for i := 0; i < msgCount; i++ { msg := proto.SourceTransformRequest{ - Keys: []string{"client"}, - Value: []byte(fmt.Sprintf("test_%d", i)), - EventTime: timestamppb.New(time.Time{}), - Watermark: timestamppb.New(time.Time{}), + Request: &proto.SourceTransformRequest_Request{ + Keys: []string{"client"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + }, } err = stream.Send(&msg) - assert.NoError(t, err, "Sending message over the stream") + require.NoError(t, err, "Sending message over the stream") } err = stream.CloseSend() - assert.NoError(t, err, "Closing the send direction of the stream") + require.NoError(t, err, "Closing the send direction of the stream") expectedResults := make([][]*proto.SourceTransformResponse_Result, msgCount) for i := 0; i < msgCount; i++ { @@ -224,8 +255,8 @@ func TestService_SourceTransformFn_Multiple_Messages(t *testing.T) { results := make([][]*proto.SourceTransformResponse_Result, msgCount) for i := 0; i < msgCount; i++ { got, err := stream.Recv() - assert.NoError(t, err, "Receiving message from the stream") + require.NoError(t, err, "Receiving message from the stream") results[i] = got.Results } - assert.ElementsMatch(t, results, expectedResults) + require.ElementsMatch(t, results, expectedResults) }