From 8f4062fa35045a4b7e8409bdf4e4b25f900d9a29 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 30 Oct 2024 10:28:00 +0530 Subject: [PATCH] chore: update side input example (#164) Signed-off-by: Yashash H L --- .../impl/simple_source_sideinput.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go b/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go index a1c8b302..ffac5f17 100644 --- a/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go +++ b/pkg/sideinput/examples/simple_source_with_sideinput/impl/simple_source_sideinput.go @@ -75,7 +75,9 @@ func (s *SimpleSource) Read(_ context.Context, readRequest sourcesdk.ReadRequest } func (s *SimpleSource) Ack(_ context.Context, request sourcesdk.AckRequest) { - delete(s.toAckSet, deserializeOffset(request.Offset().Value())) + for _, offset := range request.Offsets() { + delete(s.toAckSet, deserializeOffset(offset.Value())) + } } func (s *SimpleSource) Partitions(_ context.Context) []int32 {