From cbd246bc727537974bffef265a236b0562e77d9b Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Wed, 24 Jul 2024 18:25:38 +0530 Subject: [PATCH] batch map Signed-off-by: Yashash H L --- pkg/batchmapper/server.go | 55 +++++++++++++++++++++++++++++--------- pkg/batchmapper/service.go | 7 +++++ 2 files changed, 49 insertions(+), 13 deletions(-) diff --git a/pkg/batchmapper/server.go b/pkg/batchmapper/server.go index 2d46ef09..61736427 100644 --- a/pkg/batchmapper/server.go +++ b/pkg/batchmapper/server.go @@ -3,10 +3,12 @@ package batchmapper import ( "context" "fmt" - "log" "os/signal" + "sync" "syscall" + "google.golang.org/grpc" + "github.com/numaproj/numaflow-go/pkg" batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1" "github.com/numaproj/numaflow-go/pkg/info" @@ -15,8 +17,10 @@ import ( // server is a map gRPC server. type server struct { - svc *Service - opts *options + grpcServer *grpc.Server + svc *Service + opts *options + shutdownCh <-chan struct{} } // NewServer creates a new batch map server. @@ -25,11 +29,19 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server { for _, inputOption := range inputOptions { inputOption(opts) } - s := new(server) - s.svc = new(Service) - s.svc.BatchMapper = m - s.opts = opts - return s + shutdownCh := make(chan struct{}) + + // create a new service and server + svc := &Service{ + BatchMapper: m, + shutdownCh: shutdownCh, + } + + return &server{ + svc: svc, + shutdownCh: shutdownCh, + opts: opts, + } } // Start starts the batch map server. @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error { defer func() { _ = lis.Close() }() // create a grpc server - grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize) - defer log.Println("Successfully stopped the gRPC server") - defer grpcServer.GracefulStop() + m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize) // register the batch map service - batchmappb.RegisterBatchMapServer(grpcServer, m.svc) + batchmappb.RegisterBatchMapServer(m.grpcServer, m.svc) + + // start a go routine to stop the server gracefully when the context is done + // or a shutdown signal is received from the service + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + select { + case <-m.shutdownCh: + case <-ctxWithSignal.Done(): + } + shared.StopGRPCServer(m.grpcServer) + }() // start the grpc server - return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis) + if err := m.grpcServer.Serve(lis); err != nil { + return fmt.Errorf("failed to start the gRPC server: %v", err) + } + + // wait for the graceful shutdown to complete + wg.Wait() + return nil } diff --git a/pkg/batchmapper/service.go b/pkg/batchmapper/service.go index 5923e4af..ba4df99d 100644 --- a/pkg/batchmapper/service.go +++ b/pkg/batchmapper/service.go @@ -27,6 +27,7 @@ const ( type Service struct { batchmappb.UnimplementedBatchMapServer BatchMapper BatchMapper + shutdownCh chan<- struct{} } // IsReady returns true to indicate the gRPC connection is ready. @@ -51,6 +52,12 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error // go routine to invoke the user handler function, and process the responses. g.Go(func() error { + // handle panic + defer func() { + if r := recover(); r != nil { + fs.shutdownCh <- struct{}{} + } + }() // Apply the user BatchMap implementation function responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh)