diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index 479247d9..2675f305 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -34,6 +34,8 @@ type SourceTransformRequest struct { EventTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=event_time,json=eventTime,proto3" json:"event_time,omitempty"` Watermark *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=watermark,proto3" json:"watermark,omitempty"` Headers map[string]string `protobuf:"bytes,5,rep,name=headers,proto3" json:"headers,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + // This ID is used uniquely identify a transform request + Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` } func (x *SourceTransformRequest) Reset() { @@ -103,6 +105,13 @@ func (x *SourceTransformRequest) GetHeaders() map[string]string { return nil } +func (x *SourceTransformRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + // * // SourceTransformerResponse represents a response element. type SourceTransformResponse struct { @@ -111,6 +120,8 @@ type SourceTransformResponse struct { unknownFields protoimpl.UnknownFields Results []*SourceTransformResponse_Result `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + // This ID is used to refer the responses to the request it corresponds to. + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` } func (x *SourceTransformResponse) Reset() { @@ -152,6 +163,13 @@ func (x *SourceTransformResponse) GetResults() []*SourceTransformResponse_Result return nil } +func (x *SourceTransformResponse) GetId() string { + if x != nil { + return x.Id + } + return "" +} + // * // ReadyResponse is the health check result. type ReadyResponse struct { @@ -283,7 +301,7 @@ var file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc = []byte{ 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, - 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xc8, 0x02, 0x0a, 0x16, 0x53, 0x6f, 0x75, 0x72, + 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd8, 0x02, 0x0a, 0x16, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, @@ -300,17 +318,19 @@ var file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc = []byte{ 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x07, 0x68, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, + 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x3a, 0x0a, 0x0c, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, - 0x38, 0x01, 0x22, 0xed, 0x01, 0x0a, 0x17, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, + 0x38, 0x01, 0x22, 0xfd, 0x01, 0x0a, 0x17, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x4e, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x34, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x52, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x1a, 0x81, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x1a, 0x81, 0x01, 0x0a, 0x06, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x65, 0x79, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76, 0x61, @@ -321,25 +341,25 @@ var file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc = []byte{ 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xcb, 0x01, 0x0a, 0x0f, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x70, 0x0a, + 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xcf, 0x01, 0x0a, 0x0f, 0x53, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x74, 0x0a, 0x11, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x46, 0x6e, 0x12, 0x2c, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x46, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, - 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, - 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, + 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/apis/proto/sourcetransform/v1/transform.proto b/pkg/apis/proto/sourcetransform/v1/transform.proto index 8885db08..0ae09a7c 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.proto +++ b/pkg/apis/proto/sourcetransform/v1/transform.proto @@ -11,7 +11,7 @@ service SourceTransform { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse); + rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); @@ -26,6 +26,8 @@ message SourceTransformRequest { google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; map headers = 5; + // This ID is used uniquely identify a transform request + string id = 6; } /** @@ -39,6 +41,8 @@ message SourceTransformResponse { repeated string tags = 4; } repeated Result results = 1; + // This ID is used to refer the responses to the request it corresponds to. + string id = 2; } /** diff --git a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go index 9fd6bd4a..52995c34 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go @@ -31,7 +31,7 @@ type SourceTransformClient interface { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - SourceTransformFn(ctx context.Context, in *SourceTransformRequest, opts ...grpc.CallOption) (*SourceTransformResponse, error) + SourceTransformFn(ctx context.Context, opts ...grpc.CallOption) (SourceTransform_SourceTransformFnClient, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) } @@ -44,14 +44,36 @@ func NewSourceTransformClient(cc grpc.ClientConnInterface) SourceTransformClient return &sourceTransformClient{cc} } -func (c *sourceTransformClient) SourceTransformFn(ctx context.Context, in *SourceTransformRequest, opts ...grpc.CallOption) (*SourceTransformResponse, error) { +func (c *sourceTransformClient) SourceTransformFn(ctx context.Context, opts ...grpc.CallOption) (SourceTransform_SourceTransformFnClient, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) - out := new(SourceTransformResponse) - err := c.cc.Invoke(ctx, SourceTransform_SourceTransformFn_FullMethodName, in, out, cOpts...) + stream, err := c.cc.NewStream(ctx, &SourceTransform_ServiceDesc.Streams[0], SourceTransform_SourceTransformFn_FullMethodName, cOpts...) if err != nil { return nil, err } - return out, nil + x := &sourceTransformSourceTransformFnClient{ClientStream: stream} + return x, nil +} + +type SourceTransform_SourceTransformFnClient interface { + Send(*SourceTransformRequest) error + Recv() (*SourceTransformResponse, error) + grpc.ClientStream +} + +type sourceTransformSourceTransformFnClient struct { + grpc.ClientStream +} + +func (x *sourceTransformSourceTransformFnClient) Send(m *SourceTransformRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sourceTransformSourceTransformFnClient) Recv() (*SourceTransformResponse, error) { + m := new(SourceTransformResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } func (c *sourceTransformClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { @@ -71,7 +93,7 @@ type SourceTransformServer interface { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - SourceTransformFn(context.Context, *SourceTransformRequest) (*SourceTransformResponse, error) + SourceTransformFn(SourceTransform_SourceTransformFnServer) error // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) mustEmbedUnimplementedSourceTransformServer() @@ -81,8 +103,8 @@ type SourceTransformServer interface { type UnimplementedSourceTransformServer struct { } -func (UnimplementedSourceTransformServer) SourceTransformFn(context.Context, *SourceTransformRequest) (*SourceTransformResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SourceTransformFn not implemented") +func (UnimplementedSourceTransformServer) SourceTransformFn(SourceTransform_SourceTransformFnServer) error { + return status.Errorf(codes.Unimplemented, "method SourceTransformFn not implemented") } func (UnimplementedSourceTransformServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") @@ -100,22 +122,30 @@ func RegisterSourceTransformServer(s grpc.ServiceRegistrar, srv SourceTransformS s.RegisterService(&SourceTransform_ServiceDesc, srv) } -func _SourceTransform_SourceTransformFn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SourceTransformRequest) - if err := dec(in); err != nil { +func _SourceTransform_SourceTransformFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SourceTransformServer).SourceTransformFn(&sourceTransformSourceTransformFnServer{ServerStream: stream}) +} + +type SourceTransform_SourceTransformFnServer interface { + Send(*SourceTransformResponse) error + Recv() (*SourceTransformRequest, error) + grpc.ServerStream +} + +type sourceTransformSourceTransformFnServer struct { + grpc.ServerStream +} + +func (x *sourceTransformSourceTransformFnServer) Send(m *SourceTransformResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sourceTransformSourceTransformFnServer) Recv() (*SourceTransformRequest, error) { + m := new(SourceTransformRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(SourceTransformServer).SourceTransformFn(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: SourceTransform_SourceTransformFn_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SourceTransformServer).SourceTransformFn(ctx, req.(*SourceTransformRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil } func _SourceTransform_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { @@ -143,15 +173,18 @@ var SourceTransform_ServiceDesc = grpc.ServiceDesc{ ServiceName: "sourcetransformer.v1.SourceTransform", HandlerType: (*SourceTransformServer)(nil), Methods: []grpc.MethodDesc{ - { - MethodName: "SourceTransformFn", - Handler: _SourceTransform_SourceTransformFn_Handler, - }, { MethodName: "IsReady", Handler: _SourceTransform_IsReady_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SourceTransformFn", + Handler: _SourceTransform_SourceTransformFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "pkg/apis/proto/sourcetransform/v1/transform.proto", } diff --git a/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go b/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go index fbc66555..8b347d06 100644 --- a/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go +++ b/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go @@ -58,21 +58,21 @@ func (mr *MockSourceTransformClientMockRecorder) IsReady(arg0, arg1 interface{}, } // SourceTransformFn mocks base method. -func (m *MockSourceTransformClient) SourceTransformFn(arg0 context.Context, arg1 *v1.SourceTransformRequest, arg2 ...grpc.CallOption) (*v1.SourceTransformResponse, error) { +func (m *MockSourceTransformClient) SourceTransformFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.SourceTransform_SourceTransformFnClient, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { + varargs := []interface{}{arg0} + for _, a := range arg1 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "SourceTransformFn", varargs...) - ret0, _ := ret[0].(*v1.SourceTransformResponse) + ret0, _ := ret[0].(v1.SourceTransform_SourceTransformFnClient) ret1, _ := ret[1].(error) return ret0, ret1 } // SourceTransformFn indicates an expected call of SourceTransformFn. -func (mr *MockSourceTransformClientMockRecorder) SourceTransformFn(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockSourceTransformClientMockRecorder) SourceTransformFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) + varargs := append([]interface{}{arg0}, arg1...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SourceTransformFn", reflect.TypeOf((*MockSourceTransformClient)(nil).SourceTransformFn), varargs...) } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 1651f825..5dc40479 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -2,6 +2,11 @@ package sourcetransformer import ( "context" + "errors" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" "log" "runtime/debug" @@ -34,7 +39,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. -func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformRequest) (*v1.SourceTransformResponse, error) { +func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFnServer) error { // handle panic defer func() { if r := recover(); r != nil { @@ -42,19 +47,64 @@ func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformR fs.shutdownCh <- struct{}{} } }() - var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) - messageTs := fs.Transformer.Transform(ctx, d.GetKeys(), hd) - var results []*v1.SourceTransformResponse_Result - for _, m := range messageTs.Items() { - results = append(results, &v1.SourceTransformResponse_Result{ - EventTime: timestamppb.New(m.EventTime()), - Keys: m.Keys(), - Value: m.Value(), - Tags: m.Tags(), + + ctx := stream.Context() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + grp, grpCtx := errgroup.WithContext(ctx) + + senderCh := make(chan *v1.SourceTransformResponse, 500) // TODO: identify the right buffer size + // goroutine to send the response to the stream + grp.Go(func() error { + for { + select { + case <-grpCtx.Done(): + return grpCtx.Err() + default: + } + if err := stream.Send(<-senderCh); err != nil { + cancel() + return err + } + } + }) + + for { + d, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + grp.Go(func() error { + var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) + messageTs := fs.Transformer.Transform(grpCtx, d.GetKeys(), hd) + var results []*v1.SourceTransformResponse_Result + for _, m := range messageTs.Items() { + results = append(results, &v1.SourceTransformResponse_Result{ + EventTime: timestamppb.New(m.EventTime()), + Keys: m.Keys(), + Value: m.Value(), + Tags: m.Tags(), + }) + } + resp := &v1.SourceTransformResponse{ + Results: results, + Id: d.GetId(), + } + select { + case senderCh <- resp: + case <-grpCtx.Done(): + return grpCtx.Err() + } + return nil }) } - responseList := &v1.SourceTransformResponse{ - Results: results, + + if err := grp.Wait(); err != nil { + statusErr := status.Errorf(codes.Internal, err.Error()) + return statusErr } - return responseList, nil + return nil } diff --git a/pkg/sourcetransformer/service_test.go b/pkg/sourcetransformer/service_test.go index d57d937e..3e298875 100644 --- a/pkg/sourcetransformer/service_test.go +++ b/pkg/sourcetransformer/service_test.go @@ -2,28 +2,84 @@ package sourcetransformer import ( "context" - "reflect" + "errors" + "fmt" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" + "net" "testing" "time" + proto "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" "google.golang.org/protobuf/types/known/timestamppb" - - stpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" ) +func newServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn { + lis := bufconn.Listen(1024 * 1024) + t.Cleanup(func() { + _ = lis.Close() + }) + + server := grpc.NewServer() + t.Cleanup(func() { + server.Stop() + }) + + register(server) + + errChan := make(chan error, 1) + go func() { + // t.Fatal should only be called from the goroutine running the test + if err := server.Serve(lis); err != nil { + errChan <- err + } + }() + + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(func() { + cancel() + }) + + conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { + _ = conn.Close() + }) + if err != nil { + t.Fatalf("Creating new gRPC client connection: %v", err) + } + + var grpcServerErr error + select { + case grpcServerErr = <-errChan: + case <-time.After(500 * time.Millisecond): + grpcServerErr = errors.New("gRPC server didn't start in 500ms") + } + if err != nil { + t.Fatalf("Failed to start gRPC server: %v", grpcServerErr) + } + + return conn +} + +var testTime = time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local) + func TestService_sourceTransformFn(t *testing.T) { type args struct { ctx context.Context - d *stpb.SourceTransformRequest + d *proto.SourceTransformRequest } - testTime := time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local) tests := []struct { name string handler SourceTransformer args args - want *stpb.SourceTransformResponse - wantErr bool + want *proto.SourceTransformResponse }{ { name: "sourceTransform_fn_forward_msg", @@ -33,15 +89,15 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ + d: &proto.SourceTransformRequest{ Keys: []string{"client"}, Value: []byte(`test`), EventTime: timestamppb.New(time.Time{}), Watermark: timestamppb.New(time.Time{}), }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Keys: []string{"client_test"}, @@ -49,7 +105,6 @@ func TestService_sourceTransformFn(t *testing.T) { }, }, }, - wantErr: false, }, { name: "sourceTransform_fn_forward_msg_forward_to_all", @@ -59,22 +114,21 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ + d: &proto.SourceTransformRequest{ Keys: []string{"client"}, Value: []byte(`test`), EventTime: timestamppb.New(time.Time{}), Watermark: timestamppb.New(time.Time{}), }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Value: []byte(`test`), }, }, }, - wantErr: false, }, { name: "sourceTransform_fn_forward_msg_drop_msg", @@ -83,42 +137,95 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ + d: &proto.SourceTransformRequest{ Keys: []string{"client"}, Value: []byte(`test`), EventTime: timestamppb.New(time.Time{}), Watermark: timestamppb.New(time.Time{}), }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Tags: []string{DROP}, - Value: []byte{}, + Value: nil, }, }, }, - wantErr: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fs := &Service{ + svc := &Service{ Transformer: tt.handler, } - // here's a trick for testing: - // because we are not using gRPC, we directly set a new incoming ctx - // instead of the regular outgoing context in the real gRPC connection. - ctx := context.Background() - got, err := fs.SourceTransformFn(ctx, tt.args.d) - if (err != nil) != tt.wantErr { - t.Errorf("SourceTransformFn() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("SourceTransformFn() got = %v, want %v", got, tt.want) - } + + conn := newServer(t, func(server *grpc.Server) { + proto.RegisterSourceTransformServer(server, svc) + }) + + client := proto.NewSourceTransformClient(conn) + stream, err := client.SourceTransformFn(context.Background()) + assert.NoError(t, err, "Creating stream") + + err = stream.Send(tt.args.d) + assert.NoError(t, err, "Sending message over the stream") + + got, err := stream.Recv() + assert.NoError(t, err, "Receiving message from the stream") + + assert.Equal(t, got.Results, tt.want.Results) }) } } + +func TestService_SourceTransformFn_Multiple_Messages(t *testing.T) { + svc := &Service{ + Transformer: SourceTransformFunc(func(ctx context.Context, keys []string, datum Datum) Messages { + msg := datum.Value() + return MessagesBuilder().Append(NewMessage(msg, testTime).WithKeys([]string{keys[0] + "_test"})) + }), + } + conn := newServer(t, func(server *grpc.Server) { + proto.RegisterSourceTransformServer(server, svc) + }) + + client := proto.NewSourceTransformClient(conn) + stream, err := client.SourceTransformFn(context.Background()) + assert.NoError(t, err, "Creating stream") + + const msgCount = 10 + for i := 0; i < msgCount; i++ { + msg := proto.SourceTransformRequest{ + Keys: []string{"client"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + EventTime: timestamppb.New(time.Time{}), + Watermark: timestamppb.New(time.Time{}), + } + err = stream.Send(&msg) + assert.NoError(t, err, "Sending message over the stream") + } + err = stream.CloseSend() + assert.NoError(t, err, "Closing the send direction of the stream") + + expectedResults := make([][]*proto.SourceTransformResponse_Result, msgCount) + for i := 0; i < msgCount; i++ { + expectedResults[i] = []*proto.SourceTransformResponse_Result{ + { + EventTime: timestamppb.New(testTime), + Keys: []string{"client_test"}, + Value: []byte(fmt.Sprintf("test_%d", i)), + }, + } + } + + results := make([][]*proto.SourceTransformResponse_Result, msgCount) + for i := 0; i < msgCount; i++ { + got, err := stream.Recv() + assert.NoError(t, err, "Receiving message from the stream") + results[i] = got.Results + } + assert.ElementsMatch(t, results, expectedResults) +}