From 9140f799b56905fffccfccff45015d6cf7e18008 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 7 Nov 2024 01:32:38 +0530 Subject: [PATCH] chore: shutdown when we see non retryable udf errors (#2204) Signed-off-by: Yashash H L --- pkg/forwarder/interfaces.go | 4 +- pkg/sinks/forward/forward.go | 41 +++++++----- pkg/sinks/sink.go | 27 ++++---- pkg/sinks/udsink/udsink_grpc.go | 12 ++-- pkg/sources/errors/errors.go | 72 +++++++++++++++++++++ pkg/sources/forward/data_forward.go | 72 +++++++++++---------- pkg/sources/source.go | 35 +++++----- pkg/sources/udsource/user_defined_source.go | 28 ++++++-- pkg/udf/forward/forward.go | 69 ++++++++++---------- pkg/udf/map_udf.go | 29 ++++----- 10 files changed, 246 insertions(+), 143 deletions(-) create mode 100644 pkg/sources/errors/errors.go diff --git a/pkg/forwarder/interfaces.go b/pkg/forwarder/interfaces.go index db628a4083..e745899ffc 100644 --- a/pkg/forwarder/interfaces.go +++ b/pkg/forwarder/interfaces.go @@ -49,7 +49,9 @@ func (gw GoWhere) WhereTo(ks []string, ts []string, id string) ([]VertexBuffer, // StarterStopper starts/stops the forwarding. type StarterStopper interface { - Start() <-chan struct{} + // Start returns a channel that can be used to listen for errors. If + // the channel is closed without any error, it means the forwarder has stopped. + Start() <-chan error Stop() ForceStop() } diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go index ad53b5fa51..832611c6dc 100644 --- a/pkg/sinks/forward/forward.go +++ b/pkg/sinks/forward/forward.go @@ -111,9 +111,9 @@ func NewDataForward( } // Start starts reading the buffer and forwards to sinker. Call `Stop` to stop. -func (df *DataForward) Start() <-chan struct{} { +func (df *DataForward) Start() <-chan error { log := logging.FromContext(df.ctx) - stopped := make(chan struct{}) + stopped := make(chan error) var wg sync.WaitGroup wg.Add(1) go func() { @@ -137,7 +137,11 @@ func (df *DataForward) Start() <-chan struct{} { // shutdown the fromBufferPartition should be empty. } // keep doing what you are good at - df.forwardAChunk(df.ctx) + if err := df.forwardAChunk(df.ctx); err != nil { + log.Errorw("Failed to forward a chunk", zap.Error(err)) + stopped <- err + return + } } }() @@ -176,7 +180,7 @@ func (df *DataForward) Start() <-chan struct{} { // for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack // the message after forwarding, barring any platform errors. The platform errors include buffer-full, // buffer-not-reachable, etc., but does not include errors due to WhereTo, etc. -func (df *DataForward) forwardAChunk(ctx context.Context) { +func (df *DataForward) forwardAChunk(ctx context.Context) error { start := time.Now() totalBytes := 0 dataBytes := 0 @@ -207,12 +211,12 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { zap.Int64("offset", processorWMB.Offset), zap.Int64("watermark", processorWMB.Watermark), zap.Bool("idle", processorWMB.Idle)) - return + return nil } // if the validation passed, we will publish the watermark to all the toBuffer partitions. 
idlehandler.PublishIdleWatermark(ctx, df.sinkWriter.GetPartitionIdx(), df.sinkWriter, df.wmPublisher, df.idleManager, df.opts.logger, df.vertexName, df.pipelineName, dfv1.VertexTypeSink, df.vertexReplica, wmb.Watermark(time.UnixMilli(processorWMB.Watermark))) - return + return nil } var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages)) @@ -266,7 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { if err != nil { df.opts.logger.Errorw("failed to write to sink", zap.Error(err)) df.fromBufferPartition.NoAck(ctx, readOffsets) - return + return err } // Only when fallback is configured, it is possible to return fallbackMessages. If there's any, write to the fallback sink. @@ -277,7 +281,8 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { _, _, err = df.writeToSink(ctx, df.opts.fbSinkWriter, fallbackMessages, true) if err != nil { df.opts.logger.Errorw("Failed to write to fallback sink", zap.Error(err)) - return + df.fromBufferPartition.NoAck(ctx, readOffsets) + return err } } @@ -300,7 +305,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { if err != nil { df.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err)) metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) - return + return nil } metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) @@ -311,6 +316,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { } // ProcessingTimes of the entire forwardAChunk metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: df.vertexName, metrics.LabelPipeline: df.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeSink), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica))}).Observe(float64(time.Since(start).Microseconds())) + return nil } // ackFromBuffer acknowledges an array of offsets back to fromBufferPartition and is a blocking call or until shutdown has been initiated. @@ -390,20 +396,26 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr _writeOffsets, errs := sinkWriter.Write(ctx, messagesToTry) for idx, msg := range messagesToTry { if err = errs[idx]; err != nil { + var udsinkErr = new(udsink.ApplyUDSinkErr) + if errors.As(err, &udsinkErr) { + if udsinkErr.IsInternalErr() { + return false, err + } + } // if we are asked to write to fallback sink, check if the fallback sink is configured, // and we are not already in the fallback sink write path. 
- if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter { + if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter { fallbackMessages = append(fallbackMessages, msg) continue } // if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink - if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil { + if errors.Is(err, udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil { df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink") } // if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink - if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter { + if errors.Is(err, udsink.WriteToFallbackErr) && isFbSinkWriter { df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink") } @@ -444,9 +456,8 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr } return true, nil }) - // If we exited out of the loop and it was due to a forced shutdown we should exit - // TODO(Retry-Sink): Check for ctx done separately? That should be covered in shutdown - if ok, _ := df.IsShuttingDown(); err != nil && ok { + + if err != nil { return nil, nil, err } // Check what actions are required once the writing loop is completed diff --git a/pkg/sinks/sink.go b/pkg/sinks/sink.go index 73075dc1cc..3355cbe70c 100644 --- a/pkg/sinks/sink.go +++ b/pkg/sinks/sink.go @@ -256,21 +256,20 @@ func (u *SinkProcessor) Start(ctx context.Context) error { defer finalWg.Done() log.Infow("Start processing sink messages ", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("fromPartition ", fromBufferPartitionName)) stopped := sinkForwarder.Start() - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - for { - <-stopped - log.Info("Sink forwarder stopped, exiting sink processor...") - return + select { + case <-ctx.Done(): // context cancelled case + log.Info("Context cancelled, stopping forwarder for partition...", zap.String("partition", fromBufferPartitionName)) + sinkForwarder.Stop() + if err := <-stopped; err != nil { + log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err)) } - }() - <-ctx.Done() - log.Infow("SIGTERM exiting inside partition...", zap.String("fromPartition", fromBufferPartitionName)) - sinkForwarder.Stop() - wg.Wait() - log.Infow("Exited for partition...", zap.String("fromPartition", fromBufferPartitionName)) + log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName)) + case err := <-stopped: // critical error case + if err != nil { + log.Errorw("Sink forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err)) + cancel() + } + } }(df, readers[index].GetName()) } diff --git a/pkg/sinks/udsink/udsink_grpc.go b/pkg/sinks/udsink/udsink_grpc.go index 6b9a1a77eb..1656c26111 100644 --- a/pkg/sinks/udsink/udsink_grpc.go +++ b/pkg/sinks/udsink/udsink_grpc.go @@ -29,16 +29,16 @@ import ( ) var ( - WriteToFallbackErr = ApplyUDSinkErr{ + WriteToFallbackErr error = &ApplyUDSinkErr{ UserUDSinkErr: true, Message: "write to fallback sink", } - UnknownUDSinkErr = ApplyUDSinkErr{ + UnknownUDSinkErr error = &ApplyUDSinkErr{ UserUDSinkErr: true, Message: "unknown error in udsink", } - NotFoundErr = 
ApplyUDSinkErr{ + NotFoundErr error = &ApplyUDSinkErr{ UserUDSinkErr: true, Message: "not found in response", } @@ -114,7 +114,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S } for i, m := range requests { if r, existing := resMap[m.Request.GetId()]; !existing { - errs[i] = &NotFoundErr + errs[i] = NotFoundErr } else { if r.GetStatus() == sinkpb.Status_FAILURE { if r.GetErrMsg() != "" { @@ -123,10 +123,10 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S Message: r.GetErrMsg(), } } else { - errs[i] = &UnknownUDSinkErr + errs[i] = UnknownUDSinkErr } } else if r.GetStatus() == sinkpb.Status_FALLBACK { - errs[i] = &WriteToFallbackErr + errs[i] = WriteToFallbackErr } else { errs[i] = nil } diff --git a/pkg/sources/errors/errors.go b/pkg/sources/errors/errors.go new file mode 100644 index 0000000000..2d6582fcde --- /dev/null +++ b/pkg/sources/errors/errors.go @@ -0,0 +1,72 @@ +/* +Copyright 2022 The Numaproj Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +// SourceReadErr represents any source read related error +type SourceReadErr struct { + Message string + Retryable bool +} + +func (e *SourceReadErr) Error() string { + return e.Message +} + +func (e *SourceReadErr) Is(target error) bool { + return target.Error() == e.Error() +} + +// IsRetryable is true if the error is retryable +func (e *SourceReadErr) IsRetryable() bool { + return e.Retryable +} + +type SourceAckErr struct { + Message string + Retryable bool +} + +func (e *SourceAckErr) Error() string { + return e.Message +} + +func (e *SourceAckErr) Is(target error) bool { + return target.Error() == e.Error() +} + +// IsRetryable is true if the error is retryable +func (e *SourceAckErr) IsRetryable() bool { + return e.Retryable +} + +type SourcePendingErr struct { + Message string + Retryable bool +} + +func (e *SourcePendingErr) Error() string { + return e.Message +} + +func (e *SourcePendingErr) Is(target error) bool { + return target.Error() == e.Error() +} + +// IsRetryable is true if the error is retryable +func (e *SourcePendingErr) IsRetryable() bool { + return e.Retryable +} diff --git a/pkg/sources/forward/data_forward.go b/pkg/sources/forward/data_forward.go index fc22d8ac0b..63d230652c 100644 --- a/pkg/sources/forward/data_forward.go +++ b/pkg/sources/forward/data_forward.go @@ -32,6 +32,7 @@ import ( "github.com/numaproj/numaflow/pkg/metrics" "github.com/numaproj/numaflow/pkg/shared/idlehandler" "github.com/numaproj/numaflow/pkg/shared/logging" + errors2 "github.com/numaproj/numaflow/pkg/sources/errors" "github.com/numaproj/numaflow/pkg/sources/sourcer" "github.com/numaproj/numaflow/pkg/watermark/entity" "github.com/numaproj/numaflow/pkg/watermark/fetch" @@ -119,9 +120,9 @@ func NewDataForward( } // Start starts reading from source and forwards to the next buffers. Call `Stop` to stop. 
-func (df *DataForward) Start() <-chan struct{} { +func (df *DataForward) Start() <-chan error { log := logging.FromContext(df.ctx) - stopped := make(chan struct{}) + stopped := make(chan error) var wg sync.WaitGroup wg.Add(1) go func() { @@ -145,7 +146,10 @@ func (df *DataForward) Start() <-chan struct{} { // shutdown the reader should be empty. } // keep doing what you are good at - df.forwardAChunk(df.ctx) + if err := df.forwardAChunk(df.ctx); err != nil { + stopped <- err + return + } } }() @@ -188,7 +192,7 @@ func (df *DataForward) Start() <-chan struct{} { // for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack // the message after forwarding, barring any platform errors. The platform errors include buffer-full, // buffer-not-reachable, etc., but do not include errors due to user code transformer, WhereTo, etc. -func (df *DataForward) forwardAChunk(ctx context.Context) { +func (df *DataForward) forwardAChunk(ctx context.Context) error { start := time.Now() totalBytes := 0 // There is a chance that we have read the message and the container got forcefully terminated before processing. To provide @@ -204,13 +208,21 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName(), }).Inc() + + // if the error is not retryable, we should return the error. + var readErr = new(errors2.SourceReadErr) + if errors.As(err, &readErr) { + if !readErr.IsRetryable() { + return err + } + } } // if there are no read messages, we return early. if len(readMessages) == 0 { // not idling, so nothing much to do if !df.srcIdleHandler.IsSourceIdling() { - return + return nil } // if the source is idling, we will publish idle watermark to the source and all the toBuffers @@ -243,7 +255,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { } // len(readMessages) == 0, so we do not have anything more to do - return + return nil } // reset the idle handler because we have read messages @@ -314,7 +326,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { readWriteMessagePairs, err = df.applyTransformer(ctx, readMessages) if err != nil { df.opts.logger.Errorw("failed to apply source transformer", zap.Error(err)) - return + return err } df.opts.logger.Debugw("concurrent applyTransformer completed", @@ -381,7 +393,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { for _, message := range m.WriteMessages { if err = df.whereToStep(message, messageToStep); err != nil { df.opts.logger.Errorw("failed in whereToStep", zap.Error(err)) - return + return err } } // get the list of source partitions for which we have read messages, we will use this to publish watermarks to toVertices @@ -392,7 +404,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { writeOffsets, err = df.writeToBuffers(ctx, messageToStep) if err != nil { df.opts.logger.Errorw("failed to write to toBuffers", zap.Error(err)) - return + return err } // activeWatermarkBuffers records the buffers that the publisher has published @@ -463,8 +475,14 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), metrics.LabelPartitionName: df.reader.GetName(), }).Add(float64(len(readOffsets))) - - return + // if the error is not retryable, we should return the error. 
+ var ackErr = new(errors2.SourceAckErr) + if errors.As(err, &ackErr) { + if !ackErr.IsRetryable() { + return err + } + } + return nil } metrics.AckMessagesCount.With(map[string]string{ metrics.LabelVertex: df.vertexName, @@ -487,13 +505,17 @@ func (df *DataForward) forwardAChunk(ctx context.Context) { metrics.LabelVertexType: string(dfv1.VertexTypeSource), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), }).Observe(float64(time.Since(start).Microseconds())) + return nil } func (df *DataForward) ackFromSource(ctx context.Context, offsets []isb.Offset) error { // for all the sources, we either ack all offsets or none. // when a batch ack fails, the source Ack() function populate the error array with the same error; // hence we can just return the first error. - return df.reader.Ack(ctx, offsets)[0] + if errs := df.reader.Ack(ctx, offsets); len(errs) > 0 { + return errs[0] + } + return nil } // writeToBuffers is a blocking call until all the messages have been forwarded to all the toBuffers, or a shutdown @@ -638,29 +660,11 @@ func (df *DataForward) writeToBuffer(ctx context.Context, toBufferPartition isb. // the skip flag is set. The ShutDown flag will only if there is an InternalErr and ForceStop has been invoked. // The UserError retry will be done on the applyTransformer. func (df *DataForward) applyTransformer(ctx context.Context, messages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) { - for { - transformResults, err := df.opts.transformer.ApplyTransform(ctx, messages) - if err != nil { - df.opts.logger.Errorw("Transformer.Apply error", zap.Error(err)) - // TODO: implement retry with backoff etc. - time.Sleep(df.opts.retryInterval) - // keep retrying, I cannot think of a use case where a user could say, errors are fine :-) - // as a platform, we should not lose or corrupt data. - // this does not mean we should prohibit this from a shutdown. - if ok, _ := df.IsShuttingDown(); ok { - df.opts.logger.Errorw("Transformer.Apply, Stop called while stuck on an internal error", zap.Error(err)) - metrics.PlatformError.With(map[string]string{ - metrics.LabelVertex: df.vertexName, - metrics.LabelPipeline: df.pipelineName, - metrics.LabelVertexType: string(dfv1.VertexTypeSource), - metrics.LabelVertexReplicaIndex: strconv.Itoa(int(df.vertexReplica)), - }).Inc() - return nil, err - } - continue - } - return transformResults, nil + transformResults, err := df.opts.transformer.ApplyTransform(ctx, messages) + if err != nil { + return nil, err } + return transformResults, nil } // whereToStep executes the WhereTo interfaces and then updates the to step's writeToBuffers buffer. 
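
The udsink change above turns WriteToFallbackErr, UnknownUDSinkErr and NotFoundErr into pointer values held in error-typed variables, so the forwarder can match them with errors.Is by identity and separately pull out the concrete type with errors.As to ask IsInternalErr(). The following is a standalone sketch of that pattern, not code from this patch; applyErr, writeToFallback and isInternal are made-up stand-ins for udsink.ApplyUDSinkErr, udsink.WriteToFallbackErr and IsInternalErr.

    // A minimal illustration of pointer sentinel errors: errors.Is matches the
    // sentinel by identity, errors.As recovers the concrete type for flag checks.
    package main

    import (
    	"errors"
    	"fmt"
    )

    // applyErr stands in for udsink.ApplyUDSinkErr; the fields are illustrative.
    type applyErr struct {
    	internal bool
    	message  string
    }

    func (e *applyErr) Error() string    { return e.message }
    func (e *applyErr) isInternal() bool { return e.internal }

    // The sentinel is a pointer stored in an error-typed variable, so assigning
    // it into an error slice and later comparing with errors.Is agree by identity.
    var writeToFallback error = &applyErr{message: "write to fallback sink"}

    func main() {
    	errs := []error{
    		writeToFallback, // the sink asked for the fallback path
    		&applyErr{internal: true, message: "udsink gRPC failure"}, // internal error
    		nil,
    	}

    	for i, err := range errs {
    		if err == nil {
    			fmt.Printf("msg %d: ok\n", i)
    			continue
    		}
    		var ae *applyErr
    		if errors.As(err, &ae) && ae.isInternal() {
    			fmt.Printf("msg %d: internal error, stop instead of retrying: %v\n", i, err)
    			continue
    		}
    		if errors.Is(err, writeToFallback) {
    			fmt.Printf("msg %d: route to fallback sink\n", i)
    		}
    	}
    }

Because ApplySink assigns the sentinel itself into the per-message error slice, identity comparison is enough; no Unwrap chain is required.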
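
The data_forward.go hunks above decide between "retry on the next loop iteration" and "return the error and shut down" by inspecting the new typed errors from pkg/sources/errors with errors.As. Below is a rough sketch of that decision, again with assumed names (sourceReadErr, readOnce, forwardOnce) rather than the real numaflow API; it only shows the shape of the check.

    // A simplified model of the read path: a non-retryable typed error is
    // propagated so the forwarder's Start() channel can surface it; anything
    // else is treated as transient and retried on the next loop.
    package main

    import (
    	"errors"
    	"fmt"
    )

    // sourceReadErr mirrors the shape of pkg/sources/errors.SourceReadErr.
    type sourceReadErr struct {
    	message   string
    	retryable bool
    }

    func (e *sourceReadErr) Error() string     { return e.message }
    func (e *sourceReadErr) IsRetryable() bool { return e.retryable }

    // readOnce stands in for reader.Read; here it always fails non-retryably.
    func readOnce() error {
    	return &sourceReadErr{message: "udsource read failed", retryable: false}
    }

    // forwardOnce mimics one forwardAChunk call: nil means "try again",
    // a returned error means "stop the forwarder".
    func forwardOnce() error {
    	if err := readOnce(); err != nil {
    		var readErr *sourceReadErr
    		if errors.As(err, &readErr) && !readErr.IsRetryable() {
    			return err // non-retryable: bubble up so the vertex shuts down
    		}
    		// retryable (or untyped) errors are logged and retried next iteration
    	}
    	return nil
    }

    func main() {
    	if err := forwardOnce(); err != nil {
    		fmt.Println("shutting down forwarder:", err)
    	}
    }

The same check is repeated for SourceAckErr after acking, which is why the user-defined source wrapper below marks its errors Retryable: false.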
diff --git a/pkg/sources/source.go b/pkg/sources/source.go index 69bc0c0099..6ed2b78d5b 100644 --- a/pkg/sources/source.go +++ b/pkg/sources/source.go @@ -21,7 +21,6 @@ import ( "fmt" "os" "strconv" - "sync" "time" "go.uber.org/zap" @@ -295,19 +294,6 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { return fmt.Errorf("failed to create source forwarder, error: %w", err) } - log.Infow("Start processing source messages", zap.String("isbs", string(sp.ISBSvcType)), zap.Any("to", sp.VertexInstance.Vertex.GetToBuffers())) - stopped := sourceForwarder.Start() - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - for { - <-stopped - log.Info("Source forwarder stopped, exiting...") - return - } - }() - metricsOpts := metrics.NewMetricsOptions(ctx, sp.VertexInstance.Vertex, healthCheckers, []isb.LagReader{sourceReader}) ms := metrics.NewMetricsServer(sp.VertexInstance.Vertex, metricsOpts...) if shutdown, err := ms.Start(ctx); err != nil { @@ -315,10 +301,23 @@ func (sp *SourceProcessor) Start(ctx context.Context) error { } else { defer func() { _ = shutdown(context.Background()) }() } - <-ctx.Done() - log.Info("SIGTERM, exiting...") - sourceForwarder.Stop() - wg.Wait() + + log.Infow("Start processing source messages", zap.String("isbs", string(sp.ISBSvcType)), zap.Any("to", sp.VertexInstance.Vertex.GetToBuffers())) + stopped := sourceForwarder.Start() + select { + case <-ctx.Done(): // context cancelled case + log.Info("Context cancelled, stopping forwarder for partition...") + sourceForwarder.Stop() + if err := <-stopped; err != nil { + log.Errorw("Source forwarder stopped with error", zap.Error(err)) + } + log.Info("Exited source forwarder...") + case err := <-stopped: // critical error case + if err != nil { + log.Errorw("Source forwarder stopped with error", zap.Error(err)) + cancel() + } + } // close all the sourceReader wm stores for _, wmStore := range sourceWmStores { diff --git a/pkg/sources/udsource/user_defined_source.go b/pkg/sources/udsource/user_defined_source.go index 5ba77019a1..37a36a46db 100644 --- a/pkg/sources/udsource/user_defined_source.go +++ b/pkg/sources/udsource/user_defined_source.go @@ -25,6 +25,7 @@ import ( dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1" "github.com/numaproj/numaflow/pkg/isb" "github.com/numaproj/numaflow/pkg/shared/logging" + "github.com/numaproj/numaflow/pkg/sources/errors" "github.com/numaproj/numaflow/pkg/sources/sourcer" ) @@ -49,7 +50,6 @@ type userDefinedSource struct { // NewUserDefinedSource returns a new user-defined source reader. func NewUserDefinedSource(ctx context.Context, vertexInstance *dfv1.VertexInstance, sourceApplier *GRPCBasedUDSource, opts ...Option) (sourcer.SourceReader, error) { var err error - u := &userDefinedSource{ vertexName: vertexInstance.Vertex.Spec.Name, pipelineName: vertexInstance.Vertex.Spec.PipelineName, @@ -82,18 +82,38 @@ func (u *userDefinedSource) Partitions(ctx context.Context) []int32 { // Read reads the messages from the user-defined source. 
func (u *userDefinedSource) Read(ctx context.Context, count int64) ([]*isb.ReadMessage, error) { - return u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout) + messages, err := u.sourceApplier.ApplyReadFn(ctx, count, u.readTimeout) + if err != nil { + return nil, &errors.SourceReadErr{ + Message: err.Error(), + Retryable: false, + } + } + return messages, nil } // Ack acknowledges the messages from the user-defined source // If there is an error, return the error using an error array func (u *userDefinedSource) Ack(ctx context.Context, offsets []isb.Offset) []error { - return []error{u.sourceApplier.ApplyAckFn(ctx, offsets)} + if err := u.sourceApplier.ApplyAckFn(ctx, offsets); err != nil { + return []error{&errors.SourceAckErr{ + Message: err.Error(), + Retryable: false, + }} + } + return []error{} } // Pending returns the number of pending messages in the user-defined source func (u *userDefinedSource) Pending(ctx context.Context) (int64, error) { - return u.sourceApplier.ApplyPendingFn(ctx) + pending, err := u.sourceApplier.ApplyPendingFn(ctx) + if err != nil { + return 0, &errors.SourcePendingErr{ + Message: err.Error(), + Retryable: false, + } + } + return pending, nil } func (u *userDefinedSource) Close() error { diff --git a/pkg/udf/forward/forward.go b/pkg/udf/forward/forward.go index e081359cf1..2d61e32408 100644 --- a/pkg/udf/forward/forward.go +++ b/pkg/udf/forward/forward.go @@ -112,9 +112,9 @@ func NewInterStepDataForward(vertexInstance *dfv1.VertexInstance, fromStep isb.B } // Start starts reading the buffer and forwards to the next buffers. Call `Stop` to stop. -func (isdf *InterStepDataForward) Start() <-chan struct{} { +func (isdf *InterStepDataForward) Start() <-chan error { log := logging.FromContext(isdf.ctx) - stopped := make(chan struct{}) + stopped := make(chan error) var wg sync.WaitGroup wg.Add(1) go func() { @@ -137,8 +137,12 @@ func (isdf *InterStepDataForward) Start() <-chan struct{} { // once context.Done() is called, we still have to try to forwardAChunk because in graceful // shutdown the fromBufferPartition should be empty. } - // keep doing what you are good at - isdf.forwardAChunk(isdf.ctx) + // keep doing what you are good at, if we get an error we will stop. + if err := isdf.forwardAChunk(isdf.ctx); err != nil { + log.Errorw("Failed to forward a chunk", zap.Error(err)) + stopped <- err + return + } } }() @@ -170,7 +174,7 @@ func (isdf *InterStepDataForward) Start() <-chan struct{} { // for a chunk of messages returned by the first Read call. It will return only if only we are successfully able to ack // the message after forwarding, barring any platform errors. The platform errors include buffer-full, // buffer-not-reachable, etc., but does not include errors due to user code UDFs, WhereTo, etc. -func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { +func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) error { start := time.Now() totalBytes := 0 dataBytes := 0 @@ -201,7 +205,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { zap.Int64("offset", processorWMB.Offset), zap.Int64("watermark", processorWMB.Watermark), zap.Bool("idle", processorWMB.Idle)) - return + return nil } // if the validation passed, we will publish the watermark to all the toBuffer partitions. 
@@ -212,7 +216,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { } } } - return + return nil } var dataMessages = make([]*isb.ReadMessage, 0, len(readMessages)) @@ -227,6 +231,17 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { dataBytes += len(m.Payload) } } + + // If we don't have any data messages(we received only wmbs), we can ack all the readOffsets and return early. + if len(dataMessages) == 0 { + if err := isdf.ackFromBuffer(ctx, readOffsets); err != nil { + isdf.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err)) + metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) + return err + } + return nil + } + metrics.ReadDataMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(dataMessages))) metrics.ReadMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readMessages))) metrics.ReadBytesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(totalBytes)) @@ -254,7 +269,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { isdf.opts.logger.Errorw("failed to streamMessage", zap.Error(err)) // As there's no partial failure, non-ack all the readOffsets isdf.fromBufferPartition.NoAck(ctx, readOffsets) - return + return err } } else { // create space for writeMessages specific to each step as we could forward to all the steps too. @@ -271,7 +286,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { isdf.opts.logger.Errorw("failed to applyUDF", zap.Error(err)) // As there's no partial failure, non-ack all the readOffsets isdf.fromBufferPartition.NoAck(ctx, readOffsets) - return + return err } // let's figure out which vertex to send the results to. 
@@ -282,7 +297,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { if err := isdf.whereToStep(message, messageToStep, m.ReadMessage); err != nil { isdf.opts.logger.Errorw("failed in whereToStep", zap.Error(err)) isdf.fromBufferPartition.NoAck(ctx, readOffsets) - return + return err } } } @@ -292,7 +307,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { if err != nil { isdf.opts.logger.Errorw("failed to write to toBuffers", zap.Error(err)) isdf.fromBufferPartition.NoAck(ctx, readOffsets) - return + return err } isdf.opts.logger.Debugw("writeToBuffers completed") } @@ -348,7 +363,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { if err != nil { isdf.opts.logger.Errorw("Failed to ack from buffer", zap.Error(err)) metrics.AckMessageError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) - return + return err } metrics.AckMessagesCount.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica)), metrics.LabelPartitionName: isdf.fromBufferPartition.GetName()}).Add(float64(len(readOffsets))) @@ -360,6 +375,7 @@ func (isdf *InterStepDataForward) forwardAChunk(ctx context.Context) { } // ProcessingTimes of the entire forwardAChunk metrics.ForwardAChunkProcessingTime.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica))}).Observe(float64(time.Since(start).Microseconds())) + return nil } // streamMessage streams the data messages to the next step. @@ -376,7 +392,7 @@ func (isdf *InterStepDataForward) streamMessage(ctx context.Context, dataMessage // Ensure dataMessages length is 1 for streaming if len(dataMessages) != 1 { errMsg := "data message size is not 1 with map UDF streaming" - isdf.opts.logger.Errorw(errMsg) + isdf.opts.logger.Errorw(errMsg, zap.Int("dataMessagesSize", len(dataMessages))) return nil, errors.New(errMsg) } @@ -618,29 +634,12 @@ func (isdf *InterStepDataForward) writeToBuffer(ctx context.Context, toBufferPar // the skip flag is set. ShutDown flag will only if there is an InternalErr and ForceStop has been invoked. // The UserError retry will be done on the ApplyUDF. func (isdf *InterStepDataForward) applyUDF(ctx context.Context, readMessages []*isb.ReadMessage) ([]isb.ReadWriteMessagePair, error) { - for { - writeMessages, err := isdf.opts.unaryMapUdfApplier.ApplyMap(ctx, readMessages) - if err != nil { - isdf.opts.logger.Errorw("mapUDF.Apply error", zap.Error(err)) - // TODO: implement retry with backoff etc. - select { - case <-ctx.Done(): - // no point in retrying if the context is cancelled - return nil, err - case <-time.After(isdf.opts.retryInterval): - } - // keep retrying, I cannot think of a use case where a user could say, errors are fine :-) - // as a platform we should not lose or corrupt data. - // this does not mean we should prohibit this from a shutdown. 
- if ok, _ := isdf.IsShuttingDown(); ok { - isdf.opts.logger.Errorw("mapUDF.Apply, Stop called while stuck on an internal error", zap.Error(err)) - metrics.PlatformError.With(map[string]string{metrics.LabelVertex: isdf.vertexName, metrics.LabelPipeline: isdf.pipelineName, metrics.LabelVertexType: string(dfv1.VertexTypeMapUDF), metrics.LabelVertexReplicaIndex: strconv.Itoa(int(isdf.vertexReplica))}).Inc() - return nil, err - } - continue - } - return writeMessages, nil + writeMessages, err := isdf.opts.unaryMapUdfApplier.ApplyMap(ctx, readMessages) + if err != nil { + isdf.opts.logger.Errorw("mapUDF.Apply error", zap.Error(err)) + return nil, err } + return writeMessages, nil } // whereToStep executes the WhereTo interfaces and then updates the to step's writeToBuffers buffer. diff --git a/pkg/udf/map_udf.go b/pkg/udf/map_udf.go index 4a913eca1a..8cf10f36a4 100644 --- a/pkg/udf/map_udf.go +++ b/pkg/udf/map_udf.go @@ -291,24 +291,21 @@ func (u *MapUDFProcessor) Start(ctx context.Context) error { go func(fromBufferPartitionName string, isdf *forward.InterStepDataForward) { defer finalWg.Done() log.Infow("Start processing udf messages", zap.String("isbsvc", string(u.ISBSvcType)), zap.String("from", fromBufferPartitionName), zap.Any("to", u.VertexInstance.Vertex.GetToBuffers())) - stopped := isdf.Start() - wg := &sync.WaitGroup{} - wg.Add(1) - go func() { - defer wg.Done() - for { - <-stopped - log.Info("Forwarder stopped, exiting udf data processor for partition " + fromBufferPartitionName + "...") - return + select { + case <-ctx.Done(): + log.Info("Context cancelled, stopping forwarder for partition...", zap.String("partition", fromBufferPartitionName)) + isdf.Stop() + if err := <-stopped; err != nil { + log.Errorw("Map forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err)) } - }() - - <-ctx.Done() - log.Info("SIGTERM, exiting inside partition...", zap.String("partition", fromBufferPartitionName)) - isdf.Stop() - wg.Wait() - log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName)) + log.Info("Exited for partition...", zap.String("partition", fromBufferPartitionName)) + case err := <-stopped: + if err != nil { + log.Errorw("Map forwarder stopped with error", zap.String("fromPartition", fromBufferPartitionName), zap.Error(err)) + cancel() + } + } }(bufferPartition, df) } // create lag readers from buffer readers
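
Across sink.go, source.go and map_udf.go the consumer of StarterStopper.Start now follows one contract: select on context cancellation versus the stopped channel; on cancellation call Stop() and drain the channel, and on a non-nil error cancel the rest of the processor. The sketch below is a self-contained illustration of that contract with a hypothetical forwarder type, not the numaflow implementation.

    // Consumer side of Start() <-chan error: a closed channel with no error
    // means a clean stop; a received error means the forwarder died on a
    // non-retryable failure and the whole processor should come down.
    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"
    )

    // forwarder is a stand-in for a forwarder.StarterStopper implementation.
    type forwarder struct {
    	stop chan struct{}
    }

    // Start loops until Stop is called or a simulated fatal error occurs.
    func (f *forwarder) Start() <-chan error {
    	stopped := make(chan error)
    	go func() {
    		defer close(stopped) // clean stop: channel closes without an error
    		for i := 0; ; i++ {
    			select {
    			case <-f.stop:
    				return
    			case <-time.After(10 * time.Millisecond):
    			}
    			if i == 3 { // simulate a non-retryable UDF error
    				stopped <- errors.New("non-retryable udf error")
    				return
    			}
    		}
    	}()
    	return stopped
    }

    func (f *forwarder) Stop() { close(f.stop) }

    func main() {
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()

    	f := &forwarder{stop: make(chan struct{})}
    	stopped := f.Start()

    	select {
    	case <-ctx.Done(): // SIGTERM / pipeline shutdown
    		f.Stop()
    		if err := <-stopped; err != nil {
    			fmt.Println("forwarder stopped with error:", err)
    		}
    	case err := <-stopped: // forwarder hit a fatal error on its own
    		if err != nil {
    			fmt.Println("forwarder stopped with error:", err)
    			cancel() // take the rest of the processor down, as the patch does
    		}
    	}
    }

Calling cancel() in the error branch is what lets a single partition's non-retryable failure terminate every goroutine the vertex started, instead of leaving the pod half-alive.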