Skip to content

Commit

Permalink
chore: code review (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
vigith authored Oct 8, 2024
1 parent 702acef commit 445ee15
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
14 changes: 7 additions & 7 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

// Use error group to manage goroutines, the grpCtx is cancelled when any of the
// Use error group to manage goroutines, the groupCtx is cancelled when any of the
// goroutines return an error for the first time or the first time the wait returns.
g, grpCtx := errgroup.WithContext(ctx)
g, groupCtx := errgroup.WithContext(ctx)

// Channel to collect responses
responseCh := make(chan *mappb.MapResponse, 500) // FIXME: identify the right buffer size
Expand All @@ -59,11 +59,11 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
select {
case resp := <-responseCh:
if err := stream.Send(resp); err != nil {
log.Printf("failed to send response: %v", err)
log.Printf("Failed to send response: %v", err)
return err
}
case <-grpCtx.Done():
return grpCtx.Err()
case <-groupCtx.Done():
return groupCtx.Err()
}
}
})
Expand All @@ -73,7 +73,7 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
outer:
for {
select {
case <-grpCtx.Done():
case <-groupCtx.Done():
break outer
default:
}
Expand All @@ -90,7 +90,7 @@ outer:
break outer
}
g.Go(func() error {
return fs.handleRequest(grpCtx, req, responseCh)
return fs.handleRequest(groupCtx, req, responseCh)
})
}

Expand Down
16 changes: 9 additions & 7 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,20 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()

// Use error group to manage goroutines, the grpCtx is cancelled when any of the
// Use error group to manage goroutines, the groupCtx is cancelled when any of the
// goroutines return an error for the first time or the first time the wait returns.
grp, grpCtx := errgroup.WithContext(ctx)
grp, groupCtx := errgroup.WithContext(ctx)

senderCh := make(chan *v1.SourceTransformResponse, 500) // FIXME: identify the right buffer size
// goroutine to send the responses back to the client
grp.Go(func() error {
for {
select {
case <-grpCtx.Done():
return grpCtx.Err()
case <-groupCtx.Done():
return groupCtx.Err()
case resp := <-senderCh:
if err := stream.Send(resp); err != nil {
log.Printf("Failed to send response: %v", err)
return fmt.Errorf("failed to send response to client: %w", err)
}
}
Expand All @@ -76,10 +77,11 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
outer:
for {
select {

case <-grpCtx.Done(): // Stop reading new messages when we are shutting down
case <-groupCtx.Done():
// Stop reading new messages when we are shutting down
break outer
default:
// get out of select and process
}
d, err := stream.Recv()
if err == io.EOF {
Expand All @@ -94,7 +96,7 @@ outer:
break outer
}
grp.Go(func() (err error) {
return fs.handleRequest(grpCtx, d, senderCh)
return fs.handleRequest(groupCtx, d, senderCh)
})
}

Expand Down

0 comments on commit 445ee15

Please sign in to comment.