From b32d0e7c4d5c41d3575fa075f8269f5e4e2da3cf Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Tue, 8 Oct 2024 13:25:57 +0530 Subject: [PATCH] address review comments Signed-off-by: Yashash H L --- pkg/mapper/examples/even_odd/go.mod | 1 + pkg/mapper/examples/even_odd/go.sum | 2 ++ pkg/mapper/examples/flatmap/go.mod | 1 + pkg/mapper/examples/flatmap/go.sum | 2 ++ pkg/mapper/examples/forward_message/go.mod | 1 + pkg/mapper/examples/forward_message/go.sum | 2 ++ pkg/mapper/examples/retry/go.mod | 1 + pkg/mapper/examples/retry/go.sum | 2 ++ pkg/mapper/examples/tickgen/go.mod | 1 + pkg/mapper/examples/tickgen/go.sum | 2 ++ pkg/mapper/service.go | 30 +++++++++---------- pkg/mapper/service_test.go | 6 ++-- .../examples/map_sideinput/udf/go.mod | 1 + .../examples/map_sideinput/udf/go.sum | 2 ++ .../examples/simple_sideinput/udf/go.mod | 1 + .../examples/simple_sideinput/udf/go.sum | 2 ++ pkg/sourcetransformer/service.go | 30 +++++++++---------- 17 files changed, 54 insertions(+), 33 deletions(-) diff --git a/pkg/mapper/examples/even_odd/go.mod b/pkg/mapper/examples/even_odd/go.mod index 3c53ab1b..5d4b69a8 100644 --- a/pkg/mapper/examples/even_odd/go.mod +++ b/pkg/mapper/examples/even_odd/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/even_odd/go.sum b/pkg/mapper/examples/even_odd/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/even_odd/go.sum +++ b/pkg/mapper/examples/even_odd/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/flatmap/go.mod b/pkg/mapper/examples/flatmap/go.mod index 24b28bb1..e4cdd7b8 100644 --- a/pkg/mapper/examples/flatmap/go.mod +++ b/pkg/mapper/examples/flatmap/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/flatmap/go.sum b/pkg/mapper/examples/flatmap/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/flatmap/go.sum +++ b/pkg/mapper/examples/flatmap/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/forward_message/go.mod b/pkg/mapper/examples/forward_message/go.mod index 2b0abb6d..c1dc3be7 100644 --- a/pkg/mapper/examples/forward_message/go.mod +++ b/pkg/mapper/examples/forward_message/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/forward_message/go.sum b/pkg/mapper/examples/forward_message/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/forward_message/go.sum +++ b/pkg/mapper/examples/forward_message/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/retry/go.mod b/pkg/mapper/examples/retry/go.mod index 9612714e..41a9d813 100644 --- a/pkg/mapper/examples/retry/go.mod +++ b/pkg/mapper/examples/retry/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/retry/go.sum b/pkg/mapper/examples/retry/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/retry/go.sum +++ b/pkg/mapper/examples/retry/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/examples/tickgen/go.mod b/pkg/mapper/examples/tickgen/go.mod index 0be651a1..988c6b88 100644 --- a/pkg/mapper/examples/tickgen/go.mod +++ b/pkg/mapper/examples/tickgen/go.mod @@ -8,6 +8,7 @@ require github.com/numaproj/numaflow-go v0.8.1 require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/mapper/examples/tickgen/go.sum b/pkg/mapper/examples/tickgen/go.sum index 09e06a2c..36997a49 100644 --- a/pkg/mapper/examples/tickgen/go.sum +++ b/pkg/mapper/examples/tickgen/go.sum @@ -8,6 +8,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 87c32534..e9a537de 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -76,22 +76,22 @@ outer: case <-grpCtx.Done(): break outer default: - req, err := stream.Recv() - if err == io.EOF { - break outer - } - if err != nil { - log.Printf("failed to receive request: %v", err) - readErr = err - // read loop is not part of the error group, so we need to cancel the context - // to signal the other goroutines to stop processing. - cancel() - break outer - } - g.Go(func() error { - return fs.handleRequest(grpCtx, req, responseCh) - }) } + req, err := stream.Recv() + if err == io.EOF { + break outer + } + if err != nil { + log.Printf("Failed to receive request: %v", err) + readErr = err + // read loop is not part of the error group, so we need to cancel the context + // to signal the other goroutines to stop processing. + cancel() + break outer + } + g.Go(func() error { + return fs.handleRequest(grpCtx, req, responseCh) + }) } // wait for all goroutines to finish diff --git a/pkg/mapper/service_test.go b/pkg/mapper/service_test.go index a6cda1f5..7ec939d1 100644 --- a/pkg/mapper/service_test.go +++ b/pkg/mapper/service_test.go @@ -80,7 +80,7 @@ func TestService_mapFn(t *testing.T) { want *proto.MapResponse }{ { - name: "sourceTransform_fn_forward_msg", + name: "map_fn_forward_msg", handler: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { msg := datum.Value() return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) @@ -106,7 +106,7 @@ func TestService_mapFn(t *testing.T) { }, }, { - name: "sourceTransform_fn_forward_msg_forward_to_all", + name: "map_fn_forward_msg_forward_to_all", handler: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { msg := datum.Value() return MessagesBuilder().Append(NewMessage(msg)) @@ -131,7 +131,7 @@ func TestService_mapFn(t *testing.T) { }, }, { - name: "sourceTransform_fn_forward_msg_drop_msg", + name: "map_fn_forward_msg_drop_msg", handler: MapperFunc(func(ctx context.Context, keys []string, datum Datum) Messages { return MessagesBuilder().Append(MessageToDrop()) }), diff --git a/pkg/sideinput/examples/map_sideinput/udf/go.mod b/pkg/sideinput/examples/map_sideinput/udf/go.mod index d783a749..78ff598e 100644 --- a/pkg/sideinput/examples/map_sideinput/udf/go.mod +++ b/pkg/sideinput/examples/map_sideinput/udf/go.mod @@ -11,6 +11,7 @@ require ( require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/sideinput/examples/map_sideinput/udf/go.sum b/pkg/sideinput/examples/map_sideinput/udf/go.sum index 8ba4f4c5..249ac9a2 100644 --- a/pkg/sideinput/examples/map_sideinput/udf/go.sum +++ b/pkg/sideinput/examples/map_sideinput/udf/go.sum @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/pkg/sideinput/examples/simple_sideinput/udf/go.mod b/pkg/sideinput/examples/simple_sideinput/udf/go.mod index bbf5f7b3..8eef3bde 100644 --- a/pkg/sideinput/examples/simple_sideinput/udf/go.mod +++ b/pkg/sideinput/examples/simple_sideinput/udf/go.mod @@ -11,6 +11,7 @@ require ( require ( golang.org/x/net v0.29.0 // indirect + golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect diff --git a/pkg/sideinput/examples/simple_sideinput/udf/go.sum b/pkg/sideinput/examples/simple_sideinput/udf/go.sum index 8ba4f4c5..249ac9a2 100644 --- a/pkg/sideinput/examples/simple_sideinput/udf/go.sum +++ b/pkg/sideinput/examples/simple_sideinput/udf/go.sum @@ -10,6 +10,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 6dd2577d..c7212320 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -80,22 +80,22 @@ outer: case <-grpCtx.Done(): // Stop reading new messages when we are shutting down break outer default: - d, err := stream.Recv() - if err == io.EOF { - break outer - } - if err != nil { - log.Printf("failed to receive request: %v", err) - readErr = err - // read loop is not part of the error group, so we need to cancel the context - // to signal the other goroutines to stop processing. - cancel() - break outer - } - grp.Go(func() (err error) { - return fs.handleRequest(grpCtx, d, senderCh) - }) } + d, err := stream.Recv() + if err == io.EOF { + break outer + } + if err != nil { + log.Printf("Failed to receive request: %v", err) + readErr = err + // read loop is not part of the error group, so we need to cancel the context + // to signal the other goroutines to stop processing. + cancel() + break outer + } + grp.Go(func() (err error) { + return fs.handleRequest(grpCtx, d, senderCh) + }) } // wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately.