From dfa1117a8ba4700731dac46e2fc0be7101419544 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 30 Oct 2024 10:19:47 +0530 Subject: [PATCH] update example Signed-off-by: Yashash H L --- pkg/sourcer/examples/simple_source/impl/simple_source.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/sourcer/examples/simple_source/impl/simple_source.go b/pkg/sourcer/examples/simple_source/impl/simple_source.go index 87ec9dcb..33423ccf 100644 --- a/pkg/sourcer/examples/simple_source/impl/simple_source.go +++ b/pkg/sourcer/examples/simple_source/impl/simple_source.go @@ -76,8 +76,9 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) { s.lock.Lock() defer s.lock.Unlock() - offset := deserializeOffset(request.Offset().Value()) - delete(s.toAckSet, offset) + for _, offset := range request.Offsets() { + delete(s.toAckSet, deserializeOffset(offset.Value())) + } } func (s *SimpleSource) Partitions(_ context.Context) []int32 {