Skip to content

Commit

Permalink
chore: shutdown when we see non retryable udf errors (numaproj#2204)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored and SaniyaKalamkar committed Jan 19, 2025
1 parent 40d7b6f commit 4b248e2
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 143 deletions.
4 changes: 3 additions & 1 deletion pkg/forwarder/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func (gw GoWhere) WhereTo(ks []string, ts []string, id string) ([]VertexBuffer,

// StarterStopper starts/stops the forwarding.
type StarterStopper interface {
Start() <-chan struct{}
// Start returns a channel that can be used to listen for errors. If
// the channel is closed without any error, it means the forwarder has stopped.
Start() <-chan error
Stop()
ForceStop()
}
41 changes: 26 additions & 15 deletions pkg/sinks/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ func NewDataForward(
}

// Start starts reading the buffer and forwards to sinker. Call `Stop` to stop.
func (df *DataForward) Start() <-chan struct{} {
func (df *DataForward) Start() <-chan error {
log := logging.FromContext(df.ctx)
stopped := make(chan struct{})
stopped := make(chan error)
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -137,7 +137,11 @@ func (df *DataForward) Start() <-chan struct{} {
// shutdown the fromBufferPartition should be empty.
}
// keep doing what you are good at
df.forwardAChunk(df.ctx)
if err := df.forwardAChunk(df.ctx); err != nil {
log.Errorw("Failed to forward a chunk", zap.Error(err))
stopped <- err
return
}
}
}()

Expand Down Expand Up @@ -176,7 +180,7 @@ func (df *DataForward) Start() <-chan struct{} {
// for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack
// the message after forwarding, barring any platform errors. The platform errors include buffer-full,
// buffer-not-reachable, etc., but does not include errors due to WhereTo, etc.
func (df *DataForward) forwardAChunk(ctx context.Context) {
func (df *DataForward) forwardAChunk(ctx context.Context) error {
start := time.Now()
totalBytes := 0
dataBytes := 0
Expand Down Expand Up @@ -207,12 +211,12 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
zap.Int64("offset", processorWMB.Offset),
zap.Int64("watermark", processorWMB.Watermark),
zap.Bool("idle", processorWMB.Idle))
return
return nil
}

// if the validation passed, we will publish the watermark to all the toBuffer partitions.
idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark)))
return
return nil
}

var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages))
Expand Down Expand Up @@ -266,7 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if err != nil {
df.opts.logger.Errorw("failed to write to sink", zap.Error(err))
df.fromBufferPartition.NoAck(ctx, readOffsets)
return
return err
}

// Only when fallback is configured, it is possible to return fallbackMessages. If there's any, write to the fallback sink.
Expand All @@ -277,7 +281,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
_, _, err = df.writeToSink(ctx, df.opts.fbSinkWriter, fallbackMessages, true)
if err != nil {
df.opts.logger.Errorw("Failed to write to fallback sink", zap.Error(err))
return
df.fromBufferPartition.NoAck(ctx, readOffsets)
return err
}
}

Expand All @@ -300,7 +305,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
if err != nil {
df.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err))
metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))
return
return nil
}
metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets)))

Expand All @@ -311,6 +316,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
}
// ProcessingTimes of the entire forwardAChunk
metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds()))
return nil
}

// ackFromBuffer acknowledges an array of offsets back to fromBufferPartition and is a blocking call or until shutdown has been initiated.
Expand Down Expand Up @@ -390,20 +396,26 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
_writeOffsets, errs := sinkWriter.Write(ctx, messagesToTry)
for idx, msg := range messagesToTry {
if err = errs[idx]; err != nil {
var udsinkErr = new(udsink.ApplyUDSinkErr)
if errors.As(err, &udsinkErr) {
if udsinkErr.IsInternalErr() {
return false, err
}
}
// if we are asked to write to fallback sink, check if the fallback sink is configured,
// and we are not already in the fallback sink write path.
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
fallbackMessages = append(fallbackMessages, msg)
continue
}

// if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink")
}

// if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink
if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
if errors.Is(err, udsink.WriteToFallbackErr) && isFbSinkWriter {
df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink")
}

Expand Down Expand Up @@ -444,9 +456,8 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
}
return true, nil
})
// If we exited out of the loop and it was due to a forced shutdown we should exit
// TODO(Retry-Sink): Check for ctx done separately? That should be covered in shutdown
if ok, _ := df.IsShuttingDown(); err != nil && ok {

if err != nil {
return nil, nil, err
}
// Check what actions are required once the writing loop is completed
Expand Down
27 changes: 13 additions & 14 deletions pkg/sinks/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,21 +256,20 @@ func (u *SinkProcessor) Start(ctx context.Context) error {
defer finalWg.Done()
log.Infow("Start processing sink messages ", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("fromPartition ", fromBufferPartitionName))
stopped := sinkForwarder.Start()
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
<-stopped
log.Info("Sink forwarder stopped, exiting sink processor...")
return
select {
case <-ctx.Done(): // context cancelled case
log.Info("Context cancelled, stopping forwarder for partition...", zap.String("partition", fromBufferPartitionName))
sinkForwarder.Stop()
if err := <-stopped; err != nil {
log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err))
}
}()
<-ctx.Done()
log.Infow("SIGTERM exiting inside partition...", zap.String("fromPartition", fromBufferPartitionName))
sinkForwarder.Stop()
wg.Wait()
log.Infow("Exited for partition...", zap.String("fromPartition", fromBufferPartitionName))
log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName))
case err := <-stopped: // critical error case
if err != nil {
log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err))
cancel()
}
}
}(df, readers[index].GetName())
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/sinks/udsink/udsink_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ import (
)

var (
WriteToFallbackErr = ApplyUDSinkErr{
WriteToFallbackErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "write to fallback sink",
}

UnknownUDSinkErr = ApplyUDSinkErr{
UnknownUDSinkErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "unknown error in udsink",
}
NotFoundErr = ApplyUDSinkErr{
NotFoundErr error = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "not found in response",
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
}
for i, m := range requests {
if r, existing := resMap[m.Request.GetId()]; !existing {
errs[i] = &NotFoundErr
errs[i] = NotFoundErr
} else {
if r.GetStatus() == sinkpb.Status_FAILURE {
if r.GetErrMsg() != "" {
Expand All @@ -123,10 +123,10 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
Message: r.GetErrMsg(),
}
} else {
errs[i] = &UnknownUDSinkErr
errs[i] = UnknownUDSinkErr
}
} else if r.GetStatus() == sinkpb.Status_FALLBACK {
errs[i] = &WriteToFallbackErr
errs[i] = WriteToFallbackErr
} else {
errs[i] = nil
}
Expand Down
72 changes: 72 additions & 0 deletions pkg/sources/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package errors

// SourceReadErr represents any source read related error
type SourceReadErr struct {
Message string
Retryable bool
}

func (e *SourceReadErr) Error() string {
return e.Message
}

func (e *SourceReadErr) Is(target error) bool {
return target.Error() == e.Error()
}

// IsRetryable is true if the error is retryable
func (e *SourceReadErr) IsRetryable() bool {
return e.Retryable
}

type SourceAckErr struct {
Message string
Retryable bool
}

func (e *SourceAckErr) Error() string {
return e.Message
}

func (e *SourceAckErr) Is(target error) bool {
return target.Error() == e.Error()
}

// IsRetryable is true if the error is retryable
func (e *SourceAckErr) IsRetryable() bool {
return e.Retryable
}

type SourcePendingErr struct {
Message string
Retryable bool
}

func (e *SourcePendingErr) Error() string {
return e.Message
}

func (e *SourcePendingErr) Is(target error) bool {
return target.Error() == e.Error()
}

// IsRetryable is true if the error is retryable
func (e *SourcePendingErr) IsRetryable() bool {
return e.Retryable
}
Loading

0 comments on commit 4b248e2

Please sign in to comment.