Skip to content

Commit

Permalink
chore: handle panic and do graceful shutdown of the server (#138)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Jul 26, 2024
1 parent a2bc6b1 commit 7a3bad8
Show file tree
Hide file tree
Showing 25 changed files with 661 additions and 172 deletions.
55 changes: 42 additions & 13 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package batchmapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -15,8 +17,10 @@ import (

// server is a map gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
opts *options
shutdownCh <-chan struct{}
}

// NewServer creates a new batch map server.
Expand All @@ -25,11 +29,19 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.BatchMapper = m
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
BatchMapper: m,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the batch map server.
Expand All @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the batch map service
batchmappb.RegisterBatchMapServer(grpcServer, m.svc)
batchmappb.RegisterBatchMapServer(m.grpcServer, m.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
7 changes: 7 additions & 0 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
type Service struct {
batchmappb.UnimplementedBatchMapServer
BatchMapper BatchMapper
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -51,6 +52,12 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error

// go routine to invoke the user handler function, and process the responses.
g.Go(func() error {
// handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{}
}
}()
// Apply the user BatchMap implementation function
responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh)

Expand Down
55 changes: 42 additions & 13 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package mapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -15,8 +17,10 @@ import (

// server is a map gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
shutdownCh <-chan struct{}
opts *options
}

// NewServer creates a new map server.
Expand All @@ -25,11 +29,19 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.Mapper = m
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
Mapper: m,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the map server.
Expand All @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the map service
mappb.RegisterMapServer(grpcServer, m.svc)
mappb.RegisterMapServer(m.grpcServer, m.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
86 changes: 85 additions & 1 deletion pkg/mapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package mapper
import (
"context"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
)

func TestMapServer_Start(t *testing.T) {
Expand All @@ -25,8 +30,87 @@ func TestMapServer_Start(t *testing.T) {
return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"}))
})
// note: using actual uds connection
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}

// tests the case where the server is shutdown gracefully when a panic occurs in the map handler
func TestMapServer_GracefulShutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

dir := t.TempDir()
socketFile, _ := os.Create(dir + "/test.sock")
defer func() {
_ = os.RemoveAll(socketFile.Name())
}()

serverInfoFile, _ := os.Create(dir + "/numaflow-test-info")
defer func() {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var mapHandler = MapperFunc(func(ctx context.Context, keys []string, d Datum) Messages {
msg := d.Value()
if keys[0] == "key2" {
time.Sleep(20 * time.Millisecond)
panic("panic test")
}
time.Sleep(50 * time.Millisecond)
return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"}))
})

done := make(chan struct{})
go func() {
err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(socketFile.Name())).Start(ctx)
assert.NoError(t, err)
close(done)
}()

// wait for the server to start
time.Sleep(10 * time.Millisecond)

// create a client
conn, err := grpc.Dial(
"unix://"+socketFile.Name(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("Failed to dial server: %v", err)
}
defer conn.Close()

client := mappb.NewMapClient(conn)
// send two map requests with key1 and key2 as keys simultaneously
keys := []string{"key1", "key2"}
var wg sync.WaitGroup
for _, key := range keys {
wg.Add(1)
go func(key string) {
defer wg.Done()
req := &mappb.MapRequest{
Keys: []string{key},
}

resp, err := client.MapFn(ctx, req)
// since there is a panic in the map handler for key2, we should get an error
// other requests should be successful
if key == "key2" {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, resp)
}
}(key)
}

wg.Wait()
// wait for the server to shutdown gracefully because of the panic
select {
case <-ctx.Done():
t.Fatal("server did not shutdown gracefully")
case <-done:
}
}
20 changes: 16 additions & 4 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mapper
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -19,7 +21,8 @@ const (
// handler.
type Service struct {
mappb.UnimplementedMapServer
Mapper Mapper
Mapper Mapper
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -28,10 +31,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
}

// MapFn applies a user defined function to each request element and returns a list of results.
func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapResponse, error) {
func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.MapResponse, err error) {
var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders())
messages := fs.Mapper.Map(ctx, d.GetKeys(), hd)
var elements []*mappb.MapResponse_Result

// Use defer and recover to handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{} // Send shutdown signal
err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r)
}
}()

messages := fs.Mapper.Map(ctx, d.GetKeys(), hd)
for _, m := range messages.Items() {
elements = append(elements, &mappb.MapResponse_Result{
Keys: m.Keys(),
Expand All @@ -42,5 +54,5 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe
datumList := &mappb.MapResponse{
Results: elements,
}
return datumList, nil
return datumList, err
}
Loading

0 comments on commit 7a3bad8

Please sign in to comment.