
chore: shutdown when we see non retryable udf errors #2204

Merged
6 commits
Nov 6, 2024
4 changes: 3 additions & 1 deletion pkg/forwarder/interfaces.go
@@ -49,7 +49,9 @@ func (gw GoWhere) WhereTo(ks []string, ts []string, id string) ([]VertexBuffer,

// StarterStopper starts/stops the forwarding.
type StarterStopper interface {
Start() <-chan struct{}
// Start returns a channel that can be used to listen for errors. If
// the channel is closed without any error, it means the forwarder has stopped.
Start() <-chan error
Stop()
ForceStop()
}
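
For context on the interface change above, here is a minimal, hypothetical sketch (not part of this PR) of how a caller could consume the new error channel; the runForwarder helper, the fakeForwarder stub, and their wiring are assumptions for illustration only.

package main

import (
	"fmt"
	"log"
)

// StarterStopper mirrors the interface in pkg/forwarder/interfaces.go.
type StarterStopper interface {
	Start() <-chan error
	Stop()
	ForceStop()
}

// runForwarder is a hypothetical helper: a value received on the channel is a
// non-retryable error and the caller shuts down; a close without a value is a
// clean stop.
func runForwarder(f StarterStopper) error {
	stopped := f.Start()
	if err, ok := <-stopped; ok && err != nil {
		return fmt.Errorf("forwarder stopped with error: %w", err)
	}
	return nil
}

// fakeForwarder is a stub so the sketch compiles and runs on its own.
type fakeForwarder struct{ ch chan error }

func (f *fakeForwarder) Start() <-chan error { close(f.ch); return f.ch }
func (f *fakeForwarder) Stop()               {}
func (f *fakeForwarder) ForceStop()          {}

func main() {
	if err := runForwarder(&fakeForwarder{ch: make(chan error)}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("forwarder stopped cleanly")
}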
41 changes: 26 additions & 15 deletions pkg/sinks/forward/forward.go
@@ -111,9 +111,9 @@ func NewDataForward(
}

// Start starts reading the buffer and forwards to sinker. Call `Stop` to stop.
func (df *DataForward) Start() <-chan struct{} {
func (df *DataForward) Start() <-chan error {
log := logging.FromContext(df.ctx)
stopped := make(chan struct{})
stopped := make(chan error)
Member: Should we introduce another channel dedicated for error?

Contributor (author): We had an offline discussion and decided to go with a blocking-call implementation for Start that returns an error, similar to the http server. We will make this change after 1.4.
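
Purely for illustration, a rough sketch of the blocking-call shape mentioned above (planned for after 1.4, so none of this is code from the PR); the forwarder type and forwardAChunk stub are assumptions, and the pattern mirrors how http.Server.ListenAndServe blocks until it returns a terminal error.

package main

import (
	"context"
	"errors"
	"fmt"
)

// forwarder is a stand-in for DataForward; forwardAChunk is stubbed out.
type forwarder struct{}

var errNonRetryable = errors.New("non-retryable udf error")

func (f *forwarder) forwardAChunk(ctx context.Context) error {
	// The real implementation reads a chunk, writes to the sink, and acks;
	// here we simulate a non-retryable failure.
	return errNonRetryable
}

// Start blocks until the forwarder stops: nil on a clean shutdown, otherwise
// the first non-retryable error, similar to http.Server.ListenAndServe.
func (f *forwarder) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		if err := f.forwardAChunk(ctx); err != nil {
			return err
		}
	}
}

func main() {
	if err := (&forwarder{}).Start(context.Background()); err != nil {
		fmt.Println("forwarder exited:", err)
	}
}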

var wg sync.WaitGroup
wg.Add(1)
go func() {
@@ -137,7 +137,11 @@ func (df *DataForward) Start() <-chan struct{} {
// shutdown the fromBufferPartition should be empty.
}
// keep doing what you are good at
df.forwardAChunk(df.ctx)
if err := df.forwardAChunk(df.ctx); err != nil {
log.Errorw("Failed to forward a chunk", zap.Error(err))
stopped <- err
return
}
}
}()

@@ -176,7 +180,7 @@ func (df *DataForward) Start() <-chan struct{} {
// for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack
// the message after forwarding, barring any platform errors. The platform errors include buffer-full,
// buffer-not-reachable, etc., but does not include errors due to WhereTo, etc.
func (df *DataForward) forwardAChunk(ctx context.Context) {
func (df *DataForward) forwardAChunk(ctx context.Context) error {
start := time.Now()
totalBytes := 0
dataBytes := 0
@@ -207,12 +211,12 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
zap.Int64("offset", processorWMB.Offset),
zap.Int64("watermark", processorWMB.Watermark),
zap.Bool("idle", processorWMB.Idle))
return
return nil
}

// if the validation passed, we will publish the watermark to all the toBuffer partitions.
idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
return
return nil
}

var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages))
@@ -266,7 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if err != nil {
df.opts.logger.Errorw("failed to write to sink", zap.Error(err))
df.fromBufferPartition.NoAck(ctx, readOffsets)
return
return err
Member: In this case, we want to restart the container?

Contributor (author): This is a non-retryable error, so we will have to restart.

}

// Only when fallback is configured, it is possible to return fallbackMessages. If there's any, write to the fallback sink.
@@ -277,7 +281,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
_, _, err = df.writeToSink(ctx, df.opts.fbSinkWriter, fallbackMessages, true)
if err != nil {
df.opts.logger.Errorw("Failed to write to fallback sink", zap.Error(err))
return
df.fromBufferPartition.NoAck(ctx, readOffsets)
return err
}
}

@@ -300,7 +305,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if err != nil {
df.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err))
metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
return
return nil
}
metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))

@@ -311,6 +316,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
}
// ProcessingTimes of the entire forwardAChunk
metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds()))
return nil
}

// ackFromBuffer acknowledges an array of offsets back to fromBufferPartition and is a blocking call or until shutdown has been initiated.
@@ -390,20 +396,26 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
_writeOffsets, errs := sinkWriter.Write(ctx, messagesToTry)
for idx, msg := range messagesToTry {
if err = errs[idx]; err != nil {
var udsinkErr = new(udsink.ApplyUDSinkErr)
if errors.As(err, &udsinkErr) {
if udsinkErr.IsInternalErr() {
return false, err
}
}
// if we are asked to write to fallback sink, check if the fallback sink is configured,
// and we are not already in the fallback sink write path.
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
fallbackMessages = append(fallbackMessages, msg)
continue
}

// if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink")
}

// if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink
if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
if errors.Is(err, udsink.WriteToFallbackErr) && isFbSinkWriter {
df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink")
}

Expand Down Expand Up @@ -444,9 +456,8 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
}
return true, nil
})
// If we exited out of the loop and it was due to a forced shutdown we should exit
// TODO(Retry-Sink): Check for ctx done separately? That should be covered in shutdown
if ok, _ := df.IsShuttingDown(); err != nil && ok {

if err != nil {
Member: Is there any logic change for the operation (handlePostRetryFailures) below?

Contributor (author): No change is required for that function.

return nil, nil, err
}
// Check what actions are required once the writing loop is completed
27 changes: 13 additions & 14 deletions pkg/sinks/sink.go
@@ -256,21 +256,20 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
defer finalWg.Done()
log.Infow("Start processing sink messages ", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("fromPartition ", fromBufferPartitionName))
stopped := sinkForwarder.Start()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
<-stopped
log.Info("Sink forwarder stopped, exiting sink processor...")
return
select {
case <-ctx.Done(): // context cancelled case
log.Info("Context cancelled, stopping forwarder for partition...", zap.String("partition", fromBufferPartitionName))
sinkForwarder.Stop()
if err := <-stopped; err != nil {
log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err))
}
}()
<-ctx.Done()
log.Infow("SIGTERM exiting inside partition...", zap.String("fromPartition", fromBufferPartitionName))
sinkForwarder.Stop()
wg.Wait()
log.Infow("Exited for partition...", zap.String("fromPartition", fromBufferPartitionName))
log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName))
case err := <-stopped: // critical error case
if err != nil {
Member: Will this trigger a successful container restart? Do we need to trigger a crash?

Contributor (author): It will trigger a successful container restart.

log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err))
cancel()
}
}
}(df, readers[index].GetName())
}

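Regarding the restart question in the thread above, a hedged sketch of the flow this PR relies on: the error from the stopped channel cancels the context, the processor returns, and the entrypoint exits non-zero so the kubelet restarts the container under the pod's restart policy. The runSinkProcessor stub below is an assumption for illustration, not the actual numaflow entrypoint.

package main

import (
	"context"
	"errors"
	"log"
	"os"
)

// runSinkProcessor stands in for SinkProcessor.Start: in the real code it
// starts one forwarder per partition and waits on their stopped channels;
// here it simply reports a non-retryable error.
func runSinkProcessor(ctx context.Context) error {
	return errors.New("sink forwarder stopped with non-retryable udf error")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := runSinkProcessor(ctx); err != nil {
		// Exiting non-zero is enough for Kubernetes to restart the
		// container; no explicit crash or panic is required.
		log.Println("exiting:", err)
		os.Exit(1)
	}
}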
12 changes: 6 additions & 6 deletions pkg/sinks/udsink/udsink_grpc.go
@@ -29,16 +29,16 @@ import (
)

var (
WriteToFallbackErr = ApplyUDSinkErr{
WriteToFallbackErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "write to fallback sink",
}

UnknownUDSinkErr = ApplyUDSinkErr{
UnknownUDSinkErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "unknown error in udsink",
}
NotFoundErr = ApplyUDSinkErr{
NotFoundErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "not found in response",
}
@@ -114,7 +114,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
}
for i, m := range requests {
if r, existing := resMap[m.Request.GetId()]; !existing {
errs[i] = &NotFoundErr
errs[i] = NotFoundErr
} else {
if r.GetStatus() == sinkpb.Status_FAILURE {
if r.GetErrMsg() != "" {
@@ -123,10 +123,10 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
Message: r.GetErrMsg(),
}
} else {
errs[i] = &UnknownUDSinkErr
errs[i] = UnknownUDSinkErr
}
} else if r.GetStatus() == sinkpb.Status_FALLBACK {
errs[i] = &WriteToFallbackErr
errs[i] = WriteToFallbackErr
} else {
errs[i] = nil
}
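A standalone sketch of why the sentinels above are now declared as error-typed pointer values: every reference shares one pointer, so errors.Is matches by identity without taking the address of a struct, and errors.As still recovers the concrete type. The ApplyUDSinkErr mirror below only copies the fields visible in this diff; anything else about its shape is an assumption.

package main

import (
	"errors"
	"fmt"
)

// ApplyUDSinkErr mirrors the fields of the udsink error type shown in the diff.
type ApplyUDSinkErr struct {
	UserUDSinkErr bool
	Message       string
}

func (e *ApplyUDSinkErr) Error() string { return e.Message }

// Declared as error so callers write errors.Is(err, WriteToFallbackErr)
// instead of errors.Is(err, &udsink.WriteToFallbackErr).
var WriteToFallbackErr error = &ApplyUDSinkErr{
	UserUDSinkErr: true,
	Message:       "write to fallback sink",
}

func main() {
	var err error = fmt.Errorf("sink write failed: %w", WriteToFallbackErr)

	if errors.Is(err, WriteToFallbackErr) {
		fmt.Println("route the message to the fallback sink")
	}

	var udsinkErr *ApplyUDSinkErr
	if errors.As(err, &udsinkErr) {
		fmt.Println("user-defined sink error:", udsinkErr.UserUDSinkErr)
	}
}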
72 changes: 72 additions & 0 deletions pkg/sources/errors/errors.go
@@ -0,0 +1,72 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package errors

// SourceReadErr represents any source read related error
type SourceReadErr struct {
Message string
Retryable bool
}

func (e *SourceReadErr) Error() string {
return e.Message
}

func (e *SourceReadErr) Is(target error) bool {
return target.Error() == e.Error()
}

// IsRetryable is true if the error is retryable
func (e *SourceReadErr) IsRetryable() bool {
return e.Retryable
}

type SourceAckErr struct {
Message string
Retryable bool
}

func (e *SourceAckErr) Error() string {
return e.Message
}

func (e *SourceAckErr) Is(target error) bool {
return target.Error() == e.Error()
}

// IsRetryable is true if the error is retryable
func (e *SourceAckErr) IsRetryable() bool {
return e.Retryable
}

type SourcePendingErr struct {
Message string
Retryable bool
}

func (e *SourcePendingErr) Error() string {
return e.Message
}

func (e *SourcePendingErr) Is(target error) bool {
return target.Error() == e.Error()
}

// IsRetryable is true if the error is retryable
func (e *SourcePendingErr) IsRetryable() bool {
return e.Retryable
}
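
A hedged sketch of how a source implementation might consume these new error types; the readOnce stub and the retry decision are assumptions for illustration, and the SourceReadErr mirror simply repeats the definition added above.

package main

import (
	"errors"
	"fmt"
)

// SourceReadErr mirrors the type added in pkg/sources/errors/errors.go.
type SourceReadErr struct {
	Message   string
	Retryable bool
}

func (e *SourceReadErr) Error() string     { return e.Message }
func (e *SourceReadErr) IsRetryable() bool { return e.Retryable }

// readOnce is a hypothetical read that fails with a retryable error.
func readOnce() error {
	return &SourceReadErr{Message: "transient broker hiccup", Retryable: true}
}

func main() {
	err := readOnce()

	var readErr *SourceReadErr
	if errors.As(err, &readErr) && readErr.IsRetryable() {
		fmt.Println("retryable source read error, trying again:", readErr)
		return
	}
	// Anything else is treated as non-retryable: the forwarder stops and the
	// container restarts, which is the behavior this PR introduces.
	fmt.Println("non-retryable error, shutting down:", err)
}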