Skip to content

Commit

Permalink
chore: shutdown when we see non retryable udf errors
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Nov 5, 2024
1 parent 8f4062f commit 6e941e9
Show file tree
Hide file tree
Showing 8 changed files with 233 additions and 55 deletions.
47 changes: 35 additions & 12 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {

for {
datumStreamCh := make(chan Datum)
g, ctx := errgroup.WithContext(ctx)
g, groupCtx := errgroup.WithContext(ctx)

g.Go(func() error {
return fs.receiveRequests(ctx, stream, datumStreamCh)
return fs.receiveRequests(groupCtx, stream, datumStreamCh)
})

g.Go(func() error {
return fs.processData(ctx, stream, datumStreamCh)
return fs.processData(groupCtx, stream, datumStreamCh)
})

// Wait for the goroutines to finish
Expand Down Expand Up @@ -91,19 +91,39 @@ func (fs *Service) performHandshake(stream mappb.Map_MapFnServer) error {
return nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream mappb.Map_MapFnServer) (*mappb.MapRequest, error) {
type recvResult struct {
req *mappb.MapRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// receiveRequests receives the requests from the client and writes them to the datumStreamCh channel.
func (fs *Service) receiveRequests(ctx context.Context, stream mappb.Map_MapFnServer, datumStreamCh chan<- Datum) error {
defer close(datumStreamCh)

for {
select {
case <-ctx.Done():
return nil
default:
req, err := recvWithContext(ctx, stream)
if errors.Is(err, context.Canceled) {
log.Printf("Context cancelled, stopping the MapBatchFn")
return err
}
req, err := stream.Recv()
if err == io.EOF {
log.Printf("end of batch map stream")
if errors.Is(err, io.EOF) {
log.Printf("EOF received, stopping the MapBatchFn")
return err
}
if err != nil {
Expand All @@ -123,8 +143,11 @@ func (fs *Service) receiveRequests(ctx context.Context, stream mappb.Map_MapFnSe
watermark: req.GetRequest().GetWatermark().AsTime(),
headers: req.GetRequest().GetHeaders(),
}

datumStreamCh <- datum
select {
case <-ctx.Done():
return ctx.Err()
case datumStreamCh <- datum:
}
}
return nil
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mapper

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -35,6 +36,27 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
return &mappb.ReadyResponse{Ready: true}, nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream mappb.Map_MapFnServer) (*mappb.MapRequest, error) {
type recvResult struct {
req *mappb.MapRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// MapFn applies a user defined function to each request element and returns a list of results.
func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
// perform handshake with client before processing requests
Expand Down Expand Up @@ -72,13 +94,13 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
// Read requests from the stream and process them
outer:
for {
select {
case <-groupCtx.Done():
req, err := recvWithContext(groupCtx, stream)
if errors.Is(err, context.Canceled) {
log.Printf("Context cancelled, stopping the MapFn")
break outer
default:
}
req, err := stream.Recv()
if err == io.EOF {
if errors.Is(err, io.EOF) {
log.Printf("EOF received, stopping the MapFn")
break outer
}
if err != nil {
Expand All @@ -96,6 +118,7 @@ outer:

// wait for all goroutines to finish
if err := g.Wait(); err != nil {
log.Printf("Stopping the MapFn with err, %s", err)
fs.shutdownCh <- struct{}{}
return status.Errorf(codes.Internal, "error processing requests: %v", err)
}
Expand Down
33 changes: 30 additions & 3 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package mapstreamer

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -35,6 +36,27 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
return &mappb.ReadyResponse{Ready: true}, nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream mappb.Map_MapFnServer) (*mappb.MapRequest, error) {
type recvResult struct {
req *mappb.MapRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// MapFn applies a function to each request element and streams the results back.
func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
// perform handshake with client before processing requests
Expand All @@ -43,12 +65,17 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
}

ctx := stream.Context()

for {
req, err := stream.Recv()
if err == io.EOF {
req, err := recvWithContext(ctx, stream)
if errors.Is(err, context.Canceled) {
log.Printf("Context cancelled, stopping the MapStreamFn")
break
}
if errors.Is(err, io.EOF) {
log.Printf("EOF received, stopping the MapStreamFn")
break
}

if err != nil {
log.Printf("Failed to receive request: %v", err)
return err
Expand Down
37 changes: 31 additions & 6 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {

for {
datumStreamCh := make(chan Datum)
g, ctx := errgroup.WithContext(ctx)
g, groupCtx := errgroup.WithContext(ctx)

g.Go(func() error {
return fs.receiveRequests(stream, datumStreamCh)
return fs.receiveRequests(groupCtx, stream, datumStreamCh)
})

g.Go(func() error {
return fs.processData(ctx, stream, datumStreamCh)
return fs.processData(groupCtx, stream, datumStreamCh)
})

// Wait for the goroutines to finish
Expand Down Expand Up @@ -130,12 +130,33 @@ func (fs *Service) performHandshake(stream sinkpb.Sink_SinkFnServer) error {
return nil
}

// recvWithContext wraps stream.Recv() to respect context cancellation.
func recvWithContext(ctx context.Context, stream sinkpb.Sink_SinkFnServer) (*sinkpb.SinkRequest, error) {
type recvResult struct {
req *sinkpb.SinkRequest
err error
}

resultCh := make(chan recvResult, 1)
go func() {
req, err := stream.Recv()
resultCh <- recvResult{req: req, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case result := <-resultCh:
return result.req, result.err
}
}

// receiveRequests receives the requests from the client writes them to the datumStreamCh channel.
func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamCh chan<- Datum) error {
func (fs *Service) receiveRequests(ctx context.Context, stream sinkpb.Sink_SinkFnServer, datumStreamCh chan<- Datum) error {
defer close(datumStreamCh)

for {
req, err := stream.Recv()
req, err := recvWithContext(ctx, stream)
if err == io.EOF {
log.Printf("end of sink stream")
return err
Expand All @@ -158,7 +179,11 @@ func (fs *Service) receiveRequests(stream sinkpb.Sink_SinkFnServer, datumStreamC
headers: req.GetRequest().GetHeaders(),
}

datumStreamCh <- datum
select {
case <-ctx.Done():
return nil
case datumStreamCh <- datum:
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sourcer/examples/simple_source/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.21
replace github.com/numaproj/numaflow-go => ../../../..

require (
github.com/google/uuid v1.6.0
github.com/numaproj/numaflow-go v0.8.1
github.com/stretchr/testify v1.9.0
)
Expand All @@ -16,6 +15,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/sourcer/examples/simple_source/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -17,6 +15,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
Expand Down
Loading

0 comments on commit 6e941e9

Please sign in to comment.