Skip to content

Commit

Permalink
chore: Fallback Sink E2E test (#1681)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored Apr 18, 2024
1 parent c8370e2 commit 16abb83
Show file tree
Hide file tree
Showing 18 changed files with 92 additions and 45 deletions.
6 changes: 3 additions & 3 deletions pkg/sinks/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,18 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
if err = errs[idx]; err != nil {
// if we are asked to write to fallback sink, check if the fallback sink is configured,
// and we are not already in the fallback sink write path.
if errors.As(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
fallbackMessages = append(fallbackMessages, msg)
continue
}

// if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink
if errors.As(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink")
}

// if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink
if errors.As(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sinks/udsink/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ type InternalErr struct {
}

// IsUserUDSinkErr is true if the problem is due to the user code in the UDSink.
func (e ApplyUDSinkErr) IsUserUDSinkErr() bool {
func (e *ApplyUDSinkErr) IsUserUDSinkErr() bool {
return e.UserUDSinkErr
}

// IsInternalErr is true if this is a platform issue. This is a blocking error.
func (e ApplyUDSinkErr) IsInternalErr() bool {
func (e *ApplyUDSinkErr) IsInternalErr() bool {
return e.InternalErr.Flag
}

func (e ApplyUDSinkErr) Error() string {
func (e *ApplyUDSinkErr) Error() string {
return fmt.Sprint(e.Message)
}
10 changes: 5 additions & 5 deletions pkg/sinks/udsink/udsink_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
response, err := u.client.SinkFn(ctx, requests)
if err != nil {
for i := range requests {
errs[i] = ApplyUDSinkErr{
errs[i] = &ApplyUDSinkErr{
UserUDSinkErr: false,
Message: fmt.Sprintf("gRPC client.SinkFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -112,19 +112,19 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
}
for i, m := range requests {
if r, existing := resMap[m.GetId()]; !existing {
errs[i] = NotFoundErr
errs[i] = &NotFoundErr
} else {
if r.GetStatus() == sinkpb.Status_FAILURE {
if r.GetErrMsg() != "" {
errs[i] = ApplyUDSinkErr{
errs[i] = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: r.GetErrMsg(),
}
} else {
errs[i] = UnknownUDSinkErr
errs[i] = &UnknownUDSinkErr
}
} else if r.GetStatus() == sinkpb.Status_FALLBACK {
errs[i] = WriteToFallbackErr
errs[i] = &WriteToFallbackErr
} else {
errs[i] = nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sinks/udsink/udsink_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
assert.Equal(t, 2, len(gotErrList))
assert.Equal(t, nil, gotErrList[0])

assert.Equal(t, ApplyUDSinkErr{
assert.Equal(t, &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "mock sink message error",
}, gotErrList[1])
Expand Down Expand Up @@ -158,15 +158,15 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
u := NewMockUDSgRPCBasedUDSink(mockClient)
gotErrList := u.ApplySink(ctx, testDatumList)
expectedErrList := []error{
ApplyUDSinkErr{
&ApplyUDSinkErr{
UserUDSinkErr: false,
Message: "gRPC client.SinkFn failed, failed to execute c.grpcClt.SinkFn(): mock SinkFn error",
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
},
ApplyUDSinkErr{
&ApplyUDSinkErr{
UserUDSinkErr: false,
Message: "gRPC client.SinkFn failed, failed to execute c.grpcClt.SinkFn(): mock SinkFn error",
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string, s string) ([]f
}

func (f myForwardInternalErrTest) ApplyTransform(_ context.Context, _ *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return nil, udfapplier.ApplyUDFErr{
return nil, &udfapplier.ApplyUDFErr{
UserUDFErr: false,
InternalErr: struct {
Flag bool
Expand Down
6 changes: 3 additions & 3 deletions pkg/sources/transformer/grpc_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage *
return true, nil
})
if !success {
return nil, rpc.ApplyUDFErr{
return nil, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err),
InternalErr: rpc.InternalErr{
Expand All @@ -123,7 +123,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage *
}
}
case sdkerr.NonRetryable:
return nil, rpc.ApplyUDFErr{
return nil, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err),
InternalErr: rpc.InternalErr{
Expand All @@ -132,7 +132,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage *
},
}
default:
return nil, rpc.ApplyUDFErr{
return nil, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err),
InternalErr: rpc.InternalErr{
Expand Down
8 changes: 4 additions & 4 deletions pkg/sources/transformer/grpc_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down
4 changes: 2 additions & 2 deletions pkg/udf/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,7 +1601,7 @@ func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string, s string) ([]f
}

func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return nil, udfapplier.ApplyUDFErr{
return nil, &udfapplier.ApplyUDFErr{
UserUDFErr: false,
InternalErr: struct {
Flag bool
Expand All @@ -1613,7 +1613,7 @@ func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage

func (f myForwardInternalErrTest) ApplyMapStream(_ context.Context, _ *isb.ReadMessage, writeMessagesCh chan<- isb.WriteMessage) error {
close(writeMessagesCh)
return udfapplier.ApplyUDFErr{
return &udfapplier.ApplyUDFErr{
UserUDFErr: false,
InternalErr: struct {
Flag bool
Expand Down
8 changes: 4 additions & 4 deletions pkg/udf/rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ type InternalErr struct {
}

// IsUserUDFErr is true if the problem is due to the user code in the UDF.
func (e ApplyUDFErr) IsUserUDFErr() bool {
func (e *ApplyUDFErr) IsUserUDFErr() bool {
return e.UserUDFErr
}

// IsInternalErr is true if this is a platform issue. This is a blocking error.
func (e ApplyUDFErr) IsInternalErr() bool {
func (e *ApplyUDFErr) IsInternalErr() bool {
return e.InternalErr.Flag
}

func (e ApplyUDFErr) Error() string {
func (e *ApplyUDFErr) Error() string {
return fmt.Sprint(e.Message)
}

// Is checks if the error is of the same type
func (e ApplyUDFErr) Is(target error) bool {
func (e *ApplyUDFErr) Is(target error) bool {
return target.Error() == e.Error()
}
10 changes: 5 additions & 5 deletions pkg/udf/rpc/grpc_aligned_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,14 @@ func createAlignedReduceRequest(windowRequest *window.TimedWindowRequest) *reduc
}

// convertToUdfError converts the error returned by the reduceFn to ApplyUDFErr
func convertToUdfError(err error) ApplyUDFErr {
func convertToUdfError(err error) error {
// if any error happens in reduce
// will exit and restart the numa container
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -220,7 +220,7 @@ func convertToUdfError(err error) ApplyUDFErr {
},
}
case sdkerr.NonRetryable:
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -229,7 +229,7 @@ func convertToUdfError(err error) ApplyUDFErr {
},
}
case sdkerr.Canceled:
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: context.Canceled.Error(),
InternalErr: InternalErr{
Expand All @@ -238,7 +238,7 @@ func convertToUdfError(err error) ApplyUDFErr {
},
}
default:
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_aligned_reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestGRPCBasedUDF_AsyncReduceWithMockClient(t *testing.T) {
break readLoop
}
if err != nil {
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: err.Error(),
InternalErr: InternalErr{
Expand Down
6 changes: 3 additions & 3 deletions pkg/udf/rpc/grpc_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag
return true, nil
})
if !success {
return nil, ApplyUDFErr{
return nil, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -120,7 +120,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag
}
}
case sdkerr.NonRetryable:
return nil, ApplyUDFErr{
return nil, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -129,7 +129,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag
},
}
default:
return nil, ApplyUDFErr{
return nil, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: InternalErr{
Expand Down
6 changes: 3 additions & 3 deletions pkg/udf/rpc/grpc_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_mapstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (u *GRPCBasedMapStream) ApplyMapStream(ctx context.Context, message *isb.Re
errs.Go(func() error {
err := u.client.MapStreamFn(ctx, d, responseCh)
if err != nil {
err = ApplyUDFErr{
err = &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapStreamFn failed, %s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_mapstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestGRPCBasedUDF_BasicApplyStreamWithMockClient(t *testing.T) {
},
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
}, writeMessageCh)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_unaligned_reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestGRPCBasedUDF_BasicSessionReduceWithMockClient(t *testing.T) {
break readLoop
}
if err != nil {
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: err.Error(),
InternalErr: InternalErr{
Expand Down
19 changes: 17 additions & 2 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.
Expand Down Expand Up @@ -342,6 +340,23 @@ func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClie
return true, nil
}

func (s *FunctionalSuite) TestFallbackSink() {

w := s.Given().Pipeline("@testdata/simple-fallback.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "simple-fallback"

// send a message to the pipeline
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("fallback-message")))

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

w.Expect().SinkContains("output", "fallback-message")
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
Loading

0 comments on commit 16abb83

Please sign in to comment.