diff --git a/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go b/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go index ffac5f17..a1c8b302 100644 --- a/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go +++ b/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go @@ -75,9 +75,7 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest } func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) { - for _, offset := range request.Offsets() { - delete(s.toAckSet, deserializeOffset(offset.Value())) - } + delete(s.toAckSet, deserializeOffset(request.Offset().Value())) } func (s *SimpleSource) Partitions(_ context.Context) []int32 {