Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: handle panic and do graceful shutdown of the server #138

Merged
merged 3 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package batchmapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
batchmappb "github.com/numaproj/numaflow-go/pkg/apis/proto/batchmap/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -15,8 +17,10 @@ import (

// server is a map gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
opts *options
shutdownCh <-chan struct{}
}

// NewServer creates a new batch map server.
Expand All @@ -25,11 +29,19 @@ func NewServer(m BatchMapper, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.BatchMapper = m
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
BatchMapper: m,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the batch map server.
Expand All @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the batch map service
batchmappb.RegisterBatchMapServer(grpcServer, m.svc)
batchmappb.RegisterBatchMapServer(m.grpcServer, m.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
7 changes: 7 additions & 0 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
type Service struct {
batchmappb.UnimplementedBatchMapServer
BatchMapper BatchMapper
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -51,6 +52,12 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error

// go routine to invoke the user handler function, and process the responses.
g.Go(func() error {
// handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{}
}
}()
// Apply the user BatchMap implementation function
responses := fs.BatchMapper.BatchMap(ctx, datumStreamCh)

Expand Down
55 changes: 42 additions & 13 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package mapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"

"google.golang.org/grpc"

"github.com/numaproj/numaflow-go/pkg"
mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
"github.com/numaproj/numaflow-go/pkg/info"
Expand All @@ -15,8 +17,10 @@ import (

// server is a map gRPC server.
type server struct {
svc *Service
opts *options
grpcServer *grpc.Server
svc *Service
shutdownCh <-chan struct{}
opts *options
}

// NewServer creates a new map server.
Expand All @@ -25,11 +29,19 @@ func NewServer(m Mapper, inputOptions ...Option) numaflow.Server {
for _, inputOption := range inputOptions {
inputOption(opts)
}
s := new(server)
s.svc = new(Service)
s.svc.Mapper = m
s.opts = opts
return s
shutdownCh := make(chan struct{})

// create a new service and server
svc := &Service{
Mapper: m,
shutdownCh: shutdownCh,
}

return &server{
svc: svc,
shutdownCh: shutdownCh,
opts: opts,
}
}

// Start starts the map server.
Expand All @@ -51,13 +63,30 @@ func (m *server) Start(ctx context.Context) error {
defer func() { _ = lis.Close() }()

// create a grpc server
grpcServer := shared.CreateGRPCServer(m.opts.maxMessageSize)
defer log.Println("Successfully stopped the gRPC server")
defer grpcServer.GracefulStop()
m.grpcServer = shared.CreateGRPCServer(m.opts.maxMessageSize)

// register the map service
mappb.RegisterMapServer(grpcServer, m.svc)
mappb.RegisterMapServer(m.grpcServer, m.svc)

// start a go routine to stop the server gracefully when the context is done
// or a shutdown signal is received from the service
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-m.shutdownCh:
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
}()

// start the grpc server
return shared.StartGRPCServer(ctxWithSignal, grpcServer, lis)
if err := m.grpcServer.Serve(lis); err != nil {
return fmt.Errorf("failed to start the gRPC server: %v", err)
}

// wait for the graceful shutdown to complete
wg.Wait()
return nil
}
86 changes: 85 additions & 1 deletion pkg/mapper/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,15 @@ package mapper
import (
"context"
"os"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
)

func TestMapServer_Start(t *testing.T) {
Expand All @@ -25,8 +30,87 @@ func TestMapServer_Start(t *testing.T) {
return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"}))
})
// note: using actual uds connection
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(serverInfoFile.Name())).Start(ctx)
assert.NoError(t, err)
}

// tests the case where the server is shutdown gracefully when a panic occurs in the map handler
func TestMapServer_GracefulShutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

dir := t.TempDir()
socketFile, _ := os.Create(dir + "/test.sock")
defer func() {
_ = os.RemoveAll(socketFile.Name())
}()

serverInfoFile, _ := os.Create(dir + "/numaflow-test-info")
defer func() {
_ = os.RemoveAll(serverInfoFile.Name())
}()

var mapHandler = MapperFunc(func(ctx context.Context, keys []string, d Datum) Messages {
msg := d.Value()
if keys[0] == "key2" {
time.Sleep(20 * time.Millisecond)
panic("panic test")
}
time.Sleep(50 * time.Millisecond)
return MessagesBuilder().Append(NewMessage(msg).WithKeys([]string{keys[0] + "_test"}))
})

done := make(chan struct{})
go func() {
err := NewServer(mapHandler, WithSockAddr(socketFile.Name()), WithServerInfoFilePath(socketFile.Name())).Start(ctx)
assert.NoError(t, err)
close(done)
}()

// wait for the server to start
time.Sleep(10 * time.Millisecond)

// create a client
conn, err := grpc.Dial(
"unix://"+socketFile.Name(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("Failed to dial server: %v", err)
}
defer conn.Close()

client := mappb.NewMapClient(conn)
// send two map requests with key1 and key2 as keys simultaneously
keys := []string{"key1", "key2"}
var wg sync.WaitGroup
for _, key := range keys {
wg.Add(1)
go func(key string) {
defer wg.Done()
req := &mappb.MapRequest{
Keys: []string{key},
}

resp, err := client.MapFn(ctx, req)
// since there is a panic in the map handler for key2, we should get an error
// other requests should be successful
if key == "key2" {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.NotNil(t, resp)
}
}(key)
}

wg.Wait()
// wait for the server to shutdown gracefully because of the panic
select {
case <-ctx.Done():
t.Fatal("server did not shutdown gracefully")
case <-done:
}
}
20 changes: 16 additions & 4 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mapper
import (
"context"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"

mappb "github.com/numaproj/numaflow-go/pkg/apis/proto/map/v1"
Expand All @@ -19,7 +21,8 @@ const (
// handler.
type Service struct {
mappb.UnimplementedMapServer
Mapper Mapper
Mapper Mapper
shutdownCh chan<- struct{}
}

// IsReady returns true to indicate the gRPC connection is ready.
Expand All @@ -28,10 +31,19 @@ func (fs *Service) IsReady(context.Context, *emptypb.Empty) (*mappb.ReadyRespons
}

// MapFn applies a user defined function to each request element and returns a list of results.
func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapResponse, error) {
func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.MapResponse, err error) {
var hd = NewHandlerDatum(d.GetValue(), d.GetEventTime().AsTime(), d.GetWatermark().AsTime(), d.GetHeaders())
messages := fs.Mapper.Map(ctx, d.GetKeys(), hd)
var elements []*mappb.MapResponse_Result

// Use defer and recover to handle panic
defer func() {
if r := recover(); r != nil {
fs.shutdownCh <- struct{}{} // Send shutdown signal
err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r)
}
}()

messages := fs.Mapper.Map(ctx, d.GetKeys(), hd)
for _, m := range messages.Items() {
elements = append(elements, &mappb.MapResponse_Result{
Keys: m.Keys(),
Expand All @@ -42,5 +54,5 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (*mappb.MapRe
datumList := &mappb.MapResponse{
Results: elements,
}
return datumList, nil
return datumList, err
}
Loading
Loading