-
Notifications
You must be signed in to change notification settings - Fork 124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: shutdown when we see non retryable udf errors #2204
Changes from 5 commits
86e059d
78302cf
3013927
910b506
4124ec4
017cde3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -111,9 +111,9 @@ func NewDataForward( | |
} | ||
|
||
// Start starts reading the buffer and forwards to sinker. Call `Stop` to stop. | ||
func (df *DataForward) Start() <-chan struct{} { | ||
func (df *DataForward) Start() <-chan error { | ||
log := logging.FromContext(df.ctx) | ||
stopped := make(chan struct{}) | ||
stopped := make(chan error) | ||
var wg sync.WaitGroup | ||
wg.Add(1) | ||
go func() { | ||
|
@@ -137,7 +137,11 @@ func (df *DataForward) Start() <-chan struct{} { | |
// shutdown the fromBufferPartition should be empty. | ||
} | ||
// keep doing what you are good at | ||
df.forwardAChunk(df.ctx) | ||
if err := df.forwardAChunk(df.ctx); err != nil { | ||
log.Errorw("Failed to forward a chunk", zap.Error(err)) | ||
stopped <- err | ||
return | ||
} | ||
} | ||
}() | ||
|
||
|
@@ -176,7 +180,7 @@ func (df *DataForward) Start() <-chan struct{} { | |
// for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack | ||
// the message after forwarding, barring any platform errors. The platform errors include buffer-full, | ||
// buffer-not-reachable, etc., but does not include errors due to WhereTo, etc. | ||
func (df *DataForward) forwardAChunk(ctx context.Context) { | ||
func (df *DataForward) forwardAChunk(ctx context.Context) error { | ||
start := time.Now() | ||
totalBytes := 0 | ||
dataBytes := 0 | ||
|
@@ -207,12 +211,12 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { | |
zap.Int64("offset", processorWMB.Offset), | ||
zap.Int64("watermark", processorWMB.Watermark), | ||
zap.Bool("idle", processorWMB.Idle)) | ||
return | ||
return nil | ||
} | ||
|
||
// if the validation passed, we will publish the watermark to all the toBuffer partitions. | ||
idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark))) | ||
return | ||
return nil | ||
} | ||
|
||
var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages)) | ||
|
@@ -266,7 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { | |
if err != nil { | ||
df.opts.logger.Errorw("failed to write to sink", zap.Error(err)) | ||
df.fromBufferPartition.NoAck(ctx, readOffsets) | ||
return | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case, we want to restart the container? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a non retryable error, so we will have to restart. |
||
} | ||
|
||
// Only when fallback is configured, it is possible to return fallbackMessages. If there's any, write to the fallback sink. | ||
|
@@ -277,7 +281,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { | |
_, _, err = df.writeToSink(ctx, df.opts.fbSinkWriter, fallbackMessages, true) | ||
if err != nil { | ||
df.opts.logger.Errorw("Failed to write to fallback sink", zap.Error(err)) | ||
return | ||
df.fromBufferPartition.NoAck(ctx, readOffsets) | ||
return err | ||
} | ||
} | ||
|
||
|
@@ -300,7 +305,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { | |
if err != nil { | ||
df.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err)) | ||
metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) | ||
return | ||
return nil | ||
} | ||
metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) | ||
|
||
|
@@ -311,6 +316,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { | |
} | ||
// ProcessingTimes of the entire forwardAChunk | ||
metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds())) | ||
return nil | ||
} | ||
|
||
// ackFromBuffer acknowledges an array of offsets back to fromBufferPartition and is a blocking call or until shutdown has been initiated. | ||
|
@@ -390,20 +396,26 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr | |
_writeOffsets, errs := sinkWriter.Write(ctx, messagesToTry) | ||
for idx, msg := range messagesToTry { | ||
if err = errs[idx]; err != nil { | ||
var udsinkErr = new(udsink.ApplyUDSinkErr) | ||
KeranYang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if errors.As(err, &udsinkErr) { | ||
if udsinkErr.IsInternalErr() { | ||
return false, err | ||
} | ||
} | ||
// if we are asked to write to fallback sink, check if the fallback sink is configured, | ||
// and we are not already in the fallback sink write path. | ||
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter { | ||
if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter { | ||
fallbackMessages = append(fallbackMessages, msg) | ||
continue | ||
} | ||
|
||
// if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink | ||
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil { | ||
if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil { | ||
df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink") | ||
} | ||
|
||
// if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink | ||
if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter { | ||
if errors.Is(err, udsink.WriteToFallbackErr) && isFbSinkWriter { | ||
df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink") | ||
} | ||
|
||
|
@@ -444,9 +456,8 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr | |
} | ||
return true, nil | ||
}) | ||
// If we exited out of the loop and it was due to a forced shutdown we should exit | ||
// TODO(Retry-Sink): Check for ctx done separately? That should be covered in shutdown | ||
if ok, _ := df.IsShuttingDown(); err != nil && ok { | ||
KeranYang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any logic change for the operation ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No change is required for that function |
||
return nil, nil, err | ||
} | ||
// Check what actions are required once the writing loop is completed | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -256,21 +256,20 @@ func (u *SinkProcessor) Start(ctx context.Context) error { | |
defer finalWg.Done() | ||
log.Infow("Start processing sink messages ", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("fromPartition ", fromBufferPartitionName)) | ||
stopped := sinkForwarder.Start() | ||
wg := &sync.WaitGroup{} | ||
wg.Add(1) | ||
go func() { | ||
defer wg.Done() | ||
for { | ||
<-stopped | ||
log.Info("Sink forwarder stopped, exiting sink processor...") | ||
return | ||
select { | ||
case <-ctx.Done(): // context cancelled case | ||
log.Info("Context cancelled, stopping forwarder for partition...", zap.String("partition", fromBufferPartitionName)) | ||
sinkForwarder.Stop() | ||
if err := <-stopped; err != nil { | ||
log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err)) | ||
} | ||
}() | ||
<-ctx.Done() | ||
log.Infow("SIGTERM exiting inside partition...", zap.String("fromPartition", fromBufferPartitionName)) | ||
sinkForwarder.Stop() | ||
wg.Wait() | ||
log.Infow("Exited for partition...", zap.String("fromPartition", fromBufferPartitionName)) | ||
log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName)) | ||
case err := <-stopped: // critical error case | ||
if err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this trigger a successful container restart? do we need to trigger a crash? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will trigger a successful container restart |
||
log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err)) | ||
cancel() | ||
} | ||
} | ||
}(df, readers[index].GetName()) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
/* | ||
Copyright 2022 The Numaproj Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package errors | ||
|
||
// SourceReadErr represents any source read related error | ||
type SourceReadErr struct { | ||
Message string | ||
Retryable bool | ||
} | ||
|
||
func (e *SourceReadErr) Error() string { | ||
return e.Message | ||
} | ||
|
||
func (e *SourceReadErr) Is(target error) bool { | ||
return target.Error() == e.Error() | ||
} | ||
|
||
// IsRetryable is true if the error is retryable | ||
func (e *SourceReadErr) IsRetryable() bool { | ||
return e.Retryable | ||
} | ||
|
||
type SourceAckErr struct { | ||
Message string | ||
Retryable bool | ||
} | ||
|
||
func (e *SourceAckErr) Error() string { | ||
return e.Message | ||
} | ||
|
||
func (e *SourceAckErr) Is(target error) bool { | ||
return target.Error() == e.Error() | ||
} | ||
|
||
// IsRetryable is true if the error is retryable | ||
func (e *SourceAckErr) IsRetryable() bool { | ||
return e.Retryable | ||
} | ||
|
||
type SourcePendingErr struct { | ||
Message string | ||
Retryable bool | ||
} | ||
|
||
func (e *SourcePendingErr) Error() string { | ||
return e.Message | ||
} | ||
|
||
func (e *SourcePendingErr) Is(target error) bool { | ||
return target.Error() == e.Error() | ||
} | ||
|
||
// IsRetryable is true if the error is retryable | ||
func (e *SourcePendingErr) IsRetryable() bool { | ||
return e.Retryable | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we introduce another channel dedicated for error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We had a offline discussion, we decided to go with a blocking call implementation for start which returns an error similar to http server. We will make this change after 1.4.