Skip to content

Commit

Permalink
fix: server stuck during shutdown in multi-partitioned pipeline (#175)
Browse files (browse the repository at this point in the history)
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Feb 22, 2025
1 parent 1d29855 commit 0400b03
Show file tree
Hide file tree
Showing 14 changed files with 54 additions and 15 deletions.
1 change: 1 addition & 0 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
svc := &Service{
BatchMapper: m,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &server{
Expand Down
8 changes: 6 additions & 2 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"runtime/debug"
"sync"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
Expand All @@ -31,6 +32,7 @@ type Service struct {
mappb.UnimplementedMapServer
BatchMapper BatchMapper
shutdownCh chan<- struct{}
once sync.Once
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand Down Expand Up @@ -65,8 +67,10 @@ func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error {
log.Printf("Stopping the BatchMapFn")
return nil
}
- log.Printf("Stopping the BatchMapFn with err, %s", err)
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ log.Printf("Stopping the BatchMapFn with err, %s", err)
+ fs.shutdownCh <- struct{}{}
+ })
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
svc := &Service{
Mapper: m,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &server{
Expand Down
8 changes: 6 additions & 2 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"runtime/debug"
"sync"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
Expand All @@ -32,6 +33,7 @@ type Service struct {
mappb.UnimplementedMapServer
Mapper Mapper
shutdownCh chan<- struct{}
once sync.Once
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand Down Expand Up @@ -121,8 +123,10 @@ outer:

// wait for all goroutines to finish
if err := g.Wait(); err != nil {
- log.Printf("Stopping the MapFn with err, %s", err)
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ log.Printf("Stopping the MapFn with err, %s", err)
+ fs.shutdownCh <- struct{}{}
+ })
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewServer(ms MapStreamer, inputOptions ...Option) numaflow.Server {
svc := &Service{
MapperStream: ms,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &server{
Expand Down
8 changes: 6 additions & 2 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"runtime/debug"
"sync"

"golang.org/x/sync/errgroup"
epb "google.golang.org/genproto/googleapis/rpc/errdetails"
Expand All @@ -32,6 +33,7 @@ type Service struct {
mappb.UnimplementedMapServer
shutdownCh chan<- struct{}
MapperStream MapStreamer
once sync.Once
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand Down Expand Up @@ -112,8 +114,10 @@ outer:

// wait for all goroutines to finish
if err := g.Wait(); err != nil {
- log.Printf("Stopping the MapFn with err, %s", err)
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ log.Printf("Stopping the MapFn with err, %s", err)
+ fs.shutdownCh <- struct{}{}
+ })
return err
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewSideInputServer(r SideInputRetriever, inputOptions ...Option) numaflow.S
svc := &Service{
Retriever: r,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &server{
Expand Down
6 changes: 5 additions & 1 deletion pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"log"
"runtime/debug"
"sync"

epb "google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
Expand All @@ -29,6 +30,7 @@ type Service struct {
sideinputpb.UnimplementedSideInputServer
Retriever SideInputRetriever
shutdownCh chan<- struct{}
once sync.Once
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -42,7 +44,9 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (res
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ fs.shutdownCh <- struct{}{}
+ })
st, _ := status.Newf(codes.Internal, "%s: %v", errSideInputHandlerPanic, r).WithDetails(&epb.DebugInfo{
Detail: string(debug.Stack()),
})
Expand Down
1 change: 1 addition & 0 deletions pkg/sinker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewServer(h Sinker, inputOptions ...Option) numaflow.Server {
svc := &Service{
Sinker: h,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &sinkServer{
Expand Down
8 changes: 6 additions & 2 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"runtime/debug"
"sync"
"time"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -70,6 +71,7 @@ type Service struct {
sinkpb.UnimplementedSinkServer
shutdownCh chan<- struct{}
Sinker Sinker
once sync.Once
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand Down Expand Up @@ -104,8 +106,10 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
log.Printf("Stopping the SinkFn")
return nil
}
- log.Printf("Stopping the SinkFn with err, %s", err)
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ log.Printf("Stopping the SinkFn with err, %s", err)
+ fs.shutdownCh <- struct{}{}
+ })
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sourcer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func NewServer(
svc := &Service{
Source: source,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &server{
Expand Down
16 changes: 12 additions & 4 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"runtime/debug"
"sync"
"time"

"golang.org/x/sync/errgroup"
Expand All @@ -31,6 +32,7 @@ type Service struct {
sourcepb.UnimplementedSourceServer
Source Sourcer
shutdownCh chan<- struct{}
once sync.Once
}

var errSourceReadPanic = errors.New("UDF_EXECUTION_ERROR(source)")
Expand All @@ -49,8 +51,10 @@ func (fs *Service) ReadFn(stream sourcepb.Source_ReadFnServer) error {
if errors.Is(err, io.EOF) {
return nil
}
- log.Printf("error processing requests: %v", err)
- fs.shutdownCh <- struct{}{}
+ log.Printf("error processing read requests: %v", err)
+ fs.once.Do(func() {
+ fs.shutdownCh <- struct{}{}
+ })
return err
}
}
Expand Down Expand Up @@ -280,7 +284,9 @@ func (fs *Service) receiveAckRequests(ctx context.Context, stream sourcepb.Sourc
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ fs.shutdownCh <- struct{}{}
+ })
err = fmt.Errorf("panic inside source handler: %v", r)
}
}()
Expand Down Expand Up @@ -328,7 +334,9 @@ func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.P
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sourcer handler: %v %v", r, string(debug.Stack()))
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ fs.shutdownCh <- struct{}{}
+ })
}
}()

Expand Down
1 change: 1 addition & 0 deletions pkg/sourcetransformer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewServer(m SourceTransformer, inputOptions ...Option) numaflow.Server {
svc := &Service{
Transformer: m,
shutdownCh: shutdownCh,
once: sync.Once{},
}

return &server{
Expand Down
8 changes: 6 additions & 2 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"log"
"runtime/debug"
"sync"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
Expand All @@ -32,6 +33,7 @@ type Service struct {
v1.UnimplementedSourceTransformServer
Transformer SourceTransformer
shutdownCh chan<- struct{}
once sync.Once
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand Down Expand Up @@ -122,8 +124,10 @@ outer:

// wait for all the goroutines to finish, if any of the goroutines return an error, wait will return that error immediately.
if err := grp.Wait(); err != nil {
- log.Printf("Stopping the SourceTransformFn with err, %s", err)
- fs.shutdownCh <- struct{}{}
+ fs.once.Do(func() {
+ log.Printf("Stopping the SourceTransformFn with err, %s", err)
+ fs.shutdownCh <- struct{}{}
+ })
return err
}

Expand Down

0 comments on commit 0400b03

Please sign in to comment.