Skip to content

Commit

Permalink
panic handler and graceful shutdown
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jul 24, 2024
1 parent a2bc6b1 commit 9006d72
Show file tree
Hide file tree
Showing 22 changed files with 527 additions and 158 deletions.
55 changes: 42 additions & 13 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package mapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -15,8 +17,10 @@ import (

// server is a map gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
shutdownCh <-chan struct{}
opts *options
}

// NewServer creates a new map server.
Expand All @@ -25,11 +29,19 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.Mapper = m
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
Mapper: m,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the map server.
Expand All @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the map service
mappb.RegisterMapServer(grpcServer, m.svc)
mappb.RegisterMapServer(m.grpcServer, m.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
20 changes: 16 additions & 4 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mapper
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -19,7 +21,8 @@ const (
// handler.
type Service struct {
mappb.UnimplementedMapServer
Mapper Mapper
Mapper Mapper
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -28,10 +31,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
}

// MapFn applies a user defined function to each request element and returns a list of results.
func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapResponse, error) {
func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.MapResponse, err error) {
var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders())
messages := fs.Mapper.Map(ctx, d.GetKeys(), hd)
var elements []*mappb.MapResponse_Result

// Use defer and recover to handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{} // Send shutdown signal
err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r)
}
}()

messages := fs.Mapper.Map(ctx, d.GetKeys(), hd)
for _, m := range messages.Items() {
elements = append(elements, &mappb.MapResponse_Result{
Keys: m.Keys(),
Expand All @@ -42,5 +54,5 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe
datumList := &mappb.MapResponse{
Results: elements,
}
return datumList, nil
return datumList, err
}
57 changes: 44 additions & 13 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -14,8 +17,10 @@ import (

// server is a map streaming gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
opts *options
shutdownCh <-chan struct{}
}

// NewServer creates a new map streaming server.
Expand All @@ -24,14 +29,22 @@ func NewServer(ms MapStreamer, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.MapperStream = ms
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
MapperStream: ms,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the map streaming gRPC server.
// Start starts the map server.
func (m *server) Start(ctx context.Context) error {
ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
defer stop()
Expand All @@ -50,12 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the map stream service
mapstreampb.RegisterMapStreamServer(m.grpcServer, m.svc)

// register the map streaming service
mapstreampb.RegisterMapStreamServer(grpcServer, m.svc)
// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
9 changes: 8 additions & 1 deletion pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
// streaming function.
type Service struct {
mapstreampb.UnimplementedMapStreamServer

shutdownCh chan<- struct{}
MapperStream MapStreamer
}

Expand All @@ -36,9 +36,16 @@ func (fs *Service) MapStreamFn(d *mapstreampb.MapStreamRequest, stream mapstream

done := make(chan bool)
go func() {
// handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{}
}
}()
fs.MapperStream.MapStream(ctx, d.GetKeys(), hd, messageCh)
done <- true
}()

finished := false
for {
select {
Expand Down
53 changes: 42 additions & 11 deletions pkg/reducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

numaflow "github.com/numaproj/numaflow-go/pkg"
reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -14,8 +17,10 @@ import (

// server is a reduce gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
opts *options
shutdownCh <-chan struct{}
}

// NewServer creates a new reduce server.
Expand All @@ -24,11 +29,19 @@ func NewServer(r ReducerCreator, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.reducerCreatorHandle = r
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
reducerCreatorHandle: r,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the reduce gRPC server.
Expand All @@ -46,12 +59,30 @@ func (r *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(r.opts.maxMessageSize)
defer grpcServer.GracefulStop()
r.grpcServer = shared.CreateGRPCServer(r.opts.maxMessageSize)
defer r.grpcServer.GracefulStop()

// register the reduce service
reducepb.RegisterReduceServer(grpcServer, r.svc)
reducepb.RegisterReduceServer(r.grpcServer, r.svc)

// start a go routine to stop the server gracefully
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-r.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(r.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := r.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
3 changes: 2 additions & 1 deletion pkg/reducer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
type Service struct {
reducepb.UnimplementedReduceServer
reducerCreatorHandle ReducerCreator
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -41,7 +42,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error {
g errgroup.Group
)

taskManager := newReduceTaskManager(fs.reducerCreatorHandle)
taskManager := newReduceTaskManager(fs.reducerCreatorHandle, fs.shutdownCh)

// err group for the go routine which reads from the output channel and sends to the stream
g.Go(func() error {
Expand Down
Loading

0 comments on commit 9006d72

Please sign in to comment.