Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: shutdown when we see non retryable udf errors #165

Merged
merged 1 commit into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {

for {
datumStreamCh := make(chan Datum)
g, ctx := errgroup.WithContext(ctx)
g, groupCtx := errgroup.WithContext(ctx)

g.Go(func() error {
return fs.receiveRequests(ctx, stream, datumStreamCh)
return fs.receiveRequests(groupCtx, stream, datumStreamCh)
})

g.Go(func() error {
return fs.processData(ctx, stream, datumStreamCh)
return fs.processData(groupCtx, stream, datumStreamCh)
})

// Wait for the goroutines to finish
Expand Down Expand Up @@ -91,19 +91,39 @@ func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error {
return nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream mappb.Map_MapFnServer) (*mappb.MapRequest, error) {
type recvResult struct {
req *mappb.MapRequest
err error
}

resultCh := make(chan recvResult, 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this channel and the goroutine if we put stream.Recv inside select?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot do that since stream.Recv() is a blocking call.

go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// receiveRequests receives the requests from the client and writes them to the datumStreamCh channel.
func (fs *Service) receiveRequests(ctx context.Context, stream mappb.Map_MapFnServer, datumStreamCh chan<- Datum) error {
defer close(datumStreamCh)

for {
select {
case <-ctx.Done():
return nil
default:
req, err := recvWithContext(ctx, stream)
if errors.Is(err, context.Canceled) {
log.Printf("Context cancelled, stopping the MapBatchFn")
return err
}
req, err := stream.Recv()
if err == io.EOF {
log.Printf("end of batch map stream")
if errors.Is(err, io.EOF) {
log.Printf("EOF received, stopping the MapBatchFn")
return err
}
if err != nil {
Expand All @@ -123,8 +143,11 @@ func (fs *Service) receiveRequests(ctx context.Context, stream mappb.Map_MapFnSe
watermark: req.GetRequest().GetWatermark().AsTime(),
headers: req.GetRequest().GetHeaders(),
}

datumStreamCh <- datum
select {
case <-ctx.Done():
return ctx.Err()
case datumStreamCh <- datum:
}
}
return nil
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mapper

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -35,6 +36,27 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
return &mappb.ReadyResponse{Ready: true}, nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream mappb.Map_MapFnServer) (*mappb.MapRequest, error) {
type recvResult struct {
req *mappb.MapRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// MapFn applies a user defined function to each request element and returns a list of results.
func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
// perform handshake with client before processing requests
Expand Down Expand Up @@ -72,13 +94,13 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
// Read requests from the stream and process them
outer:
for {
select {
case <-groupCtx.Done():
req, err := recvWithContext(groupCtx, stream)
if errors.Is(err, context.Canceled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see different behaviour between map and map batchmap above. batchmap is returning the err while map is breaking the outer loop without returning the err. Is this expected? same for map stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batchmap operations are invoked inside a separate function that is the reason we use return.

log.Printf("Context cancelled, stopping the MapFn")
break outer
default:
}
req, err := stream.Recv()
if err == io.EOF {
if errors.Is(err, io.EOF) {
log.Printf("EOF received, stopping the MapFn")
break outer
}
if err != nil {
Expand All @@ -96,6 +118,7 @@ outer:

// wait for all goroutines to finish
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mapstreamer

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -35,6 +36,27 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
return &mappb.ReadyResponse{Ready: true}, nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream mappb.Map_MapFnServer) (*mappb.MapRequest, error) {
type recvResult struct {
req *mappb.MapRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// MapFn applies a function to each request element and streams the results back.
func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
// perform handshake with client before processing requests
Expand All @@ -43,12 +65,17 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
}

ctx := stream.Context()

for {
req, err := stream.Recv()
if err == io.EOF {
req, err := recvWithContext(ctx, stream)
if errors.Is(err, context.Canceled) {
log.Printf("Context cancelled, stopping the MapStreamFn")
break
}
if errors.Is(err, io.EOF) {
log.Printf("EOF received, stopping the MapStreamFn")
break
}

if err != nil {
log.Printf("Failed to receive request: %v", err)
return err
Expand Down
37 changes: 31 additions & 6 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {

for {
datumStreamCh := make(chan Datum)
g, ctx := errgroup.WithContext(ctx)
g, groupCtx := errgroup.WithContext(ctx)

g.Go(func() error {
return fs.receiveRequests(stream, datumStreamCh)
return fs.receiveRequests(groupCtx, stream, datumStreamCh)
})

g.Go(func() error {
return fs.processData(ctx, stream, datumStreamCh)
return fs.processData(groupCtx, stream, datumStreamCh)
})

// Wait for the goroutines to finish
Expand Down Expand Up @@ -130,12 +130,33 @@ func (fs *Service) performHandshake(stream sinkpb.Sink_SinkFnServer) error {
return nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream sinkpb.Sink_SinkFnServer) (*sinkpb.SinkRequest, error) {
type recvResult struct {
req *sinkpb.SinkRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// receiveRequests receives the requests from the client writes them to the datumStreamCh channel.
func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamCh chan<- Datum) error {
func (fs *Service) receiveRequests(ctx context.Context, stream sinkpb.Sink_SinkFnServer, datumStreamCh chan<- Datum) error {
defer close(datumStreamCh)

for {
req, err := stream.Recv()
req, err := recvWithContext(ctx, stream)
if err == io.EOF {
log.Printf("end of sink stream")
return err
Expand All @@ -158,7 +179,11 @@ func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamC
headers: req.GetRequest().GetHeaders(),
}

datumStreamCh <- datum
select {
case <-ctx.Done():
return nil
case datumStreamCh <- datum:
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcer/examples/simple_source/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
replace github.com/numaproj/numaflow-go => ../../../..

require (
github.com/google/uuid v1.6.0
github.com/numaproj/numaflow-go v0.8.1
github.com/stretchr/testify v1.9.0
)
Expand All @@ -16,6 +15,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcer/examples/simple_source/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -17,6 +15,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
Loading
Loading