diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index e9a537de..a3fbdfae 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -45,9 +45,9 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { ctx, cancel := context.WithCancel(stream.Context()) defer cancel() - // Use error group to manage goroutines, the grpCtx is cancelled when any of the + // Use error group to manage goroutines, the groupCtx is cancelled when any of the // goroutines return an error for the first time or the first time the wait returns. - g, grpCtx := errgroup.WithContext(ctx) + g, groupCtx := errgroup.WithContext(ctx) // Channel to collect responses responseCh := make(chan *mappb.MapResponse, 500) // FIXME: identify the right buffer size @@ -59,11 +59,11 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { select { case resp := <-responseCh: if err := stream.Send(resp); err != nil { - log.Printf("failed to send response: %v", err) + log.Printf("Failed to send response: %v", err) return err } - case <-grpCtx.Done(): - return grpCtx.Err() + case <-groupCtx.Done(): + return groupCtx.Err() } } }) @@ -73,7 +73,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error { outer: for { select { - case <-grpCtx.Done(): + case <-groupCtx.Done(): break outer default: } @@ -90,7 +90,7 @@ outer: break outer } g.Go(func() error { - return fs.handleRequest(grpCtx, req, responseCh) + return fs.handleRequest(groupCtx, req, responseCh) }) } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index c7212320..c98ce5dd 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -53,19 +53,20 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn ctx, cancel := context.WithCancel(stream.Context()) defer cancel() - // Use error group to manage goroutines, the grpCtx is cancelled when any of the + // Use error group to manage goroutines, the groupCtx is cancelled when any of the // goroutines return an error for the first time or the first time the wait returns. - grp, grpCtx := errgroup.WithContext(ctx) + grp, groupCtx := errgroup.WithContext(ctx) senderCh := make(chan *v1.SourceTransformResponse, 500) // FIXME: identify the right buffer size // goroutine to send the responses back to the client grp.Go(func() error { for { select { - case <-grpCtx.Done(): - return grpCtx.Err() + case <-groupCtx.Done(): + return groupCtx.Err() case resp := <-senderCh: if err := stream.Send(resp); err != nil { + log.Printf("Failed to send response: %v", err) return fmt.Errorf("failed to send response to client: %w", err) } } @@ -76,10 +77,11 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn outer: for { select { - - case <-grpCtx.Done(): // Stop reading new messages when we are shutting down + case <-groupCtx.Done(): + // Stop reading new messages when we are shutting down break outer default: + // get out of select and process } d, err := stream.Recv() if err == io.EOF { @@ -94,7 +96,7 @@ outer: break outer } grp.Go(func() (err error) { - return fs.handleRequest(grpCtx, d, senderCh) + return fs.handleRequest(groupCtx, d, senderCh) }) }