From 7a3bad8e01bc999d9fee71f2a690bbc2f0b018ca Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 26 Jul 2024 08:05:39 +0530 Subject: [PATCH] chore: handle panic and do graceful shutdown of the server (#138) Signed-off-by: Yashash H L --- pkg/batchmapper/server.go | 55 ++++++++++++++----- pkg/batchmapper/service.go | 7 +++ pkg/mapper/server.go | 55 ++++++++++++++----- pkg/mapper/server_test.go | 86 +++++++++++++++++++++++++++++- pkg/mapper/service.go | 20 +++++-- pkg/mapstreamer/server.go | 57 +++++++++++++++----- pkg/mapstreamer/service.go | 9 +++- pkg/reducer/server.go | 53 ++++++++++++++---- pkg/reducer/service.go | 3 +- pkg/reducer/task_manager.go | 33 ++++++------ pkg/reducestreamer/server.go | 54 +++++++++++++++---- pkg/reducestreamer/service.go | 3 +- pkg/reducestreamer/task_manager.go | 11 +++- pkg/sessionreducer/server.go | 56 ++++++++++++++----- pkg/sessionreducer/service.go | 3 +- pkg/sessionreducer/task_manager.go | 18 ++++++- pkg/shared/util.go | 32 ++++++----- pkg/sideinput/server.go | 55 ++++++++++++++----- pkg/sideinput/service.go | 9 +++- pkg/sinker/server.go | 56 ++++++++++++++----- pkg/sinker/service.go | 10 +++- pkg/sourcer/server.go | 56 ++++++++++++++----- pkg/sourcer/service.go | 30 ++++++++++- pkg/sourcetransformer/server.go | 55 ++++++++++++++----- pkg/sourcetransformer/service.go | 7 +++ 25 files changed, 661 insertions(+), 172 deletions(-) diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 2d46ef09..61736427 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -3,10 +3,12 @@ package batchmapper import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -15,8 +17,10 @@ import ( // server is a map gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new batch map server. @@ -25,11 +29,19 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.BatchMapper = m - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + BatchMapper: m, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the batch map server. @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) // register the batch map service - batchmappb.RegisterBatchMapServer(grpcServer, m.svc) + batchmappb.RegisterBatchMapServer(m.grpcServer, m.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-m.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(m.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := m.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 5923e4af..ba4df99d 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -27,6 +27,7 @@ const ( type Service struct { batchmappb.UnimplementedBatchMapServer BatchMapper BatchMapper + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -51,6 +52,12 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error // go routine to invoke the user handler function, and process the responses. g.Go(func() error { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() // Apply the user BatchMap implementation function responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh) diff --git a/pkg/mapper/server.go b/pkg/mapper/server.go index 17f3e331..456f33c6 100644 --- a/pkg/mapper/server.go +++ b/pkg/mapper/server.go @@ -3,10 +3,12 @@ package mapper import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -15,8 +17,10 @@ import ( // server is a map gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + shutdownCh <-chan struct{} + opts *options } // NewServer creates a new map server. @@ -25,11 +29,19 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.Mapper = m - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + Mapper: m, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the map server. @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) // register the map service - mappb.RegisterMapServer(grpcServer, m.svc) + mappb.RegisterMapServer(m.grpcServer, m.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-m.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(m.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := m.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/mapper/server_test.go b/pkg/mapper/server_test.go index c4736e85..8c661ff0 100644 --- a/pkg/mapper/server_test.go +++ b/pkg/mapper/server_test.go @@ -3,10 +3,15 @@ package mapper import ( "context" "os" + "sync" "testing" "time" "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" ) func TestMapServer_Start(t *testing.T) { @@ -25,8 +30,87 @@ func TestMapServer_Start(t *testing.T) { return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) }) // note: using actual uds connection - ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx) assert.NoError(t, err) } + +// tests the case where the server is shutdown gracefully when a panic occurs in the map handler +func TestMapServer_GracefulShutdown(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + dir := t.TempDir() + socketFile, _ := os.Create(dir + "/test.sock") + defer func() { + _ = os.RemoveAll(socketFile.Name()) + }() + + serverInfoFile, _ := os.Create(dir + "/numaflow-test-info") + defer func() { + _ = os.RemoveAll(serverInfoFile.Name()) + }() + + var mapHandler = MapperFunc(func(ctx context.Context, keys []string, d Datum) Messages { + msg := d.Value() + if keys[0] == "key2" { + time.Sleep(20 * time.Millisecond) + panic("panic test") + } + time.Sleep(50 * time.Millisecond) + return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"})) + }) + + done := make(chan struct{}) + go func() { + err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(socketFile.Name())).Start(ctx) + assert.NoError(t, err) + close(done) + }() + + // wait for the server to start + time.Sleep(10 * time.Millisecond) + + // create a client + conn, err := grpc.Dial( + "unix://"+socketFile.Name(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + t.Fatalf("Failed to dial server: %v", err) + } + defer conn.Close() + + client := mappb.NewMapClient(conn) + // send two map requests with key1 and key2 as keys simultaneously + keys := []string{"key1", "key2"} + var wg sync.WaitGroup + for _, key := range keys { + wg.Add(1) + go func(key string) { + defer wg.Done() + req := &mappb.MapRequest{ + Keys: []string{key}, + } + + resp, err := client.MapFn(ctx, req) + // since there is a panic in the map handler for key2, we should get an error + // other requests should be successful + if key == "key2" { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, resp) + } + }(key) + } + + wg.Wait() + // wait for the server to shutdown gracefully because of the panic + select { + case <-ctx.Done(): + t.Fatal("server did not shutdown gracefully") + case <-done: + } +} diff --git a/pkg/mapper/service.go b/pkg/mapper/service.go index 636f494e..7d5c2dc7 100644 --- a/pkg/mapper/service.go +++ b/pkg/mapper/service.go @@ -3,6 +3,8 @@ package mapper import ( "context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1" @@ -19,7 +21,8 @@ const ( // handler. type Service struct { mappb.UnimplementedMapServer - Mapper Mapper + Mapper Mapper + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -28,10 +31,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons } // MapFn applies a user defined function to each request element and returns a list of results. -func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapResponse, error) { +func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.MapResponse, err error) { var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders()) - messages := fs.Mapper.Map(ctx, d.GetKeys(), hd) var elements []*mappb.MapResponse_Result + + // Use defer and recover to handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} // Send shutdown signal + err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r) + } + }() + + messages := fs.Mapper.Map(ctx, d.GetKeys(), hd) for _, m := range messages.Items() { elements = append(elements, &mappb.MapResponse_Result{ Keys: m.Keys(), @@ -42,5 +54,5 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe datumList := &mappb.MapResponse{ Results: elements, } - return datumList, nil + return datumList, err } diff --git a/pkg/mapstreamer/server.go b/pkg/mapstreamer/server.go index 9bb39b20..5b810fc9 100644 --- a/pkg/mapstreamer/server.go +++ b/pkg/mapstreamer/server.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" mapstreampb "github.com/numaproj/numaflow-go/pkg/apis/proto/mapstream/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -14,8 +17,10 @@ import ( // server is a map streaming gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new map streaming server. @@ -24,14 +29,22 @@ func NewServer(ms MapStreamer, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.MapperStream = ms - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + MapperStream: ms, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } -// Start starts the map streaming gRPC server. +// Start starts the map server. func (m *server) Start(ctx context.Context) error { ctxWithSignal, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM) defer stop() @@ -50,12 +63,30 @@ func (m *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) - defer grpcServer.GracefulStop() + m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) + + // register the map stream service + mapstreampb.RegisterMapStreamServer(m.grpcServer, m.svc) - // register the map streaming service - mapstreampb.RegisterMapStreamServer(grpcServer, m.svc) + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-m.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(m.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := m.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/mapstreamer/service.go b/pkg/mapstreamer/service.go index 2254f26f..3a865869 100644 --- a/pkg/mapstreamer/service.go +++ b/pkg/mapstreamer/service.go @@ -19,7 +19,7 @@ const ( // streaming function. type Service struct { mapstreampb.UnimplementedMapStreamServer - + shutdownCh chan<- struct{} MapperStream MapStreamer } @@ -36,9 +36,16 @@ func (fs *Service) MapStreamFn(d *mapstreampb.MapStreamRequest, stream mapstream done := make(chan bool) go func() { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() fs.MapperStream.MapStream(ctx, d.GetKeys(), hd, messageCh) done <- true }() + finished := false for { select { diff --git a/pkg/reducer/server.go b/pkg/reducer/server.go index 6de6d60b..e82f8039 100644 --- a/pkg/reducer/server.go +++ b/pkg/reducer/server.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + numaflow "github.com/numaproj/numaflow-go/pkg" reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -14,8 +17,10 @@ import ( // server is a reduce gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new reduce server. @@ -24,11 +29,19 @@ func NewServer(r ReducerCreator, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.reducerCreatorHandle = r - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + reducerCreatorHandle: r, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the reduce gRPC server. @@ -46,12 +59,30 @@ func (r *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(r.opts.maxMessageSize) - defer grpcServer.GracefulStop() + r.grpcServer = shared.CreateGRPCServer(r.opts.maxMessageSize) + defer r.grpcServer.GracefulStop() // register the reduce service - reducepb.RegisterReduceServer(grpcServer, r.svc) + reducepb.RegisterReduceServer(r.grpcServer, r.svc) + + // start a go routine to stop the server gracefully + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-r.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(r.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := r.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/reducer/service.go b/pkg/reducer/service.go index e61e9b4d..5786dcd1 100644 --- a/pkg/reducer/service.go +++ b/pkg/reducer/service.go @@ -26,6 +26,7 @@ const ( type Service struct { reducepb.UnimplementedReduceServer reducerCreatorHandle ReducerCreator + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -41,7 +42,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { g errgroup.Group ) - taskManager := newReduceTaskManager(fs.reducerCreatorHandle) + taskManager := newReduceTaskManager(fs.reducerCreatorHandle, fs.shutdownCh) // err group for the go routine which reads from the output channel and sends to the stream g.Go(func() error { diff --git a/pkg/reducer/task_manager.go b/pkg/reducer/task_manager.go index 537f6b59..d2f03730 100644 --- a/pkg/reducer/task_manager.go +++ b/pkg/reducer/task_manager.go @@ -10,12 +10,11 @@ import ( // reduceTask represents a task for a performing reduceStream operation. type reduceTask struct { - keys []string - window *v1.Window - reducer Reducer - inputCh chan Datum - outputCh chan Message - doneCh chan struct{} + keys []string + window *v1.Window + reducer Reducer + inputCh chan Datum + doneCh chan struct{} } // buildReduceResponse builds the reduce response from the messages. @@ -55,13 +54,15 @@ type reduceTaskManager struct { reducerCreatorHandle ReducerCreator tasks map[string]*reduceTask responseCh chan *v1.ReduceResponse + shutdownCh chan<- struct{} } -func newReduceTaskManager(reducerCreatorHandle ReducerCreator) *reduceTaskManager { +func newReduceTaskManager(reducerCreatorHandle ReducerCreator, shutdownCh chan<- struct{}) *reduceTaskManager { return &reduceTaskManager{ reducerCreatorHandle: reducerCreatorHandle, tasks: make(map[string]*reduceTask), responseCh: make(chan *v1.ReduceResponse), + shutdownCh: shutdownCh, } } @@ -75,17 +76,22 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce request.Operation.Windows[0].GetEnd().AsTime())) task := &reduceTask{ - keys: request.GetPayload().GetKeys(), - window: request.Operation.Windows[0], - inputCh: make(chan Datum), - outputCh: make(chan Message), - doneCh: make(chan struct{}), + keys: request.GetPayload().GetKeys(), + window: request.Operation.Windows[0], + inputCh: make(chan Datum), + doneCh: make(chan struct{}), } key := task.uniqueKey() rtm.tasks[key] = task go func() { + // handle panic + defer func() { + if r := recover(); r != nil { + rtm.shutdownCh <- struct{}{} + } + }() // invoke the reduce function // create a new reducer, since we got a new key reducerHandle := rtm.reducerCreatorHandle.Create() @@ -95,9 +101,6 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce // write the output to the output channel, service will forward it to downstream rtm.responseCh <- task.buildReduceResponse(message) } - // close the output channel after the reduce function is done - close(task.outputCh) - // send a done signal close(task.doneCh) }() diff --git a/pkg/reducestreamer/server.go b/pkg/reducestreamer/server.go index 0021b73c..0d460b6f 100644 --- a/pkg/reducestreamer/server.go +++ b/pkg/reducestreamer/server.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" reducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -14,8 +17,10 @@ import ( // server is a reduceStream gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new reduceStream server. @@ -24,11 +29,19 @@ func NewServer(r ReduceStreamerCreator, inputOptions ...Option) numaflow.Server for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.creatorHandle = r - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + creatorHandle: r, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the reduceStream gRPC server. @@ -46,12 +59,31 @@ func (r *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(r.opts.maxMessageSize) - defer grpcServer.GracefulStop() + r.grpcServer = shared.CreateGRPCServer(r.opts.maxMessageSize) + defer r.grpcServer.GracefulStop() // register the reduceStream service - reducepb.RegisterReduceServer(grpcServer, r.svc) + reducepb.RegisterReduceServer(r.grpcServer, r.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-r.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(r.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := r.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/reducestreamer/service.go b/pkg/reducestreamer/service.go index 38182c23..ac2eaee1 100644 --- a/pkg/reducestreamer/service.go +++ b/pkg/reducestreamer/service.go @@ -26,6 +26,7 @@ const ( type Service struct { reducepb.UnimplementedReduceServer creatorHandle ReduceStreamerCreator + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -41,7 +42,7 @@ func (fs *Service) ReduceFn(stream reducepb.Reduce_ReduceFnServer) error { g errgroup.Group ) - taskManager := newReduceTaskManager(fs.creatorHandle) + taskManager := newReduceTaskManager(fs.creatorHandle, fs.shutdownCh) // err group for the go routine which reads from the output channel and sends to the stream g.Go(func() error { diff --git a/pkg/reducestreamer/task_manager.go b/pkg/reducestreamer/task_manager.go index 15c2cc62..2a6db6e6 100644 --- a/pkg/reducestreamer/task_manager.go +++ b/pkg/reducestreamer/task_manager.go @@ -56,13 +56,15 @@ type reduceStreamTaskManager struct { creatorHandle ReduceStreamerCreator tasks map[string]*reduceStreamTask responseCh chan *v1.ReduceResponse + shutdownCh chan<- struct{} } -func newReduceTaskManager(reduceStreamerCreator ReduceStreamerCreator) *reduceStreamTaskManager { +func newReduceTaskManager(reduceStreamerCreator ReduceStreamerCreator, shutdownCh chan<- struct{}) *reduceStreamTaskManager { return &reduceStreamTaskManager{ creatorHandle: reduceStreamerCreator, tasks: make(map[string]*reduceStreamTask), responseCh: make(chan *v1.ReduceResponse), + shutdownCh: shutdownCh, } } @@ -97,6 +99,13 @@ func (rtm *reduceStreamTaskManager) CreateTask(ctx context.Context, request *v1. } }() + // handle panic + defer func() { + if r := recover(); r != nil { + rtm.shutdownCh <- struct{}{} + } + }() + reduceStreamerHandle := rtm.creatorHandle.Create() // invoke the reduceStream function reduceStreamerHandle.ReduceStream(ctx, request.GetPayload().GetKeys(), task.inputCh, task.outputCh, md) diff --git a/pkg/sessionreducer/server.go b/pkg/sessionreducer/server.go index ed5befb4..4249e76b 100644 --- a/pkg/sessionreducer/server.go +++ b/pkg/sessionreducer/server.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" sessionreducepb "github.com/numaproj/numaflow-go/pkg/apis/proto/sessionreduce/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -14,8 +17,10 @@ import ( // server is a session reduce gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new session reduce server. @@ -24,11 +29,19 @@ func NewServer(r SessionReducerCreator, inputOptions ...Option) numaflow.Server for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.creatorHandle = r - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + creatorHandle: r, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the session reduce gRPC server. @@ -46,12 +59,31 @@ func (r *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(r.opts.maxMessageSize) - defer grpcServer.GracefulStop() + r.grpcServer = shared.CreateGRPCServer(r.opts.maxMessageSize) + defer r.grpcServer.GracefulStop() + + // register the session reduce service + sessionreducepb.RegisterSessionReduceServer(r.grpcServer, r.svc) - // register the sessionReduce service - sessionreducepb.RegisterSessionReduceServer(grpcServer, r.svc) + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-r.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(r.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := r.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/sessionreducer/service.go b/pkg/sessionreducer/service.go index cf033ffa..28959207 100644 --- a/pkg/sessionreducer/service.go +++ b/pkg/sessionreducer/service.go @@ -24,6 +24,7 @@ const ( type Service struct { sessionreducepb.UnimplementedSessionReduceServer creatorHandle SessionReducerCreator + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -35,7 +36,7 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sessionreducepb.Re func (fs *Service) SessionReduceFn(stream sessionreducepb.SessionReduce_SessionReduceFnServer) error { ctx := stream.Context() - taskManager := newReduceTaskManager(fs.creatorHandle) + taskManager := newReduceTaskManager(fs.creatorHandle, fs.shutdownCh) // err group for the go routine which reads from the output channel and sends to the stream var g errgroup.Group diff --git a/pkg/sessionreducer/task_manager.go b/pkg/sessionreducer/task_manager.go index 49500eef..12e912f1 100644 --- a/pkg/sessionreducer/task_manager.go +++ b/pkg/sessionreducer/task_manager.go @@ -78,13 +78,15 @@ type sessionReduceTaskManager struct { tasks map[string]*sessionReduceTask responseCh chan *v1.SessionReduceResponse rw sync.RWMutex + shutdownCh chan<- struct{} } -func newReduceTaskManager(sessionReducerFactory SessionReducerCreator) *sessionReduceTaskManager { +func newReduceTaskManager(sessionReducerFactory SessionReducerCreator, shutdownCh chan<- struct{}) *sessionReduceTaskManager { return &sessionReduceTaskManager{ creatorHandle: sessionReducerFactory, tasks: make(map[string]*sessionReduceTask), responseCh: make(chan *v1.SessionReduceResponse), + shutdownCh: shutdownCh, } } @@ -129,6 +131,13 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1 } }() + // handle panic + defer func() { + if r := recover(); r != nil { + rtm.shutdownCh <- struct{}{} + } + }() + task.sessionReducer.SessionReduce(ctx, task.getKeyedWindow().GetKeys(), task.inputCh, task.outputCh) // close the output channel and wait for the response to be forwarded close(task.outputCh) @@ -195,6 +204,13 @@ func (rtm *sessionReduceTaskManager) CloseTask(request *v1.SessionReduceRequest) // MergeTasks merges the session reduce tasks. It will create a new task with the merged window and // merges the accumulators from the other tasks to the merged task. func (rtm *sessionReduceTaskManager) MergeTasks(ctx context.Context, request *v1.SessionReduceRequest) error { + // handle panic + defer func() { + if r := recover(); r != nil { + rtm.shutdownCh <- struct{}{} + } + }() + rtm.rw.Lock() mergedWindow := request.Operation.KeyedWindows[0] diff --git a/pkg/shared/util.go b/pkg/shared/util.go index 93fc7c4a..347ed1a7 100644 --- a/pkg/shared/util.go +++ b/pkg/shared/util.go @@ -1,11 +1,10 @@ package shared import ( - "context" "fmt" - "log" "net" "os" + "time" "google.golang.org/grpc" @@ -49,22 +48,21 @@ func CreateGRPCServer(maxMessageSize int) *grpc.Server { ) } -func StartGRPCServer(ctx context.Context, grpcServer *grpc.Server, lis net.Listener) error { - errCh := make(chan error, 1) - defer close(errCh) - go func(ch chan<- error) { - log.Println("starting the gRPC server with unix domain socket...", lis.Addr()) - err := grpcServer.Serve(lis) - if err != nil { - ch <- fmt.Errorf("failed to start the gRPC server: %v", err) - } - }(errCh) +func StopGRPCServer(grpcServer *grpc.Server) { + // Stop stops the gRPC server gracefully. + // wait for the server to stop gracefully for 30 seconds + // if it is not stopped, stop it forcefully + stopped := make(chan struct{}) + go func() { + grpcServer.GracefulStop() + close(stopped) + }() + t := time.NewTimer(30 * time.Second) select { - case err := <-errCh: - return err - case <-ctx.Done(): - log.Println("Got a signal: terminating gRPC server...") + case <-t.C: + grpcServer.Stop() + case <-stopped: + t.Stop() } - return nil } diff --git a/pkg/sideinput/server.go b/pkg/sideinput/server.go index d6202d99..e32ab7a1 100644 --- a/pkg/sideinput/server.go +++ b/pkg/sideinput/server.go @@ -3,10 +3,12 @@ package sideinput import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" sideinputpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sideinput/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -15,8 +17,10 @@ import ( // server is a side input gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewSideInputServer creates a new server object. @@ -25,11 +29,19 @@ func NewSideInputServer(r SideInputRetriever, inputOptions ...Option) numaflow.S for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.Retriever = r - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + Retriever: r, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the gRPC server via unix domain socket at configs.address and return error. @@ -46,13 +58,30 @@ func (s *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(s.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + s.grpcServer = shared.CreateGRPCServer(s.opts.maxMessageSize) // register the side input service - sideinputpb.RegisterSideInputServer(grpcServer, s.svc) + sideinputpb.RegisterSideInputServer(s.grpcServer, s.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-s.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(s.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := s.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/sideinput/service.go b/pkg/sideinput/service.go index 7a1e5d53..b10bdff7 100644 --- a/pkg/sideinput/service.go +++ b/pkg/sideinput/service.go @@ -19,7 +19,8 @@ const ( // Service implements the proto gen server interface and contains the retrieve operation handler type Service struct { sideinputpb.UnimplementedSideInputServer - Retriever SideInputRetriever + Retriever SideInputRetriever + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -29,6 +30,12 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sideinputpb.ReadyR // RetrieveSideInput applies the function for each side input retrieval request. func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*sideinputpb.SideInputResponse, error) { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() messageSi := fs.Retriever.RetrieveSideInput(ctx) var element *sideinputpb.SideInputResponse element = &sideinputpb.SideInputResponse{ diff --git a/pkg/sinker/server.go b/pkg/sinker/server.go index 5ae8748a..3e298f66 100644 --- a/pkg/sinker/server.go +++ b/pkg/sinker/server.go @@ -3,10 +3,12 @@ package sinker import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + numaflow "github.com/numaproj/numaflow-go/pkg" sinkpb "github.com/numaproj/numaflow-go/pkg/apis/proto/sink/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -15,8 +17,10 @@ import ( // sinkServer is a sink gRPC server. type sinkServer struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new sinkServer object. @@ -25,11 +29,19 @@ func NewServer(h Sinker, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(sinkServer) - s.svc = new(Service) - s.svc.Sinker = h - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + Sinker: h, + shutdownCh: shutdownCh, + } + + return &sinkServer{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the gRPC sinkServer via unix domain socket at configs.address and return error. @@ -47,14 +59,32 @@ func (s *sinkServer) Start(ctx context.Context) error { // close the listener defer func() { _ = lis.Close() }() + // create a grpc server - grpcServer := shared.CreateGRPCServer(s.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + s.grpcServer = shared.CreateGRPCServer(s.opts.maxMessageSize) // register the sink service - sinkpb.RegisterSinkServer(grpcServer, s.svc) + sinkpb.RegisterSinkServer(s.grpcServer, s.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-s.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(s.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := s.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/sinker/service.go b/pkg/sinker/service.go index 67f99838..d775cbdd 100644 --- a/pkg/sinker/service.go +++ b/pkg/sinker/service.go @@ -59,8 +59,8 @@ func (h *handlerDatum) Headers() map[string]string { // Service implements the proto gen server interface and contains the sinkfn operation handler. type Service struct { sinkpb.UnimplementedSinkServer - - Sinker Sinker + shutdownCh chan<- struct{} + Sinker Sinker } // IsReady returns true to indicate the gRPC connection is ready. @@ -80,6 +80,12 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error { wg.Add(1) go func() { defer wg.Done() + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() messages := fs.Sinker.Sink(ctx, datumStreamCh) for _, msg := range messages { if msg.Fallback { diff --git a/pkg/sourcer/server.go b/pkg/sourcer/server.go index 34708c91..247fa37a 100644 --- a/pkg/sourcer/server.go +++ b/pkg/sourcer/server.go @@ -3,10 +3,12 @@ package sourcer import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + numaflow "github.com/numaproj/numaflow-go/pkg" sourcepb "github.com/numaproj/numaflow-go/pkg/apis/proto/source/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -14,8 +16,10 @@ import ( ) type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new server object. @@ -27,11 +31,19 @@ func NewServer( for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.Source = source - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + Source: source, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the gRPC server via unix domain socket at shared.address and return error. @@ -49,14 +61,32 @@ func (s *server) Start(ctx context.Context) error { // close the listener defer func() { _ = lis.Close() }() + // create a grpc server - grpcServer := shared.CreateGRPCServer(s.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + s.grpcServer = shared.CreateGRPCServer(s.opts.maxMessageSize) // register the source service - sourcepb.RegisterSourceServer(grpcServer, s.svc) + sourcepb.RegisterSourceServer(s.grpcServer, s.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-s.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(s.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := s.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/sourcer/service.go b/pkg/sourcer/service.go index 185c0e8f..e4e5426d 100644 --- a/pkg/sourcer/service.go +++ b/pkg/sourcer/service.go @@ -20,7 +20,8 @@ const ( // Service implements the proto gen server interface type Service struct { sourcepb.UnimplementedSourceServer - Source Sourcer + Source Sourcer + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -30,6 +31,13 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*sourcepb.ReadyResp // PendingFn returns the number of pending messages. func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PendingResponse, error) { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() + return &sourcepb.PendingResponse{Result: &sourcepb.PendingResponse_Result{ Count: fs.Source.Pending(ctx), }}, nil @@ -61,6 +69,12 @@ func (fs *Service) ReadFn(d *sourcepb.ReadRequest, stream sourcepb.Source_ReadFn // Start the read in a goroutine go func() { defer close(messageCh) + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() fs.Source.Read(ctx, &request, messageCh) }() @@ -100,6 +114,13 @@ func (a *ackRequest) Offsets() []Offset { // AckFn applies a function to each datum element. func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb.AckResponse, error) { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() + offsets := make([]Offset, len(d.Request.GetOffsets())) for i, offset := range d.Request.GetOffsets() { offsets[i] = NewOffset(offset.GetOffset(), offset.GetPartitionId()) @@ -115,6 +136,13 @@ func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb } func (fs *Service) PartitionsFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.PartitionsResponse, error) { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() + partitions := fs.Source.Partitions(ctx) return &sourcepb.PartitionsResponse{ Result: &sourcepb.PartitionsResponse_Result{ diff --git a/pkg/sourcetransformer/server.go b/pkg/sourcetransformer/server.go index 0797e1ec..7f22989a 100644 --- a/pkg/sourcetransformer/server.go +++ b/pkg/sourcetransformer/server.go @@ -3,10 +3,12 @@ package sourcetransformer import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/sourcetransform/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -14,8 +16,10 @@ import ( ) type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new SourceTransformer server. @@ -24,11 +28,19 @@ func NewServer(m SourceTransformer, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.Transformer = m - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + Transformer: m, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the SourceTransformer server. @@ -46,13 +58,30 @@ func (m *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) // register the source transformer service - v1.RegisterSourceTransformServer(grpcServer, m.svc) + v1.RegisterSourceTransformServer(m.grpcServer, m.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-m.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(m.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := m.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 004e13dc..1602a50a 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -21,6 +21,7 @@ const ( type Service struct { v1.UnimplementedSourceTransformServer Transformer SourceTransformer + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -32,6 +33,12 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*v1.ReadyResponse, // In addition to map function, SourceTransformFn also supports assigning a new event time to response. // SourceTransformFn can be used only at source vertex by source data transformer. func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformRequest) (*v1.SourceTransformResponse, error) { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() var hd = NewHandlerDatum(d.GetValue(), d.EventTime.AsTime(), d.Watermark.AsTime(), d.Headers) messageTs := fs.Transformer.Transform(ctx, d.GetKeys(), hd) var results []*v1.SourceTransformResponse_Result