diff --git a/pkg/sinks/forward/forward.go b/pkg/sinks/forward/forward.go index 400b076734..ecb0f7c133 100644 --- a/pkg/sinks/forward/forward.go +++ b/pkg/sinks/forward/forward.go @@ -351,18 +351,18 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr if err = errs[idx]; err != nil { // if we are asked to write to fallback sink, check if the fallback sink is configured, // and we are not already in the fallback sink write path. - if errors.As(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter { + if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter { fallbackMessages = append(fallbackMessages, msg) continue } // if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink - if errors.As(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil { + if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil { df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink") } // if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink - if errors.As(err, &udsink.WriteToFallbackErr) && isFbSinkWriter { + if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter { df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink") } diff --git a/pkg/sinks/udsink/errors.go b/pkg/sinks/udsink/errors.go index b6844279b2..3caddd0607 100644 --- a/pkg/sinks/udsink/errors.go +++ b/pkg/sinks/udsink/errors.go @@ -32,15 +32,15 @@ type InternalErr struct { } // IsUserUDSinkErr is true if the problem is due to the user code in the UDSink. -func (e ApplyUDSinkErr) IsUserUDSinkErr() bool { +func (e *ApplyUDSinkErr) IsUserUDSinkErr() bool { return e.UserUDSinkErr } // IsInternalErr is true if this is a platform issue. This is a blocking error. -func (e ApplyUDSinkErr) IsInternalErr() bool { +func (e *ApplyUDSinkErr) IsInternalErr() bool { return e.InternalErr.Flag } -func (e ApplyUDSinkErr) Error() string { +func (e *ApplyUDSinkErr) Error() string { return fmt.Sprint(e.Message) } diff --git a/pkg/sinks/udsink/udsink_grpc.go b/pkg/sinks/udsink/udsink_grpc.go index 9b1a61627e..d52105823e 100644 --- a/pkg/sinks/udsink/udsink_grpc.go +++ b/pkg/sinks/udsink/udsink_grpc.go @@ -94,7 +94,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S response, err := u.client.SinkFn(ctx, requests) if err != nil { for i := range requests { - errs[i] = ApplyUDSinkErr{ + errs[i] = &ApplyUDSinkErr{ UserUDSinkErr: false, Message: fmt.Sprintf("gRPC client.SinkFn failed, %s", err), InternalErr: InternalErr{ @@ -112,19 +112,19 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S } for i, m := range requests { if r, existing := resMap[m.GetId()]; !existing { - errs[i] = NotFoundErr + errs[i] = &NotFoundErr } else { if r.GetStatus() == sinkpb.Status_FAILURE { if r.GetErrMsg() != "" { - errs[i] = ApplyUDSinkErr{ + errs[i] = &ApplyUDSinkErr{ UserUDSinkErr: true, Message: r.GetErrMsg(), } } else { - errs[i] = UnknownUDSinkErr + errs[i] = &UnknownUDSinkErr } } else if r.GetStatus() == sinkpb.Status_FALLBACK { - errs[i] = WriteToFallbackErr + errs[i] = &WriteToFallbackErr } else { errs[i] = nil } diff --git a/pkg/sinks/udsink/udsink_grpc_test.go b/pkg/sinks/udsink/udsink_grpc_test.go index 0a7fbe60e7..b367416cc5 100644 --- a/pkg/sinks/udsink/udsink_grpc_test.go +++ b/pkg/sinks/udsink/udsink_grpc_test.go @@ -114,7 +114,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) { assert.Equal(t, 2, len(gotErrList)) assert.Equal(t, nil, gotErrList[0]) - assert.Equal(t, ApplyUDSinkErr{ + assert.Equal(t, &ApplyUDSinkErr{ UserUDSinkErr: true, Message: "mock sink message error", }, gotErrList[1]) @@ -158,7 +158,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) { u := NewMockUDSgRPCBasedUDSink(mockClient) gotErrList := u.ApplySink(ctx, testDatumList) expectedErrList := []error{ - ApplyUDSinkErr{ + &ApplyUDSinkErr{ UserUDSinkErr: false, Message: "gRPC client.SinkFn failed, failed to execute c.grpcClt.SinkFn(): mock SinkFn error", InternalErr: InternalErr{ @@ -166,7 +166,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) { MainCarDown: false, }, }, - ApplyUDSinkErr{ + &ApplyUDSinkErr{ UserUDSinkErr: false, Message: "gRPC client.SinkFn failed, failed to execute c.grpcClt.SinkFn(): mock SinkFn error", InternalErr: InternalErr{ diff --git a/pkg/sources/forward/data_forward_test.go b/pkg/sources/forward/data_forward_test.go index 0dbf09cdf4..4f69728e14 100644 --- a/pkg/sources/forward/data_forward_test.go +++ b/pkg/sources/forward/data_forward_test.go @@ -1161,7 +1161,7 @@ func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string, s string) ([]f } func (f myForwardInternalErrTest) ApplyTransform(_ context.Context, _ *isb.ReadMessage) ([]*isb.WriteMessage, error) { - return nil, udfapplier.ApplyUDFErr{ + return nil, &udfapplier.ApplyUDFErr{ UserUDFErr: false, InternalErr: struct { Flag bool diff --git a/pkg/sources/transformer/grpc_transformer.go b/pkg/sources/transformer/grpc_transformer.go index 9b4b437d3c..fe15a75c49 100644 --- a/pkg/sources/transformer/grpc_transformer.go +++ b/pkg/sources/transformer/grpc_transformer.go @@ -113,7 +113,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage * return true, nil }) if !success { - return nil, rpc.ApplyUDFErr{ + return nil, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err), InternalErr: rpc.InternalErr{ @@ -123,7 +123,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage * } } case sdkerr.NonRetryable: - return nil, rpc.ApplyUDFErr{ + return nil, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err), InternalErr: rpc.InternalErr{ @@ -132,7 +132,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage * }, } default: - return nil, rpc.ApplyUDFErr{ + return nil, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err), InternalErr: rpc.InternalErr{ diff --git a/pkg/sources/transformer/grpc_transformer_test.go b/pkg/sources/transformer/grpc_transformer_test.go index 66259e246e..c74a5a88ea 100644 --- a/pkg/sources/transformer/grpc_transformer_test.go +++ b/pkg/sources/transformer/grpc_transformer_test.go @@ -173,7 +173,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, rpc.ApplyUDFErr{ + assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: rpc.InternalErr{ @@ -227,7 +227,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, rpc.ApplyUDFErr{ + assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: rpc.InternalErr{ @@ -277,7 +277,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, rpc.ApplyUDFErr{ + assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: rpc.InternalErr{ @@ -377,7 +377,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, rpc.ApplyUDFErr{ + assert.ErrorIs(t, err, &rpc.ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: rpc.InternalErr{ diff --git a/pkg/udf/forward/forward_test.go b/pkg/udf/forward/forward_test.go index 3a0b2febbe..65c57ee05f 100644 --- a/pkg/udf/forward/forward_test.go +++ b/pkg/udf/forward/forward_test.go @@ -1601,7 +1601,7 @@ func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string, s string) ([]f } func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage) ([]*isb.WriteMessage, error) { - return nil, udfapplier.ApplyUDFErr{ + return nil, &udfapplier.ApplyUDFErr{ UserUDFErr: false, InternalErr: struct { Flag bool @@ -1613,7 +1613,7 @@ func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage func (f myForwardInternalErrTest) ApplyMapStream(_ context.Context, _ *isb.ReadMessage, writeMessagesCh chan<- isb.WriteMessage) error { close(writeMessagesCh) - return udfapplier.ApplyUDFErr{ + return &udfapplier.ApplyUDFErr{ UserUDFErr: false, InternalErr: struct { Flag bool diff --git a/pkg/udf/rpc/errors.go b/pkg/udf/rpc/errors.go index c711dcd09d..cfdffe5757 100644 --- a/pkg/udf/rpc/errors.go +++ b/pkg/udf/rpc/errors.go @@ -32,20 +32,20 @@ type InternalErr struct { } // IsUserUDFErr is true if the problem is due to the user code in the UDF. -func (e ApplyUDFErr) IsUserUDFErr() bool { +func (e *ApplyUDFErr) IsUserUDFErr() bool { return e.UserUDFErr } // IsInternalErr is true if this is a platform issue. This is a blocking error. -func (e ApplyUDFErr) IsInternalErr() bool { +func (e *ApplyUDFErr) IsInternalErr() bool { return e.InternalErr.Flag } -func (e ApplyUDFErr) Error() string { +func (e *ApplyUDFErr) Error() string { return fmt.Sprint(e.Message) } // Is checks if the error is of the same type -func (e ApplyUDFErr) Is(target error) bool { +func (e *ApplyUDFErr) Is(target error) bool { return target.Error() == e.Error() } diff --git a/pkg/udf/rpc/grpc_aligned_reduce.go b/pkg/udf/rpc/grpc_aligned_reduce.go index 51d0ccc023..1a774306eb 100644 --- a/pkg/udf/rpc/grpc_aligned_reduce.go +++ b/pkg/udf/rpc/grpc_aligned_reduce.go @@ -204,14 +204,14 @@ func createAlignedReduceRequest(windowRequest *window.TimedWindowRequest) *reduc } // convertToUdfError converts the error returned by the reduceFn to ApplyUDFErr -func convertToUdfError(err error) ApplyUDFErr { +func convertToUdfError(err error) error { // if any error happens in reduce // will exit and restart the numa container udfErr, _ := sdkerr.FromError(err) switch udfErr.ErrorKind() { case sdkerr.Retryable: // TODO: currently we don't handle retryable errors for reduce - return ApplyUDFErr{ + return &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err), InternalErr: InternalErr{ @@ -220,7 +220,7 @@ func convertToUdfError(err error) ApplyUDFErr { }, } case sdkerr.NonRetryable: - return ApplyUDFErr{ + return &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err), InternalErr: InternalErr{ @@ -229,7 +229,7 @@ func convertToUdfError(err error) ApplyUDFErr { }, } case sdkerr.Canceled: - return ApplyUDFErr{ + return &ApplyUDFErr{ UserUDFErr: false, Message: context.Canceled.Error(), InternalErr: InternalErr{ @@ -238,7 +238,7 @@ func convertToUdfError(err error) ApplyUDFErr { }, } default: - return ApplyUDFErr{ + return &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err), InternalErr: InternalErr{ diff --git a/pkg/udf/rpc/grpc_aligned_reduce_test.go b/pkg/udf/rpc/grpc_aligned_reduce_test.go index 1288c21bd1..1cdd00bf5c 100644 --- a/pkg/udf/rpc/grpc_aligned_reduce_test.go +++ b/pkg/udf/rpc/grpc_aligned_reduce_test.go @@ -199,7 +199,7 @@ func TestGRPCBasedUDF_AsyncReduceWithMockClient(t *testing.T) { break readLoop } if err != nil { - assert.ErrorIs(t, err, ApplyUDFErr{ + assert.ErrorIs(t, err, &ApplyUDFErr{ UserUDFErr: false, Message: err.Error(), InternalErr: InternalErr{ diff --git a/pkg/udf/rpc/grpc_map.go b/pkg/udf/rpc/grpc_map.go index 01d049c64a..5b6251907d 100644 --- a/pkg/udf/rpc/grpc_map.go +++ b/pkg/udf/rpc/grpc_map.go @@ -110,7 +110,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag return true, nil }) if !success { - return nil, ApplyUDFErr{ + return nil, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err), InternalErr: InternalErr{ @@ -120,7 +120,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag } } case sdkerr.NonRetryable: - return nil, ApplyUDFErr{ + return nil, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err), InternalErr: InternalErr{ @@ -129,7 +129,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag }, } default: - return nil, ApplyUDFErr{ + return nil, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err), InternalErr: InternalErr{ diff --git a/pkg/udf/rpc/grpc_map_test.go b/pkg/udf/rpc/grpc_map_test.go index 05fd5d5c2d..ea8c4b36af 100644 --- a/pkg/udf/rpc/grpc_map_test.go +++ b/pkg/udf/rpc/grpc_map_test.go @@ -183,7 +183,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, ApplyUDFErr{ + assert.ErrorIs(t, err, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: InternalErr{ @@ -233,7 +233,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, ApplyUDFErr{ + assert.ErrorIs(t, err, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: InternalErr{ @@ -337,7 +337,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) { ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, ) - assert.ErrorIs(t, err, ApplyUDFErr{ + assert.ErrorIs(t, err, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: InternalErr{ diff --git a/pkg/udf/rpc/grpc_mapstream.go b/pkg/udf/rpc/grpc_mapstream.go index e6287fb6fa..360422df17 100644 --- a/pkg/udf/rpc/grpc_mapstream.go +++ b/pkg/udf/rpc/grpc_mapstream.go @@ -89,7 +89,7 @@ func (u *GRPCBasedMapStream) ApplyMapStream(ctx context.Context, message *isb.Re errs.Go(func() error { err := u.client.MapStreamFn(ctx, d, responseCh) if err != nil { - err = ApplyUDFErr{ + err = &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("gRPC client.MapStreamFn failed, %s", err), InternalErr: InternalErr{ diff --git a/pkg/udf/rpc/grpc_mapstream_test.go b/pkg/udf/rpc/grpc_mapstream_test.go index 186cdfdd96..6329ec149f 100644 --- a/pkg/udf/rpc/grpc_mapstream_test.go +++ b/pkg/udf/rpc/grpc_mapstream_test.go @@ -176,7 +176,7 @@ func TestGRPCBasedUDF_BasicApplyStreamWithMockClient(t *testing.T) { }, ReadOffset: isb.SimpleStringOffset(func() string { return "0" }), }, writeMessageCh) - assert.ErrorIs(t, err, ApplyUDFErr{ + assert.ErrorIs(t, err, &ApplyUDFErr{ UserUDFErr: false, Message: fmt.Sprintf("%s", err), InternalErr: InternalErr{ diff --git a/pkg/udf/rpc/grpc_unaligned_reduce_test.go b/pkg/udf/rpc/grpc_unaligned_reduce_test.go index 947e3aaf7e..1b9c7dd1e2 100644 --- a/pkg/udf/rpc/grpc_unaligned_reduce_test.go +++ b/pkg/udf/rpc/grpc_unaligned_reduce_test.go @@ -172,7 +172,7 @@ func TestGRPCBasedUDF_BasicSessionReduceWithMockClient(t *testing.T) { break readLoop } if err != nil { - assert.ErrorIs(t, err, ApplyUDFErr{ + assert.ErrorIs(t, err, &ApplyUDFErr{ UserUDFErr: false, Message: err.Error(), InternalErr: InternalErr{ diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index e70fe9a3ea..11078a7f36 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -1,5 +1,3 @@ -//go:build test - /* Copyright 2022 The Numaproj Authors. @@ -342,6 +340,23 @@ func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClie return true, nil } +func (s *FunctionalSuite) TestFallbackSink() { + + w := s.Given().Pipeline("@testdata/simple-fallback.yaml"). + When(). + CreatePipelineAndWait() + defer w.DeletePipelineAndWait() + pipelineName := "simple-fallback" + + // send a message to the pipeline + w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("fallback-message"))) + + // wait for all the pods to come up + w.Expect().VertexPodsRunning() + + w.Expect().SinkContains("output", "fallback-message") +} + func TestFunctionalSuite(t *testing.T) { suite.Run(t, new(FunctionalSuite)) } diff --git a/test/e2e/testdata/simple-fallback.yaml b/test/e2e/testdata/simple-fallback.yaml new file mode 100644 index 0000000000..372fc4d29c --- /dev/null +++ b/test/e2e/testdata/simple-fallback.yaml @@ -0,0 +1,32 @@ +apiVersion: numaflow.numaproj.io/v1alpha1 +kind: Pipeline +metadata: + name: simple-fallback +spec: + vertices: + - name: in + source: + http: {} + - name: cat + scale: + min: 1 + udf: + builtin: + name: cat + - name: output + scale: + min: 1 + sink: + udsink: + container: + image: quay.io/numaio/numaflow-go/fb-sink-log:stable + imagePullPolicy: Always + fallback: + udsink: + container: + image: quay.io/numaio/numaflow-sink/redis-e2e-test-sink:latest + edges: + - from: in + to: cat + - from: cat + to: output