Skip to content

Commit

Permalink
Handle transformer panic
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Sep 30, 2024
1 parent e2b7df6 commit 8ff0657
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions pkg/sourcetransformer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
defer cancel()
grp, grpCtx := errgroup.WithContext(ctx)

senderCh := make(chan *v1.SourceTransformResponse, 500) // TODO: identify the right buffer size
senderCh := make(chan *v1.SourceTransformResponse, 500) // FIXME: identify the right buffer size
// goroutine to send the response to the stream
grp.Go(func() error {
for {
Expand All @@ -97,12 +97,16 @@ func (fs *Service) SourceTransformFn(stream v1.SourceTransform_SourceTransformFn
}
return err
}
if d.Handshake != nil {
return fmt.Errorf("expected source transform messages, received handshake message")
}

req := d.Request
grp.Go(func() error {
defer func() {
if r := recover(); r != nil {
log.Printf("panic inside source handler: %v %v", r, string(debug.Stack()))
cancel()
fs.shutdownCh <- struct{}{}
}
}()
var hd = NewHandlerDatum(req.GetValue(), req.EventTime.AsTime(), req.Watermark.AsTime(), req.Headers)
messageTs := fs.Transformer.Transform(grpCtx, req.GetKeys(), hd)
var results []*v1.SourceTransformResponse_Result
Expand Down

0 comments on commit 8ff0657

Please sign in to comment.