chore: shutdown when we see non retryable udf errors #2204

Merged: 6 commits, Nov 6, 2024
Changes from 1 commit
2 changes: 1 addition & 1 deletion pkg/forwarder/interfaces.go
@@ -49,7 +49,7 @@ func (gw GoWhere) WhereTo(ks []string, ts []string, id string) ([]VertexBuffer,

// StarterStopper starts/stops the forwarding.
type StarterStopper interface {
- Start() <-chan struct{}
+ Start() <-chan error
Stop()
ForceStop()
}
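With this one-line interface change, Start hands the caller a channel that can deliver a terminal error instead of a bare close signal. A minimal, self-contained sketch of how a consumer of the new contract might look (fakeForwarder, the timings, and the messages are illustrative assumptions, not code from this PR):

    package main

    import (
        "context"
        "errors"
        "log"
        "time"
    )

    // fakeForwarder stands in for a StarterStopper implementation.
    type fakeForwarder struct{ stop chan struct{} }

    func (f *fakeForwarder) Start() <-chan error {
        stopped := make(chan error)
        go func() {
            select {
            case <-f.stop:
                close(stopped) // clean exit: the channel closes without an error
            case <-time.After(100 * time.Millisecond):
                stopped <- errors.New("non-retryable udf error") // fatal exit
            }
        }()
        return stopped
    }

    func (f *fakeForwarder) Stop() { close(f.stop) }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        f := &fakeForwarder{stop: make(chan struct{})}
        stopped := f.Start()

        select {
        case err := <-stopped: // the forwarder exited on its own
            if err != nil {
                log.Println("forwarder failed, shutting down:", err)
            }
        case <-ctx.Done(): // orderly shutdown requested
            f.Stop()
            <-stopped // wait for the forwarding loop to wind down
        }
    }

A clean Stop closes the channel with no error; a non-retryable failure delivers the error, which is exactly the signal the processor code below uses to cancel and exit.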
41 changes: 26 additions & 15 deletions pkg/sinks/forward/forward.go
@@ -111,9 +111,9 @@ func NewDataForward(
}

// Start starts reading the buffer and forwards to sinker. Call `Stop` to stop.
- func (df *DataForward) Start() <-chan struct{} {
+ func (df *DataForward) Start() <-chan error {
log := logging.FromContext(df.ctx)
- stopped := make(chan struct{})
+ stopped := make(chan error)
Member: Should we introduce another channel dedicated to errors?

Contributor Author: We had an offline discussion and decided to go with a blocking-call implementation for Start that returns an error, similar to http.Server. We will make this change after 1.4.
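To make that alternative concrete, here is a rough sketch of the blocking, http.Server-style Start the author describes for after 1.4. This is speculative illustration only, not part of this PR; the receiver, df.ctx, and forwardAChunk are borrowed from the surrounding diff:

    // Hypothetical post-1.4 shape: Start blocks until the forwarder exits,
    // returning nil on clean shutdown and the fatal error otherwise,
    // mirroring http.Server.ListenAndServe.
    func (df *DataForward) Start() error {
        for {
            select {
            case <-df.ctx.Done():
                return nil // clean shutdown; cleanup elided
            default:
            }
            if err := df.forwardAChunk(df.ctx); err != nil {
                return err // non-retryable errors surface directly to the caller
            }
        }
    }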

var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -137,7 +137,11 @@ func (df *DataForward) Start() <-chan struct{} {
// shutdown the fromBufferPartition should be empty.
}
// keep doing what you are good at
- df.forwardAChunk(df.ctx)
+ if err := df.forwardAChunk(df.ctx); err != nil {
+ log.Errorw("Failed to forward a chunk", zap.Error(err))
+ stopped <- err
+ return
+ }
}
}()

@@ -176,7 +180,7 @@ func (df *DataForward) Start() <-chan struct{} {
// for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack
// the message after forwarding, barring any platform errors. The platform errors include buffer-full,
// buffer-not-reachable, etc., but does not include errors due to WhereTo, etc.
- func (df *DataForward) forwardAChunk(ctx context.Context) {
+ func (df *DataForward) forwardAChunk(ctx context.Context) error {
start := time.Now()
totalBytes := 0
dataBytes := 0
@@ -207,12 +211,12 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
zap.Int64("offset", processorWMB.Offset),
zap.Int64("watermark", processorWMB.Watermark),
zap.Bool("idle", processorWMB.Idle))
- return
+ return nil
}

// if the validation passed, we will publish the watermark to all the toBuffer partitions.
idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
- return
+ return nil
}

var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages))
@@ -266,7 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if err != nil {
df.opts.logger.Errorw("failed to write to sink", zap.Error(err))
df.fromBufferPartition.NoAck(ctx, readOffsets)
- return
+ return err
Member: In this case, we want to restart the container?

Contributor Author: This is a non-retryable error, so we will have to restart.

}

// Only when fallback is configured, it is possible to return fallbackMessages. If there's any, write to the fallback sink.
@@ -277,7 +281,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
_, _, err = df.writeToSink(ctx, df.opts.fbSinkWriter, fallbackMessages, true)
if err != nil {
df.opts.logger.Errorw("Failed to write to fallback sink", zap.Error(err))
- return
+ df.fromBufferPartition.NoAck(ctx, readOffsets)
+ return err
}
}

@@ -300,7 +305,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if err != nil {
df.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err))
metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
- return
+ return nil
}
metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))

@@ -311,6 +316,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
}
// ProcessingTimes of the entire forwardAChunk
metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds()))
+ return nil
}

// ackFromBuffer acknowledges an array of offsets back to fromBufferPartition and is a blocking call or until shutdown has been initiated.
@@ -390,20 +396,26 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
_writeOffsets, errs := sinkWriter.Write(ctx, messagesToTry)
for idx, msg := range messagesToTry {
if err = errs[idx]; err != nil {
+ var udsinkErr = new(udsink.ApplyUDSinkErr)
+ if errors.As(err, &udsinkErr) {
+ if udsinkErr.IsInternalErr() {
+ return false, err
+ }
+ }
// if we are asked to write to fallback sink, check if the fallback sink is configured,
// and we are not already in the fallback sink write path.
- if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
+ if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
fallbackMessages = append(fallbackMessages, msg)
continue
}

// if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink
- if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
+ if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink")
}

// if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink
- if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
+ if errors.Is(err, udsink.WriteToFallbackErr) && isFbSinkWriter {
df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink")
}

@@ -444,9 +456,8 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
}
return true, nil
})
- // If we exited out of the loop and it was due to a forced shutdown we should exit
- // TODO(Retry-Sink): Check for ctx done separately? That should be covered in shutdown
- if ok, _ := df.IsShuttingDown(); err != nil && ok {
+ if err != nil {
Member: Is there any logic change for the operation (handlePostRetryFailures) below?

Contributor Author: No change is required for that function.

return nil, nil, err
}
// Check what actions are required once the writing loop is completed
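The new block at the top of the retry loop is the heart of this PR: a UDF error is unwrapped with errors.As, and an internal (non-retryable) error aborts the retry loop so it can propagate through forwardAChunk and Start onto the stopped channel. An equivalent, slightly more idiomatic formulation of the same check (a sketch; the behavior matches the diff above):

    var udsinkErr *udsink.ApplyUDSinkErr
    if errors.As(err, &udsinkErr) && udsinkErr.IsInternalErr() {
        // Internal errors are not retryable: stop retrying and let the
        // error travel up to Start, which pushes it onto the stopped channel.
        return false, err
    }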
27 changes: 13 additions & 14 deletions pkg/sinks/sink.go
@@ -256,21 +256,20 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
defer finalWg.Done()
log.Infow("Start processing sink messages ", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("fromPartition ", fromBufferPartitionName))
stopped := sinkForwarder.Start()
- wg := &sync.WaitGroup{}
- wg.Add(1)
- go func() {
- defer wg.Done()
- for {
- <-stopped
- log.Info("Sink forwarder stopped, exiting sink processor...")
- return
+ select {
+ case <-ctx.Done(): // context cancelled case
+ log.Info("Context cancelled, stopping forwarder for partition...", zap.String("partition", fromBufferPartitionName))
+ sinkForwarder.Stop()
+ if err := <-stopped; err != nil {
+ log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err))
+ }
- }()
- <-ctx.Done()
- log.Infow("SIGTERM exiting inside partition...", zap.String("fromPartition", fromBufferPartitionName))
- sinkForwarder.Stop()
- wg.Wait()
- log.Infow("Exited for partition...", zap.String("fromPartition", fromBufferPartitionName))
+ log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName))
+ case err := <-stopped: // critical error case
+ if err != nil {
Member: Will this trigger a successful container restart? Do we need to trigger a crash?

Contributor Author: It will trigger a successful container restart.

+ log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err))
+ cancel()
+ }
+ }
}(df, readers[index].GetName())
}

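On the reviewer's restart question: cancelling the shared context makes SinkProcessor.Start return, the process exits, and the kubelet restarts the container per the pod's restartPolicy, so no explicit crash is needed. A self-contained illustration of that chain (stubProcessor and the simulated cancel are assumptions for demonstration, not repository code):

    package main

    import (
        "context"
        "errors"
        "log"
    )

    // stubProcessor stands in for SinkProcessor; Start blocks until the
    // context is cancelled, which the new code does on a fatal forwarder error.
    type stubProcessor struct{}

    func (stubProcessor) Start(ctx context.Context) error {
        <-ctx.Done()
        return errors.New("forwarder reported a non-retryable error")
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        cancel() // simulate the fatal-error path cancelling the context

        if err := (stubProcessor{}).Start(ctx); err != nil {
            // Exiting lets the kubelet restart the container according to
            // the pod's restartPolicy; no explicit panic is needed.
            log.Fatalf("sink processor exited: %v", err)
        }
    }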
12 changes: 6 additions & 6 deletions pkg/sinks/udsink/udsink_grpc.go
@@ -29,16 +29,16 @@ import (
)

var (
- WriteToFallbackErr = ApplyUDSinkErr{
+ WriteToFallbackErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "write to fallback sink",
}

- UnknownUDSinkErr = ApplyUDSinkErr{
+ UnknownUDSinkErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "unknown error in udsink",
}
- NotFoundErr = ApplyUDSinkErr{
+ NotFoundErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "not found in response",
}
@@ -114,7 +114,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
}
for i, m := range requests {
if r, existing := resMap[m.Request.GetId()]; !existing {
- errs[i] = &NotFoundErr
+ errs[i] = NotFoundErr
} else {
if r.GetStatus() == sinkpb.Status_FAILURE {
if r.GetErrMsg() != "" {
@@ -123,10 +123,10 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
Message: r.GetErrMsg(),
}
} else {
- errs[i] = &UnknownUDSinkErr
+ errs[i] = UnknownUDSinkErr
}
} else if r.GetStatus() == sinkpb.Status_FALLBACK {
- errs[i] = &WriteToFallbackErr
+ errs[i] = WriteToFallbackErr
} else {
errs[i] = nil
}
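This sentinel change pairs with the errors.Is calls in forward.go: the variables are now error-typed and hold *ApplyUDSinkErr pointers, so assignments like errs[i] = NotFoundErr and checks like errors.Is(err, udsink.WriteToFallbackErr) compare the same pointer identity, and callers no longer take the address of a package-level struct at every call site. A self-contained illustration of the pattern (a sketch, not repository code):

    package main

    import (
        "errors"
        "fmt"
    )

    type ApplyUDSinkErr struct {
        UserUDSinkErr bool
        Message       string
    }

    func (e *ApplyUDSinkErr) Error() string { return e.Message }

    // A pointer sentinel stored in an error-typed variable: errors.Is matches
    // by pointer identity, which is stable for a package-level variable.
    var WriteToFallbackErr error = &ApplyUDSinkErr{UserUDSinkErr: true, Message: "write to fallback sink"}

    func main() {
        err := fmt.Errorf("sink write failed: %w", WriteToFallbackErr) // wrapping still matches
        fmt.Println(errors.Is(err, WriteToFallbackErr))                // prints: true
    }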
24 changes: 14 additions & 10 deletions pkg/sources/forward/data_forward.go
@@ -119,9 +119,9 @@ func NewDataForward(
}

// Start starts reading from source and forwards to the next buffers. Call `Stop` to stop.
- func (df *DataForward) Start() <-chan struct{} {
+ func (df *DataForward) Start() <-chan error {
log := logging.FromContext(df.ctx)
- stopped := make(chan struct{})
+ stopped := make(chan error)
var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -145,7 +145,10 @@ func (df *DataForward) Start() <-chan struct{} {
// shutdown the reader should be empty.
}
// keep doing what you are good at
- df.forwardAChunk(df.ctx)
+ if err := df.forwardAChunk(df.ctx); err != nil {
+ stopped <- err
+ return
+ }
}
}()

@@ -188,7 +191,7 @@
// for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack
// the message after forwarding, barring any platform errors. The platform errors include buffer-full,
// buffer-not-reachable, etc., but do not include errors due to user code transformer, WhereTo, etc.
- func (df *DataForward) forwardAChunk(ctx context.Context) {
+ func (df *DataForward) forwardAChunk(ctx context.Context) error {
start := time.Now()
totalBytes := 0
// There is a chance that we have read the message and the container got forcefully terminated before processing. To provide
@@ -210,7 +213,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if len(readMessages) == 0 {
// not idling, so nothing much to do
if !df.srcIdleHandler.IsSourceIdling() {
- return
+ return nil
}

// if the source is idling, we will publish idle watermark to the source and all the toBuffers
@@ -243,7 +246,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
}

// len(readMessages) == 0, so we do not have anything more to do
- return
+ return nil
}

// reset the idle handler because we have read messages
@@ -314,7 +317,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
readWriteMessagePairs, err = df.applyTransformer(ctx, readMessages)
if err != nil {
df.opts.logger.Errorw("failed to apply source transformer", zap.Error(err))
- return
+ return err
}

df.opts.logger.Debugw("concurrent applyTransformer completed",
@@ -381,7 +384,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
for _, message := range m.WriteMessages {
if err = df.whereToStep(message, messageToStep); err != nil {
df.opts.logger.Errorw("failed in whereToStep", zap.Error(err))
- return
+ return err
}
}
// get the list of source partitions for which we have read messages, we will use this to publish watermarks to toVertices
@@ -392,7 +395,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
writeOffsets, err = df.writeToBuffers(ctx, messageToStep)
if err != nil {
df.opts.logger.Errorw("failed to write to toBuffers", zap.Error(err))
- return
+ return err
}

// activeWatermarkBuffers records the buffers that the publisher has published
@@ -464,7 +467,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
metrics.LabelPartitionName: df.reader.GetName(),
}).Add(float64(len(readOffsets)))

- return
+ return err
}
metrics.AckMessagesCount.With(map[string]string{
metrics.LabelVertex: df.vertexName,
Expand All @@ -487,6 +490,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
metrics.LabelVertexType: string(dfv1.VertexTypeSource),
metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)),
}).Observe(float64(time.Since(start).Microseconds()))
+ return nil
}

func (df *DataForward) ackFromSource(ctx context.Context, offsets []isb.Offset) error {
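One subtlety in this loop: stopped is unbuffered, so stopped <- err blocks until the supervising select in source.go or sink.go receives it, and since both arms of that select read from stopped, the send completes even when a context cancellation races with the error. A condensed, runnable view of the handshake (names are illustrative):

    package main

    import (
        "context"
        "errors"
        "fmt"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        stopped := make(chan error) // unbuffered: the send below is a rendezvous

        go func() {
            // Simulates forwardAChunk returning a non-retryable error.
            stopped <- errors.New("non-retryable udf error") // parks until received
        }()

        select {
        case <-ctx.Done():
            <-stopped // the Stop() path drains the channel here
        case err := <-stopped: // critical error case, as in sink.go and source.go
            fmt.Println("stopping:", err)
            cancel()
        }
    }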
35 changes: 17 additions & 18 deletions pkg/sources/source.go
@@ -21,7 +21,6 @@ import (
"fmt"
"os"
"strconv"
"sync"
"time"

"go.uber.org/zap"
@@ -295,30 +294,30 @@ func (sp *SourceProcessor) Start(ctx context.Context) error {
return fmt.Errorf("failed to create source forwarder, error: %w", err)
}

log.Infow("Start processing source messages", zap.String("isbs", string(sp.ISBSvcType)), zap.Any("to", sp.VertexInstance.Vertex.GetToBuffers()))
stopped := sourceForwarder.Start()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
<-stopped
log.Info("Source forwarder stopped, exiting...")
return
}
}()

metricsOpts := metrics.NewMetricsOptions(ctx, sp.VertexInstance.Vertex, healthCheckers, []isb.LagReader{sourceReader})
ms := metrics.NewMetricsServer(sp.VertexInstance.Vertex, metricsOpts...)
if shutdown, err := ms.Start(ctx); err != nil {
return fmt.Errorf("failed to start metrics server, error: %w", err)
} else {
defer func() { _ = shutdown(context.Background()) }()
}
- <-ctx.Done()
- log.Info("SIGTERM, exiting...")
- sourceForwarder.Stop()
- wg.Wait()

log.Infow("Start processing source messages", zap.String("isbs", string(sp.ISBSvcType)), zap.Any("to", sp.VertexInstance.Vertex.GetToBuffers()))
stopped := sourceForwarder.Start()
select {
case <-ctx.Done(): // context cancelled case
log.Info("Context cancelled, stopping forwarder for partition...")
sourceForwarder.Stop()
if err := <-stopped; err != nil {
log.Errorw("Source forwarder stopped with error", zap.Error(err))
}
log.Info("Exited source forwarder...")
case err := <-stopped: // critical error case
if err != nil {
log.Errorw("Source forwarder stopped with error", zap.Error(err))
cancel()
}
}

// close all the sourceReader wm stores
for _, wmStore := range sourceWmStores {