From 8ff0657809c467cb3ac74a7b1fcab9ba1fbe689a Mon Sep 17 00:00:00 2001 From: Sreekanth Date: Mon, 30 Sep 2024 17:53:19 +0530 Subject: [PATCH] Handle transformer panic Signed-off-by: Sreekanth --- pkg/sourcetransformer/service.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/sourcetransformer/service.go b/pkg/sourcetransformer/service.go index 86d945ea..2b749012 100644 --- a/pkg/sourcetransformer/service.go +++ b/pkg/sourcetransformer/service.go @@ -73,7 +73,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn defer cancel() grp, grpCtx := errgroup.WithContext(ctx) - senderCh := make(chan *v1.SourceTransformResponse, 500) // TODO: identify the right buffer size + senderCh := make(chan *v1.SourceTransformResponse, 500) // FIXME: identify the right buffer size // goroutine to send the response to the stream grp.Go(func() error { for { @@ -97,12 +97,16 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn } return err } - if d.Handshake != nil { - return fmt.Errorf("expected source transform messages, received handshake message") - } req := d.Request grp.Go(func() error { + defer func() { + if r := recover(); r != nil { + log.Printf("panic inside source handler: %v %v", r, string(debug.Stack())) + cancel() + fs.shutdownCh <- struct{}{} + } + }() var hd = NewHandlerDatum(req.GetValue(), req.EventTime.AsTime(), req.Watermark.AsTime(), req.Headers) messageTs := fs.Transformer.Transform(grpCtx, req.GetKeys(), hd) var results []*v1.SourceTransformResponse_Result