Skip to content

Commit

Permalink
chore: log panics inside udf (#145)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Aug 29, 2024
1 parent 4b3a63f commit d82fc27
Show file tree
Hide file tree
Showing 21 changed files with 57 additions and 1 deletion.
2 changes: 2 additions & 0 deletions pkg/batchmapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package batchmapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -76,6 +77,7 @@ func (m *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-m.shutdownCh:
log.Printf("received shutdown signal")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
Expand Down
2 changes: 2 additions & 0 deletions pkg/batchmapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"runtime/debug"

"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -55,6 +56,7 @@ func (fs *Service) BatchMapFn(stream batchmappb.BatchMap_BatchMapFnServer) error
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapper/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mapper
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -76,6 +77,7 @@ func (m *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-m.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/mapper/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mapper

import (
"context"
"log"
"runtime/debug"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -38,6 +40,7 @@ func (fs *Service) MapFn(ctx context.Context, d *mappb.MapRequest) (_ *mappb.Map
// Use defer and recover to handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside map handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{} // Send shutdown signal
err = status.Errorf(codes.Internal, "panic occurred in Mapper.Map: %v", r)
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/mapstreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mapstreamer
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -76,6 +77,7 @@ func (m *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-m.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/mapstreamer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mapstreamer

import (
"context"
"log"
"runtime/debug"

"google.golang.org/protobuf/types/known/emptypb"

Expand Down Expand Up @@ -39,6 +41,7 @@ func (fs *Service) MapStreamFn(d *mapstreampb.MapStreamRequest, stream mapstream
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside mapStream handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down
3 changes: 2 additions & 1 deletion pkg/reducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reducer
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -60,7 +61,6 @@ func (r *server) Start(ctx context.Context) error {

// create a grpc server
r.grpcServer = shared.CreateGRPCServer(r.opts.maxMessageSize)
defer r.grpcServer.GracefulStop()

// register the reduce service
reducepb.RegisterReduceServer(r.grpcServer, r.svc)
Expand All @@ -72,6 +72,7 @@ func (r *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-r.shutdownCh:
log.Printf("received shutdown signal")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(r.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/reducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package reducer
import (
"context"
"fmt"
"log"
"runtime/debug"
"strings"

v1 "github.com/numaproj/numaflow-go/pkg/apis/proto/reduce/v1"
Expand Down Expand Up @@ -89,6 +91,7 @@ func (rtm *reduceTaskManager) CreateTask(ctx context.Context, request *v1.Reduce
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack()))
rtm.shutdownCh <- struct{}{}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/reducestreamer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package reducestreamer
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (r *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-r.shutdownCh:
log.Printf("received shutdown signal")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(r.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/reducestreamer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package reducestreamer
import (
"context"
"fmt"
"log"
"runtime/debug"
"strings"
"sync"

Expand Down Expand Up @@ -102,6 +104,7 @@ func (rtm *reduceStreamTaskManager) CreateTask(ctx context.Context, request *v1.
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside reduce handler: %v %v", r, string(debug.Stack()))
rtm.shutdownCh <- struct{}{}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sessionreducer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sessionreducer
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (r *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-r.shutdownCh:
log.Printf("received shutdown signal")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(r.grpcServer)
Expand Down
4 changes: 4 additions & 0 deletions pkg/sessionreducer/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sessionreducer
import (
"context"
"fmt"
"log"
"runtime/debug"
"strings"
"sync"

Expand Down Expand Up @@ -134,6 +136,7 @@ func (rtm *sessionReduceTaskManager) CreateTask(ctx context.Context, request *v1
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside session reduce handler: %v %v", r, string(debug.Stack()))
rtm.shutdownCh <- struct{}{}
}
}()
Expand Down Expand Up @@ -207,6 +210,7 @@ func (rtm *sessionReduceTaskManager) MergeTasks(ctx context.Context, request *v1
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside session reduce handler: %v %v", r, string(debug.Stack()))
rtm.shutdownCh <- struct{}{}
}
}()
Expand Down
4 changes: 4 additions & 0 deletions pkg/shared/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package shared

import (
"fmt"
"log"
"net"
"os"
"time"
Expand Down Expand Up @@ -54,15 +55,18 @@ func StopGRPCServer(grpcServer *grpc.Server) {
// if it is not stopped, stop it forcefully
stopped := make(chan struct{})
go func() {
log.Printf("gracefully stopping grpc server")
grpcServer.GracefulStop()
close(stopped)
}()

t := time.NewTimer(30 * time.Second)
select {
case <-t.C:
log.Printf("forcefully stopping grpc server")
grpcServer.Stop()
case <-stopped:
t.Stop()
}
log.Printf("grpc server stopped")
}
2 changes: 2 additions & 0 deletions pkg/sideinput/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sideinput
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -71,6 +72,7 @@ func (s *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-s.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(s.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sideinput/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sideinput

import (
"context"
"log"
"runtime/debug"

"google.golang.org/protobuf/types/known/emptypb"

Expand Down Expand Up @@ -33,6 +35,7 @@ func (fs *Service) RetrieveSideInput(ctx context.Context, _ *emptypb.Empty) (*si
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sideinput handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sinker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sinker
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -74,6 +75,7 @@ func (s *sinkServer) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-s.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(s.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sinker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package sinker
import (
"context"
"io"
"log"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -83,6 +85,7 @@ func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error {
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sink handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sourcer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sourcer
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -76,6 +77,7 @@ func (s *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-s.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(s.grpcServer)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sourcer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sourcer

import (
"context"
"log"
"runtime/debug"
"time"

"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -34,6 +36,7 @@ func (fs *Service) PendingFn(ctx context.Context, _ *emptypb.Empty) (*sourcepb.P
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sourcer handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down Expand Up @@ -72,6 +75,7 @@ func (fs *Service) ReadFn(d *sourcepb.ReadRequest, stream sourcepb.Source_ReadFn
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down Expand Up @@ -117,6 +121,7 @@ func (fs *Service) AckFn(ctx context.Context, d *sourcepb.AckRequest) (*sourcepb
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand All @@ -139,6 +144,7 @@ func (fs *Service) PartitionsFn(ctx context.Context, _ *emptypb.Empty) (*sourcep
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down
2 changes: 2 additions & 0 deletions pkg/sourcetransformer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sourcetransformer
import (
"context"
"fmt"
"log"
"os/signal"
"sync"
"syscall"
Expand Down Expand Up @@ -71,6 +72,7 @@ func (m *server) Start(ctx context.Context) error {
defer wg.Done()
select {
case <-m.shutdownCh:
log.Printf("shutdown signal received")
case <-ctxWithSignal.Done():
}
shared.StopGRPCServer(m.grpcServer)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package sourcetransformer

import (
"context"
"log"
"runtime/debug"

"google.golang.org/protobuf/types/known/emptypb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -36,6 +38,7 @@ func (fs *Service) SourceTransformFn(ctx context.Context, d *v1.SourceTransformR
// handle panic
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside sourcetransform handler: %v %v", r, string(debug.Stack()))
fs.shutdownCh <- struct{}{}
}
}()
Expand Down

0 comments on commit d82fc27

Please sign in to comment.