Skip to content

Commit

Permalink
pass one offset at a time
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Sep 6, 2024
1 parent 5ae7944 commit b9d07d7
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 80 deletions.
133 changes: 65 additions & 68 deletions pkg/apis/proto/source/v1/source.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions pkg/apis/proto/source/v1/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,8 @@ message ReadResponse {
*/
message AckRequest {
message Request {
// Required field holding a list of offsets to be acknowledged.
// The offsets must be strictly corresponding to the previously read batch,
// meaning the offsets must be in the same order as the datum responses in the ReadResponse.
// By enforcing ordering, we can save deserialization effort on the server side, assuming the server keeps a local copy of the raw/un-serialized offsets.
repeated Offset offsets = 1;
// Required field holding the offset to be acked
Offset offset = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down
8 changes: 3 additions & 5 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,10 @@ func (fs *Service) AckFn(stream sourcepb.Source_AckFnServer) error {
return err
}

for _, offset := range req.Request.GetOffsets() {
request := ackRequest{
offset: NewOffset(offset.GetOffset(), offset.GetPartitionId()),
}
fs.Source.Ack(ctx, &request)
request := ackRequest{
offset: NewOffset(req.Request.Offset.GetOffset(), req.Request.Offset.GetPartitionId()),
}
fs.Source.Ack(ctx, &request)
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcer/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (a *AckFnServerTest) Recv() (*sourcepb.AckRequest, error) {
a.index++
return &sourcepb.AckRequest{
Request: &sourcepb.AckRequest_Request{
Offsets: []*sourcepb.Offset{offset},
Offset: offset,
},
}, nil
}
Expand All @@ -146,7 +146,7 @@ func NewAckFnServerTest(
}
}

func (a *AckFnServerTest) SendAndClose(response *sourcepb.AckResponse) error {
func (a *AckFnServerTest) SendAndClose(*sourcepb.AckResponse) error {
return nil
}

Expand Down

0 comments on commit b9d07d7

Please sign in to comment.