From 2c5ec4727f644e549327d4e790ccdec7fbc97e46 Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Thu, 12 Sep 2024 19:56:35 +0530 Subject: [PATCH] feat: Use bidirectional streaming for Source Transformer Signed-off-by: Sreekanth --- pkg/apis/proto/batchmap/v1/batchmap.pb.go | 4 +- .../proto/batchmap/v1/batchmap_grpc.pb.go | 15 ++- pkg/apis/proto/map/v1/map.pb.go | 4 +- pkg/apis/proto/map/v1/map_grpc.pb.go | 17 ++- pkg/apis/proto/mapstream/v1/mapstream.pb.go | 4 +- .../proto/mapstream/v1/mapstream_grpc.pb.go | 15 ++- pkg/apis/proto/reduce/v1/reduce.pb.go | 4 +- pkg/apis/proto/reduce/v1/reduce_grpc.pb.go | 15 ++- .../sessionreduce/v1/sessionreduce.pb.go | 4 +- .../sessionreduce/v1/sessionreduce_grpc.pb.go | 15 ++- pkg/apis/proto/sideinput/v1/sideinput.pb.go | 4 +- .../proto/sideinput/v1/sideinput_grpc.pb.go | 17 ++- pkg/apis/proto/sink/v1/sink.pb.go | 4 +- pkg/apis/proto/sink/v1/sink_grpc.pb.go | 15 ++- pkg/apis/proto/source/v1/source.pb.go | 4 +- pkg/apis/proto/source/v1/source_grpc.pb.go | 30 +++-- .../proto/sourcetransform/v1/transform.pb.go | 30 ++--- .../proto/sourcetransform/v1/transform.proto | 2 +- .../sourcetransform/v1/transform_grpc.pb.go | 100 +++++++++++----- .../v1/transformmock/transformmock.go | 12 +- pkg/sourcetransformer/service.go | 75 +++++++++--- pkg/sourcetransformer/service_test.go | 107 ++++++++++++------ 22 files changed, 335 insertions(+), 162 deletions(-) diff --git a/pkg/apis/proto/batchmap/v1/batchmap.pb.go b/pkg/apis/proto/batchmap/v1/batchmap.pb.go index 1d34241c..10cbcf00 100644 --- a/pkg/apis/proto/batchmap/v1/batchmap.pb.go +++ b/pkg/apis/proto/batchmap/v1/batchmap.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.21.12 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/batchmap/v1/batchmap.proto package v1 diff --git a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go index 11abd087..37f8245e 100644 --- a/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go +++ b/pkg/apis/proto/batchmap/v1/batchmap_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v3.21.12 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/batchmap/v1/batchmap.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + BatchMap_IsReady_FullMethodName = "/batchmap.v1.BatchMap/IsReady" + BatchMap_BatchMapFn_FullMethodName = "/batchmap.v1.BatchMap/BatchMapFn" +) + // BatchMapClient is the client API for BatchMap service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -41,7 +46,7 @@ func NewBatchMapClient(cc grpc.ClientConnInterface) BatchMapClient { func (c *batchMapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/batchmap.v1.BatchMap/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, BatchMap_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -49,7 +54,7 @@ func (c *batchMapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts .. } func (c *batchMapClient) BatchMapFn(ctx context.Context, opts ...grpc.CallOption) (BatchMap_BatchMapFnClient, error) { - stream, err := c.cc.NewStream(ctx, &BatchMap_ServiceDesc.Streams[0], "/batchmap.v1.BatchMap/BatchMapFn", opts...) + stream, err := c.cc.NewStream(ctx, &BatchMap_ServiceDesc.Streams[0], BatchMap_BatchMapFn_FullMethodName, opts...) if err != nil { return nil, err } @@ -125,7 +130,7 @@ func _BatchMap_IsReady_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/batchmap.v1.BatchMap/IsReady", + FullMethod: BatchMap_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(BatchMapServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/map/v1/map.pb.go b/pkg/apis/proto/map/v1/map.pb.go index 58f978de..3637d4f7 100644 --- a/pkg/apis/proto/map/v1/map.pb.go +++ b/pkg/apis/proto/map/v1/map.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/map/v1/map.proto package v1 diff --git a/pkg/apis/proto/map/v1/map_grpc.pb.go b/pkg/apis/proto/map/v1/map_grpc.pb.go index d3844348..e37b30c5 100644 --- a/pkg/apis/proto/map/v1/map_grpc.pb.go +++ b/pkg/apis/proto/map/v1/map_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/map/v1/map.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + Map_MapFn_FullMethodName = "/map.v1.Map/MapFn" + Map_IsReady_FullMethodName = "/map.v1.Map/IsReady" +) + // MapClient is the client API for Map service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -39,7 +44,7 @@ func NewMapClient(cc grpc.ClientConnInterface) MapClient { func (c *mapClient) MapFn(ctx context.Context, in *MapRequest, opts ...grpc.CallOption) (*MapResponse, error) { out := new(MapResponse) - err := c.cc.Invoke(ctx, "/map.v1.Map/MapFn", in, out, opts...) + err := c.cc.Invoke(ctx, Map_MapFn_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -48,7 +53,7 @@ func (c *mapClient) MapFn(ctx context.Context, in *MapRequest, opts ...grpc.Call func (c *mapClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/map.v1.Map/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, Map_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -99,7 +104,7 @@ func _Map_MapFn_Handler(srv interface{}, ctx context.Context, dec func(interface } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/map.v1.Map/MapFn", + FullMethod: Map_MapFn_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(MapServer).MapFn(ctx, req.(*MapRequest)) @@ -117,7 +122,7 @@ func _Map_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interfa } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/map.v1.Map/IsReady", + FullMethod: Map_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(MapServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/mapstream/v1/mapstream.pb.go b/pkg/apis/proto/mapstream/v1/mapstream.pb.go index 6a276ae2..ba793fa4 100644 --- a/pkg/apis/proto/mapstream/v1/mapstream.pb.go +++ b/pkg/apis/proto/mapstream/v1/mapstream.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/mapstream/v1/mapstream.proto package v1 diff --git a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go index e8927dc5..fc97b288 100644 --- a/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go +++ b/pkg/apis/proto/mapstream/v1/mapstream_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/mapstream/v1/mapstream.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + MapStream_MapStreamFn_FullMethodName = "/mapstream.v1.MapStream/MapStreamFn" + MapStream_IsReady_FullMethodName = "/mapstream.v1.MapStream/IsReady" +) + // MapStreamClient is the client API for MapStream service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -38,7 +43,7 @@ func NewMapStreamClient(cc grpc.ClientConnInterface) MapStreamClient { } func (c *mapStreamClient) MapStreamFn(ctx context.Context, in *MapStreamRequest, opts ...grpc.CallOption) (MapStream_MapStreamFnClient, error) { - stream, err := c.cc.NewStream(ctx, &MapStream_ServiceDesc.Streams[0], "/mapstream.v1.MapStream/MapStreamFn", opts...) + stream, err := c.cc.NewStream(ctx, &MapStream_ServiceDesc.Streams[0], MapStream_MapStreamFn_FullMethodName, opts...) if err != nil { return nil, err } @@ -71,7 +76,7 @@ func (x *mapStreamMapStreamFnClient) Recv() (*MapStreamResponse, error) { func (c *mapStreamClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/mapstream.v1.MapStream/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, MapStream_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -143,7 +148,7 @@ func _MapStream_IsReady_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/mapstream.v1.MapStream/IsReady", + FullMethod: MapStream_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(MapStreamServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/reduce/v1/reduce.pb.go b/pkg/apis/proto/reduce/v1/reduce.pb.go index 597d4c7c..ed72e255 100644 --- a/pkg/apis/proto/reduce/v1/reduce.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 diff --git a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go index afe493d1..ce7e8940 100644 --- a/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go +++ b/pkg/apis/proto/reduce/v1/reduce_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/reduce/v1/reduce.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + Reduce_ReduceFn_FullMethodName = "/reduce.v1.Reduce/ReduceFn" + Reduce_IsReady_FullMethodName = "/reduce.v1.Reduce/IsReady" +) + // ReduceClient is the client API for Reduce service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -38,7 +43,7 @@ func NewReduceClient(cc grpc.ClientConnInterface) ReduceClient { } func (c *reduceClient) ReduceFn(ctx context.Context, opts ...grpc.CallOption) (Reduce_ReduceFnClient, error) { - stream, err := c.cc.NewStream(ctx, &Reduce_ServiceDesc.Streams[0], "/reduce.v1.Reduce/ReduceFn", opts...) + stream, err := c.cc.NewStream(ctx, &Reduce_ServiceDesc.Streams[0], Reduce_ReduceFn_FullMethodName, opts...) if err != nil { return nil, err } @@ -70,7 +75,7 @@ func (x *reduceReduceFnClient) Recv() (*ReduceResponse, error) { func (c *reduceClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/reduce.v1.Reduce/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, Reduce_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -147,7 +152,7 @@ func _Reduce_IsReady_Handler(srv interface{}, ctx context.Context, dec func(inte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/reduce.v1.Reduce/IsReady", + FullMethod: Reduce_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(ReduceServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go b/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go index fe19a9ae..7633e708 100644 --- a/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go +++ b/pkg/apis/proto/sessionreduce/v1/sessionreduce.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/sessionreduce/v1/sessionreduce.proto package v1 diff --git a/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go b/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go index 51b7bd6c..eb0abe46 100644 --- a/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go +++ b/pkg/apis/proto/sessionreduce/v1/sessionreduce_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/sessionreduce/v1/sessionreduce.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + SessionReduce_SessionReduceFn_FullMethodName = "/sessionreduce.v1.SessionReduce/SessionReduceFn" + SessionReduce_IsReady_FullMethodName = "/sessionreduce.v1.SessionReduce/IsReady" +) + // SessionReduceClient is the client API for SessionReduce service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -38,7 +43,7 @@ func NewSessionReduceClient(cc grpc.ClientConnInterface) SessionReduceClient { } func (c *sessionReduceClient) SessionReduceFn(ctx context.Context, opts ...grpc.CallOption) (SessionReduce_SessionReduceFnClient, error) { - stream, err := c.cc.NewStream(ctx, &SessionReduce_ServiceDesc.Streams[0], "/sessionreduce.v1.SessionReduce/SessionReduceFn", opts...) + stream, err := c.cc.NewStream(ctx, &SessionReduce_ServiceDesc.Streams[0], SessionReduce_SessionReduceFn_FullMethodName, opts...) if err != nil { return nil, err } @@ -70,7 +75,7 @@ func (x *sessionReduceSessionReduceFnClient) Recv() (*SessionReduceResponse, err func (c *sessionReduceClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/sessionreduce.v1.SessionReduce/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, SessionReduce_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -147,7 +152,7 @@ func _SessionReduce_IsReady_Handler(srv interface{}, ctx context.Context, dec fu } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/sessionreduce.v1.SessionReduce/IsReady", + FullMethod: SessionReduce_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SessionReduceServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/sideinput/v1/sideinput.pb.go b/pkg/apis/proto/sideinput/v1/sideinput.pb.go index a117b3a1..26e358f3 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 diff --git a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go index 2562314d..d5dcf33d 100644 --- a/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go +++ b/pkg/apis/proto/sideinput/v1/sideinput_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/sideinput/v1/sideinput.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + SideInput_RetrieveSideInput_FullMethodName = "/sideinput.v1.SideInput/RetrieveSideInput" + SideInput_IsReady_FullMethodName = "/sideinput.v1.SideInput/IsReady" +) + // SideInputClient is the client API for SideInput service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -39,7 +44,7 @@ func NewSideInputClient(cc grpc.ClientConnInterface) SideInputClient { func (c *sideInputClient) RetrieveSideInput(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*SideInputResponse, error) { out := new(SideInputResponse) - err := c.cc.Invoke(ctx, "/sideinput.v1.SideInput/RetrieveSideInput", in, out, opts...) + err := c.cc.Invoke(ctx, SideInput_RetrieveSideInput_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -48,7 +53,7 @@ func (c *sideInputClient) RetrieveSideInput(ctx context.Context, in *emptypb.Emp func (c *sideInputClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/sideinput.v1.SideInput/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, SideInput_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -99,7 +104,7 @@ func _SideInput_RetrieveSideInput_Handler(srv interface{}, ctx context.Context, } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/sideinput.v1.SideInput/RetrieveSideInput", + FullMethod: SideInput_RetrieveSideInput_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SideInputServer).RetrieveSideInput(ctx, req.(*emptypb.Empty)) @@ -117,7 +122,7 @@ func _SideInput_IsReady_Handler(srv interface{}, ctx context.Context, dec func(i } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/sideinput.v1.SideInput/IsReady", + FullMethod: SideInput_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SideInputServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/sink/v1/sink.pb.go b/pkg/apis/proto/sink/v1/sink.pb.go index 3f0dae1f..2701f64d 100644 --- a/pkg/apis/proto/sink/v1/sink.pb.go +++ b/pkg/apis/proto/sink/v1/sink.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/sink/v1/sink.proto package v1 diff --git a/pkg/apis/proto/sink/v1/sink_grpc.pb.go b/pkg/apis/proto/sink/v1/sink_grpc.pb.go index bc50da8e..8db48e93 100644 --- a/pkg/apis/proto/sink/v1/sink_grpc.pb.go +++ b/pkg/apis/proto/sink/v1/sink_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/sink/v1/sink.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + Sink_SinkFn_FullMethodName = "/sink.v1.Sink/SinkFn" + Sink_IsReady_FullMethodName = "/sink.v1.Sink/IsReady" +) + // SinkClient is the client API for Sink service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -38,7 +43,7 @@ func NewSinkClient(cc grpc.ClientConnInterface) SinkClient { } func (c *sinkClient) SinkFn(ctx context.Context, opts ...grpc.CallOption) (Sink_SinkFnClient, error) { - stream, err := c.cc.NewStream(ctx, &Sink_ServiceDesc.Streams[0], "/sink.v1.Sink/SinkFn", opts...) + stream, err := c.cc.NewStream(ctx, &Sink_ServiceDesc.Streams[0], Sink_SinkFn_FullMethodName, opts...) if err != nil { return nil, err } @@ -73,7 +78,7 @@ func (x *sinkSinkFnClient) CloseAndRecv() (*SinkResponse, error) { func (c *sinkClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/sink.v1.Sink/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, Sink_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -150,7 +155,7 @@ func _Sink_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interf } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/sink.v1.Sink/IsReady", + FullMethod: Sink_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SinkServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index 7927e25e..a9d94119 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/source/v1/source.proto package v1 diff --git a/pkg/apis/proto/source/v1/source_grpc.pb.go b/pkg/apis/proto/source/v1/source_grpc.pb.go index ebb38e85..ab169830 100644 --- a/pkg/apis/proto/source/v1/source_grpc.pb.go +++ b/pkg/apis/proto/source/v1/source_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/source/v1/source.proto package v1 @@ -19,6 +19,14 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + Source_ReadFn_FullMethodName = "/source.v1.Source/ReadFn" + Source_AckFn_FullMethodName = "/source.v1.Source/AckFn" + Source_PendingFn_FullMethodName = "/source.v1.Source/PendingFn" + Source_PartitionsFn_FullMethodName = "/source.v1.Source/PartitionsFn" + Source_IsReady_FullMethodName = "/source.v1.Source/IsReady" +) + // SourceClient is the client API for Source service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -50,7 +58,7 @@ func NewSourceClient(cc grpc.ClientConnInterface) SourceClient { } func (c *sourceClient) ReadFn(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (Source_ReadFnClient, error) { - stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], "/source.v1.Source/ReadFn", opts...) + stream, err := c.cc.NewStream(ctx, &Source_ServiceDesc.Streams[0], Source_ReadFn_FullMethodName, opts...) if err != nil { return nil, err } @@ -83,7 +91,7 @@ func (x *sourceReadFnClient) Recv() (*ReadResponse, error) { func (c *sourceClient) AckFn(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*AckResponse, error) { out := new(AckResponse) - err := c.cc.Invoke(ctx, "/source.v1.Source/AckFn", in, out, opts...) + err := c.cc.Invoke(ctx, Source_AckFn_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -92,7 +100,7 @@ func (c *sourceClient) AckFn(ctx context.Context, in *AckRequest, opts ...grpc.C func (c *sourceClient) PendingFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PendingResponse, error) { out := new(PendingResponse) - err := c.cc.Invoke(ctx, "/source.v1.Source/PendingFn", in, out, opts...) + err := c.cc.Invoke(ctx, Source_PendingFn_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -101,7 +109,7 @@ func (c *sourceClient) PendingFn(ctx context.Context, in *emptypb.Empty, opts .. func (c *sourceClient) PartitionsFn(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*PartitionsResponse, error) { out := new(PartitionsResponse) - err := c.cc.Invoke(ctx, "/source.v1.Source/PartitionsFn", in, out, opts...) + err := c.cc.Invoke(ctx, Source_PartitionsFn_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -110,7 +118,7 @@ func (c *sourceClient) PartitionsFn(ctx context.Context, in *emptypb.Empty, opts func (c *sourceClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/source.v1.Source/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, Source_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -203,7 +211,7 @@ func _Source_AckFn_Handler(srv interface{}, ctx context.Context, dec func(interf } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/source.v1.Source/AckFn", + FullMethod: Source_AckFn_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SourceServer).AckFn(ctx, req.(*AckRequest)) @@ -221,7 +229,7 @@ func _Source_PendingFn_Handler(srv interface{}, ctx context.Context, dec func(in } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/source.v1.Source/PendingFn", + FullMethod: Source_PendingFn_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SourceServer).PendingFn(ctx, req.(*emptypb.Empty)) @@ -239,7 +247,7 @@ func _Source_PartitionsFn_Handler(srv interface{}, ctx context.Context, dec func } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/source.v1.Source/PartitionsFn", + FullMethod: Source_PartitionsFn_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SourceServer).PartitionsFn(ctx, req.(*emptypb.Empty)) @@ -257,7 +265,7 @@ func _Source_IsReady_Handler(srv interface{}, ctx context.Context, dec func(inte } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/source.v1.Source/IsReady", + FullMethod: Source_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SourceServer).IsReady(ctx, req.(*emptypb.Empty)) diff --git a/pkg/apis/proto/sourcetransform/v1/transform.pb.go b/pkg/apis/proto/sourcetransform/v1/transform.pb.go index b56efc20..927565e4 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.33.0 -// protoc v4.25.1 +// protoc-gen-go v1.31.0 +// protoc v5.28.0 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 @@ -321,25 +321,25 @@ var file_pkg_apis_proto_sourcetransform_v1_transform_proto_rawDesc = []byte{ 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x22, 0x25, 0x0a, 0x0d, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xcb, 0x01, 0x0a, 0x0f, 0x53, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x70, 0x0a, + 0x28, 0x08, 0x52, 0x05, 0x72, 0x65, 0x61, 0x64, 0x79, 0x32, 0xcf, 0x01, 0x0a, 0x0f, 0x53, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x12, 0x74, 0x0a, 0x11, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x46, 0x6e, 0x12, 0x2c, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x72, - 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x46, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, - 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, 0x69, 0x74, 0x68, 0x75, - 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, - 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, - 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x46, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x23, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x74, + 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x65, 0x72, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, + 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x43, 0x5a, 0x41, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x70, 0x72, + 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, 0x6f, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x6f, 0x72, 0x6d, 0x2f, 0x76, 0x31, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/apis/proto/sourcetransform/v1/transform.proto b/pkg/apis/proto/sourcetransform/v1/transform.proto index 8885db08..102386be 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform.proto +++ b/pkg/apis/proto/sourcetransform/v1/transform.proto @@ -11,7 +11,7 @@ service SourceTransform { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - rpc SourceTransformFn(SourceTransformRequest) returns (SourceTransformResponse); + rpc SourceTransformFn(stream SourceTransformRequest) returns (stream SourceTransformResponse); // IsReady is the heartbeat endpoint for gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); diff --git a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go index a3f4c206..c4339f67 100644 --- a/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go +++ b/pkg/apis/proto/sourcetransform/v1/transform_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: -// - protoc-gen-go-grpc v1.2.0 -// - protoc v4.25.1 +// - protoc-gen-go-grpc v1.3.0 +// - protoc v5.28.0 // source: pkg/apis/proto/sourcetransform/v1/transform.proto package v1 @@ -19,6 +19,11 @@ import ( // Requires gRPC-Go v1.32.0 or later. const _ = grpc.SupportPackageIsVersion7 +const ( + SourceTransform_SourceTransformFn_FullMethodName = "/sourcetransformer.v1.SourceTransform/SourceTransformFn" + SourceTransform_IsReady_FullMethodName = "/sourcetransformer.v1.SourceTransform/IsReady" +) + // SourceTransformClient is the client API for SourceTransform service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. @@ -26,7 +31,7 @@ type SourceTransformClient interface { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - SourceTransformFn(ctx context.Context, in *SourceTransformRequest, opts ...grpc.CallOption) (*SourceTransformResponse, error) + SourceTransformFn(ctx context.Context, opts ...grpc.CallOption) (SourceTransform_SourceTransformFnClient, error) // IsReady is the heartbeat endpoint for gRPC. IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) } @@ -39,18 +44,40 @@ func NewSourceTransformClient(cc grpc.ClientConnInterface) SourceTransformClient return &sourceTransformClient{cc} } -func (c *sourceTransformClient) SourceTransformFn(ctx context.Context, in *SourceTransformRequest, opts ...grpc.CallOption) (*SourceTransformResponse, error) { - out := new(SourceTransformResponse) - err := c.cc.Invoke(ctx, "/sourcetransformer.v1.SourceTransform/SourceTransformFn", in, out, opts...) +func (c *sourceTransformClient) SourceTransformFn(ctx context.Context, opts ...grpc.CallOption) (SourceTransform_SourceTransformFnClient, error) { + stream, err := c.cc.NewStream(ctx, &SourceTransform_ServiceDesc.Streams[0], SourceTransform_SourceTransformFn_FullMethodName, opts...) if err != nil { return nil, err } - return out, nil + x := &sourceTransformSourceTransformFnClient{stream} + return x, nil +} + +type SourceTransform_SourceTransformFnClient interface { + Send(*SourceTransformRequest) error + Recv() (*SourceTransformResponse, error) + grpc.ClientStream +} + +type sourceTransformSourceTransformFnClient struct { + grpc.ClientStream +} + +func (x *sourceTransformSourceTransformFnClient) Send(m *SourceTransformRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *sourceTransformSourceTransformFnClient) Recv() (*SourceTransformResponse, error) { + m := new(SourceTransformResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } func (c *sourceTransformClient) IsReady(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*ReadyResponse, error) { out := new(ReadyResponse) - err := c.cc.Invoke(ctx, "/sourcetransformer.v1.SourceTransform/IsReady", in, out, opts...) + err := c.cc.Invoke(ctx, SourceTransform_IsReady_FullMethodName, in, out, opts...) if err != nil { return nil, err } @@ -64,7 +91,7 @@ type SourceTransformServer interface { // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. - SourceTransformFn(context.Context, *SourceTransformRequest) (*SourceTransformResponse, error) + SourceTransformFn(SourceTransform_SourceTransformFnServer) error // IsReady is the heartbeat endpoint for gRPC. IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) mustEmbedUnimplementedSourceTransformServer() @@ -74,8 +101,8 @@ type SourceTransformServer interface { type UnimplementedSourceTransformServer struct { } -func (UnimplementedSourceTransformServer) SourceTransformFn(context.Context, *SourceTransformRequest) (*SourceTransformResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method SourceTransformFn not implemented") +func (UnimplementedSourceTransformServer) SourceTransformFn(SourceTransform_SourceTransformFnServer) error { + return status.Errorf(codes.Unimplemented, "method SourceTransformFn not implemented") } func (UnimplementedSourceTransformServer) IsReady(context.Context, *emptypb.Empty) (*ReadyResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method IsReady not implemented") @@ -93,22 +120,30 @@ func RegisterSourceTransformServer(s grpc.ServiceRegistrar, srv SourceTransformS s.RegisterService(&SourceTransform_ServiceDesc, srv) } -func _SourceTransform_SourceTransformFn_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(SourceTransformRequest) - if err := dec(in); err != nil { +func _SourceTransform_SourceTransformFn_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(SourceTransformServer).SourceTransformFn(&sourceTransformSourceTransformFnServer{stream}) +} + +type SourceTransform_SourceTransformFnServer interface { + Send(*SourceTransformResponse) error + Recv() (*SourceTransformRequest, error) + grpc.ServerStream +} + +type sourceTransformSourceTransformFnServer struct { + grpc.ServerStream +} + +func (x *sourceTransformSourceTransformFnServer) Send(m *SourceTransformResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *sourceTransformSourceTransformFnServer) Recv() (*SourceTransformRequest, error) { + m := new(SourceTransformRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { return nil, err } - if interceptor == nil { - return srv.(SourceTransformServer).SourceTransformFn(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/sourcetransformer.v1.SourceTransform/SourceTransformFn", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(SourceTransformServer).SourceTransformFn(ctx, req.(*SourceTransformRequest)) - } - return interceptor(ctx, in, info, handler) + return m, nil } func _SourceTransform_IsReady_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { @@ -121,7 +156,7 @@ func _SourceTransform_IsReady_Handler(srv interface{}, ctx context.Context, dec } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/sourcetransformer.v1.SourceTransform/IsReady", + FullMethod: SourceTransform_IsReady_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(SourceTransformServer).IsReady(ctx, req.(*emptypb.Empty)) @@ -136,15 +171,18 @@ var SourceTransform_ServiceDesc = grpc.ServiceDesc{ ServiceName: "sourcetransformer.v1.SourceTransform", HandlerType: (*SourceTransformServer)(nil), Methods: []grpc.MethodDesc{ - { - MethodName: "SourceTransformFn", - Handler: _SourceTransform_SourceTransformFn_Handler, - }, { MethodName: "IsReady", Handler: _SourceTransform_IsReady_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SourceTransformFn", + Handler: _SourceTransform_SourceTransformFn_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "pkg/apis/proto/sourcetransform/v1/transform.proto", } diff --git a/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go b/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go index fbc66555..8b347d06 100644 --- a/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go +++ b/pkg/apis/proto/sourcetransform/v1/transformmock/transformmock.go @@ -58,21 +58,21 @@ func (mr *MockSourceTransformClientMockRecorder) IsReady(arg0, arg1 interface{}, } // SourceTransformFn mocks base method. -func (m *MockSourceTransformClient) SourceTransformFn(arg0 context.Context, arg1 *v1.SourceTransformRequest, arg2 ...grpc.CallOption) (*v1.SourceTransformResponse, error) { +func (m *MockSourceTransformClient) SourceTransformFn(arg0 context.Context, arg1 ...grpc.CallOption) (v1.SourceTransform_SourceTransformFnClient, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0, arg1} - for _, a := range arg2 { + varargs := []interface{}{arg0} + for _, a := range arg1 { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "SourceTransformFn", varargs...) - ret0, _ := ret[0].(*v1.SourceTransformResponse) + ret0, _ := ret[0].(v1.SourceTransform_SourceTransformFnClient) ret1, _ := ret[1].(error) return ret0, ret1 } // SourceTransformFn indicates an expected call of SourceTransformFn. -func (mr *MockSourceTransformClientMockRecorder) SourceTransformFn(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { +func (mr *MockSourceTransformClientMockRecorder) SourceTransformFn(arg0 interface{}, arg1 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0, arg1}, arg2...) + varargs := append([]interface{}{arg0}, arg1...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SourceTransformFn", reflect.TypeOf((*MockSourceTransformClient)(nil).SourceTransformFn), varargs...) } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 1651f825..2abdfbfa 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -2,6 +2,11 @@ package sourcetransformer import ( "context" + "errors" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "io" "log" "runtime/debug" @@ -34,7 +39,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, // SourceTransformFn applies a function to each request element. // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. -func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformRequest) (*v1.SourceTransformResponse, error) { +func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFnServer) error { // handle panic defer func() { if r := recover(); r != nil { @@ -42,19 +47,63 @@ func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformR fs.shutdownCh <- struct{}{} } }() - var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) - messageTs := fs.Transformer.Transform(ctx, d.GetKeys(), hd) - var results []*v1.SourceTransformResponse_Result - for _, m := range messageTs.Items() { - results = append(results, &v1.SourceTransformResponse_Result{ - EventTime: timestamppb.New(m.EventTime()), - Keys: m.Keys(), - Value: m.Value(), - Tags: m.Tags(), + + ctx := stream.Context() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + grp, grpCtx := errgroup.WithContext(ctx) + + senderCh := make(chan *v1.SourceTransformResponse, 500) // TODO: identify the right buffer size + // goroutine to send the response to the stream + grp.Go(func() error { + for { + select { + case <-grpCtx.Done(): + return grpCtx.Err() + default: + } + if err := stream.Send(<-senderCh); err != nil { + cancel() + return err + } + } + }) + + for { + d, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + return err + } + grp.Go(func() error { + var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) + messageTs := fs.Transformer.Transform(grpCtx, d.GetKeys(), hd) + var results []*v1.SourceTransformResponse_Result + for _, m := range messageTs.Items() { + results = append(results, &v1.SourceTransformResponse_Result{ + EventTime: timestamppb.New(m.EventTime()), + Keys: m.Keys(), + Value: m.Value(), + Tags: m.Tags(), + }) + } + resp := &v1.SourceTransformResponse{ + Results: results, + } + select { + case senderCh <- resp: + case <-grpCtx.Done(): + return grpCtx.Err() + } + return nil }) } - responseList := &v1.SourceTransformResponse{ - Results: results, + + if err := grp.Wait(); err != nil { + statusErr := status.Errorf(codes.Internal, err.Error()) + return statusErr } - return responseList, nil + return nil } diff --git a/pkg/sourcetransformer/service_test.go b/pkg/sourcetransformer/service_test.go index d57d937e..77cf4f93 100644 --- a/pkg/sourcetransformer/service_test.go +++ b/pkg/sourcetransformer/service_test.go @@ -2,19 +2,61 @@ package sourcetransformer import ( "context" - "reflect" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" + "net" "testing" "time" + proto "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" "google.golang.org/protobuf/types/known/timestamppb" - - stpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" ) +func newServer(t *testing.T, register func(server *grpc.Server)) *grpc.ClientConn { + lis := bufconn.Listen(1024 * 1024) + t.Cleanup(func() { + lis.Close() + }) + + server := grpc.NewServer() + t.Cleanup(func() { + server.Stop() + }) + + register(server) + + go func() { + if err := server.Serve(lis); err != nil { + t.Fatal("Starting gRPC server:", err) + } + }() + + dialer := func(context.Context, string) (net.Conn, error) { + return lis.Dial() + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + t.Cleanup(func() { + cancel() + }) + + conn, err := grpc.DialContext(ctx, "", grpc.WithContextDialer(dialer), grpc.WithTransportCredentials(insecure.NewCredentials())) + t.Cleanup(func() { + conn.Close() + }) + if err != nil { + t.Fatalf("Creating new gRPC client connection: %v", err) + } + + return conn +} + func TestService_sourceTransformFn(t *testing.T) { type args struct { ctx context.Context - d *stpb.SourceTransformRequest + d *proto.SourceTransformRequest } testTime := time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local) @@ -22,8 +64,7 @@ func TestService_sourceTransformFn(t *testing.T) { name string handler SourceTransformer args args - want *stpb.SourceTransformResponse - wantErr bool + want *proto.SourceTransformResponse }{ { name: "sourceTransform_fn_forward_msg", @@ -33,15 +74,15 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ + d: &proto.SourceTransformRequest{ Keys: []string{"client"}, Value: []byte(`test`), EventTime: timestamppb.New(time.Time{}), Watermark: timestamppb.New(time.Time{}), }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Keys: []string{"client_test"}, @@ -49,7 +90,6 @@ func TestService_sourceTransformFn(t *testing.T) { }, }, }, - wantErr: false, }, { name: "sourceTransform_fn_forward_msg_forward_to_all", @@ -59,22 +99,21 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ + d: &proto.SourceTransformRequest{ Keys: []string{"client"}, Value: []byte(`test`), EventTime: timestamppb.New(time.Time{}), Watermark: timestamppb.New(time.Time{}), }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Value: []byte(`test`), }, }, }, - wantErr: false, }, { name: "sourceTransform_fn_forward_msg_drop_msg", @@ -83,42 +122,46 @@ func TestService_sourceTransformFn(t *testing.T) { }), args: args{ ctx: context.Background(), - d: &stpb.SourceTransformRequest{ + d: &proto.SourceTransformRequest{ Keys: []string{"client"}, Value: []byte(`test`), EventTime: timestamppb.New(time.Time{}), Watermark: timestamppb.New(time.Time{}), }, }, - want: &stpb.SourceTransformResponse{ - Results: []*stpb.SourceTransformResponse_Result{ + want: &proto.SourceTransformResponse{ + Results: []*proto.SourceTransformResponse_Result{ { EventTime: timestamppb.New(testTime), Tags: []string{DROP}, - Value: []byte{}, + Value: nil, }, }, }, - wantErr: false, }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - fs := &Service{ + svc := &Service{ Transformer: tt.handler, } - // here's a trick for testing: - // because we are not using gRPC, we directly set a new incoming ctx - // instead of the regular outgoing context in the real gRPC connection. - ctx := context.Background() - got, err := fs.SourceTransformFn(ctx, tt.args.d) - if (err != nil) != tt.wantErr { - t.Errorf("SourceTransformFn() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("SourceTransformFn() got = %v, want %v", got, tt.want) - } + + conn := newServer(t, func(server *grpc.Server) { + proto.RegisterSourceTransformServer(server, svc) + }) + + client := proto.NewSourceTransformClient(conn) + stream, err := client.SourceTransformFn(context.Background()) + assert.NoError(t, err, "Creating stream") + + err = stream.Send(tt.args.d) + assert.NoError(t, err, "Sending message over the stream") + + got, err := stream.Recv() + assert.NoError(t, err, "Receiving message from the stream") + + assert.Equal(t, got.Results, tt.want.Results) }) } }