diff --git a/pkg/apis/proto/source/v1/source.pb.go b/pkg/apis/proto/source/v1/source.pb.go index 8e68737e..d2ac0624 100644 --- a/pkg/apis/proto/source/v1/source.pb.go +++ b/pkg/apis/proto/source/v1/source.pb.go @@ -1127,32 +1127,32 @@ var file_pkg_apis_proto_source_v1_source_proto_rawDesc = []byte{ 0x66, 0x73, 0x65, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x6f, 0x66, 0x66, 0x73, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, - 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xc6, 0x02, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, + 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x32, 0xc8, 0x02, 0x0a, 0x06, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x3d, 0x0a, 0x06, 0x52, 0x65, 0x61, 0x64, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, - 0x38, 0x0a, 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, 0x12, 0x15, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x3a, 0x0a, 0x05, 0x41, 0x63, 0x6b, 0x46, 0x6e, 0x12, 0x15, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x41, 0x63, 0x6b, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x12, 0x3f, 0x0a, 0x09, 0x50, 0x65, 0x6e, - 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1a, - 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, 0x64, 0x69, - 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0c, 0x50, 0x61, - 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, - 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, - 0x74, 0x79, 0x1a, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, - 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, 0x2e, 0x67, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x3f, 0x0a, 0x09, 0x50, + 0x65, 0x6e, 0x64, 0x69, 0x6e, 0x67, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, + 0x1a, 0x1a, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x65, 0x6e, + 0x64, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x45, 0x0a, 0x0c, + 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x46, 0x6e, 0x12, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, - 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, - 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x3a, - 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, 0x75, 0x6d, - 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, 0x2d, 0x67, - 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x1d, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x76, 0x31, + 0x2e, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x3b, 0x0a, 0x07, 0x49, 0x73, 0x52, 0x65, 0x61, 0x64, 0x79, 0x12, 0x16, + 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x18, 0x2e, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, + 0x76, 0x31, 0x2e, 0x52, 0x65, 0x61, 0x64, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x3a, 0x5a, 0x38, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6e, + 0x75, 0x6d, 0x61, 0x70, 0x72, 0x6f, 0x6a, 0x2f, 0x6e, 0x75, 0x6d, 0x61, 0x66, 0x6c, 0x6f, 0x77, + 0x2d, 0x67, 0x6f, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x2f, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/apis/proto/source/v1/source.proto b/pkg/apis/proto/source/v1/source.proto index 91e86ead..54a70230 100644 --- a/pkg/apis/proto/source/v1/source.proto +++ b/pkg/apis/proto/source/v1/source.proto @@ -19,7 +19,7 @@ service Source { // The caller (numa) expects the AckFn to be successful, and it does not expect any errors. // If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request, // then it is best to crash because there are no other retry mechanisms possible. - rpc AckFn(stream AckRequest) returns (AckResponse); + rpc AckFn(stream AckRequest) returns (stream AckResponse); // PendingFn returns the number of pending records at the user defined source. rpc PendingFn(google.protobuf.Empty) returns (PendingResponse); diff --git a/pkg/apis/proto/source/v1/source_grpc.pb.go b/pkg/apis/proto/source/v1/source_grpc.pb.go index fd9cd6bf..6348fe00 100644 --- a/pkg/apis/proto/source/v1/source_grpc.pb.go +++ b/pkg/apis/proto/source/v1/source_grpc.pb.go @@ -102,7 +102,7 @@ func (c *sourceClient) AckFn(ctx context.Context, opts ...grpc.CallOption) (Sour type Source_AckFnClient interface { Send(*AckRequest) error - CloseAndRecv() (*AckResponse, error) + Recv() (*AckResponse, error) grpc.ClientStream } @@ -114,10 +114,7 @@ func (x *sourceAckFnClient) Send(m *AckRequest) error { return x.ClientStream.SendMsg(m) } -func (x *sourceAckFnClient) CloseAndRecv() (*AckResponse, error) { - if err := x.ClientStream.CloseSend(); err != nil { - return nil, err - } +func (x *sourceAckFnClient) Recv() (*AckResponse, error) { m := new(AckResponse) if err := x.ClientStream.RecvMsg(m); err != nil { return nil, err @@ -242,7 +239,7 @@ func _Source_AckFn_Handler(srv interface{}, stream grpc.ServerStream) error { } type Source_AckFnServer interface { - SendAndClose(*AckResponse) error + Send(*AckResponse) error Recv() (*AckRequest, error) grpc.ServerStream } @@ -251,7 +248,7 @@ type sourceAckFnServer struct { grpc.ServerStream } -func (x *sourceAckFnServer) SendAndClose(m *AckResponse) error { +func (x *sourceAckFnServer) Send(m *AckResponse) error { return x.ServerStream.SendMsg(m) } @@ -347,6 +344,7 @@ var Source_ServiceDesc = grpc.ServiceDesc{ { StreamName: "AckFn", Handler: _Source_AckFn_Handler, + ServerStreams: true, ClientStreams: true, }, }, diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 6dd6a05d..72fe58e8 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -79,10 +79,11 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error { // Receive read requests from the stream req, err := stream.Recv() if err == io.EOF { + log.Printf("end of read stream") return } if err != nil { - log.Printf("error receiving from stream: %v", err) + log.Printf("error receiving from read stream: %v", err) errCh <- err return } @@ -172,12 +173,11 @@ func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error { // Receive ack requests from the stream req, err := stream.Recv() if err == io.EOF { - return stream.SendAndClose(&sourcepb.AckResponse{ - Result: &sourcepb.AckResponse_Result{}, - }) + log.Printf("end of ack stream") + return nil } if err != nil { - log.Printf("error receiving from stream: %v", err) + log.Printf("error receiving from ack stream: %v", err) return err } @@ -185,6 +185,17 @@ func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error { offset: NewOffset(req.Request.Offset.GetOffset(), req.Request.Offset.GetPartitionId()), } fs.Source.Ack(ctx, &request) + + // Send ack response + ackResponse := &sourcepb.AckResponse{ + Result: &sourcepb.AckResponse_Result{ + Success: &emptypb.Empty{}, + }, + } + if err := stream.Send(ackResponse); err != nil { + log.Printf("error sending ack response: %v", err) + return err + } } } diff --git a/pkg/sourcer/service_test.go b/pkg/sourcer/service_test.go index cee11cc8..92bbefdf 100644 --- a/pkg/sourcer/service_test.go +++ b/pkg/sourcer/service_test.go @@ -117,8 +117,9 @@ func (te *ReadFnServerErrTest) Context() context.Context { } type AckFnServerTest struct { - ctx context.Context - offsets []*sourcepb.Offset + ctx context.Context + offsets []*sourcepb.Offset + responses []*sourcepb.AckResponse grpc.ServerStream index int } @@ -136,20 +137,22 @@ func (a *AckFnServerTest) Recv() (*sourcepb.AckRequest, error) { }, nil } +func (a *AckFnServerTest) Send(response *sourcepb.AckResponse) error { + a.responses = append(a.responses, response) + return nil +} + func NewAckFnServerTest( ctx context.Context, offsets []*sourcepb.Offset, ) *AckFnServerTest { return &AckFnServerTest{ - ctx: ctx, - offsets: offsets, + ctx: ctx, + offsets: offsets, + responses: make([]*sourcepb.AckResponse, 0), } } -func (a *AckFnServerTest) SendAndClose(*sourcepb.AckResponse) error { - return nil -} - func (a *AckFnServerTest) Context() context.Context { return a.ctx } @@ -269,6 +272,18 @@ func TestService_AckFn(t *testing.T) { err := fs.AckFn(ackFnStream) assert.NoError(t, err) + + expectedResponses := []*sourcepb.AckResponse{ + { + Result: &sourcepb.AckResponse_Result{ + Success: &emptypb.Empty{}, + }, + }, + } + + if !reflect.DeepEqual(ackFnStream.responses, expectedResponses) { + t.Errorf("AckFn() responses = %v, want %v", ackFnStream.responses, expectedResponses) + } } func TestService_PendingFn(t *testing.T) {