Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Fallback Sink E2E test #1681

Merged
merged 4 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/sinks/forward/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,18 @@ func (df *DataForward) writeToSink(ctx context.Context, sinkWriter sinker.SinkWr
if err = errs[idx]; err != nil {
// if we are asked to write to fallback sink, check if the fallback sink is configured,
// and we are not already in the fallback sink write path.
if errors.As(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter != nil && !isFbSinkWriter {
fallbackMessages = append(fallbackMessages, msg)
continue
}

// if we are asked to write to fallback but no fallback sink is configured, we will retry the messages to the same sink
if errors.As(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
if errors.Is(err, &udsink.WriteToFallbackErr) && df.opts.fbSinkWriter == nil {
df.opts.logger.Error("Asked to write to fallback but no fallback sink is configured, retrying the message to the same sink")
}

// if we are asked to write to fallback sink inside the fallback sink, we will retry the messages to the fallback sink
if errors.As(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
if errors.Is(err, &udsink.WriteToFallbackErr) && isFbSinkWriter {
df.opts.logger.Error("Asked to write to fallback sink inside the fallback sink, retrying the message to fallback sink")
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sinks/udsink/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ type InternalErr struct {
}

// IsUserUDSinkErr is true if the problem is due to the user code in the UDSink.
func (e ApplyUDSinkErr) IsUserUDSinkErr() bool {
func (e *ApplyUDSinkErr) IsUserUDSinkErr() bool {
return e.UserUDSinkErr
}

// IsInternalErr is true if this is a platform issue. This is a blocking error.
func (e ApplyUDSinkErr) IsInternalErr() bool {
func (e *ApplyUDSinkErr) IsInternalErr() bool {
return e.InternalErr.Flag
}

func (e ApplyUDSinkErr) Error() string {
func (e *ApplyUDSinkErr) Error() string {
return fmt.Sprint(e.Message)
}
10 changes: 5 additions & 5 deletions pkg/sinks/udsink/udsink_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
response, err := u.client.SinkFn(ctx, requests)
if err != nil {
for i := range requests {
errs[i] = ApplyUDSinkErr{
errs[i] = &ApplyUDSinkErr{
UserUDSinkErr: false,
Message: fmt.Sprintf("gRPC client.SinkFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -112,19 +112,19 @@ func (u *UDSgRPCBasedUDSink) ApplySink(ctx context.Context, requests []*sinkpb.S
}
for i, m := range requests {
if r, existing := resMap[m.GetId()]; !existing {
errs[i] = NotFoundErr
errs[i] = &NotFoundErr
} else {
if r.GetStatus() == sinkpb.Status_FAILURE {
if r.GetErrMsg() != "" {
errs[i] = ApplyUDSinkErr{
errs[i] = &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: r.GetErrMsg(),
}
} else {
errs[i] = UnknownUDSinkErr
errs[i] = &UnknownUDSinkErr
}
} else if r.GetStatus() == sinkpb.Status_FALLBACK {
errs[i] = WriteToFallbackErr
errs[i] = &WriteToFallbackErr
} else {
errs[i] = nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sinks/udsink/udsink_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
assert.Equal(t, 2, len(gotErrList))
assert.Equal(t, nil, gotErrList[0])

assert.Equal(t, ApplyUDSinkErr{
assert.Equal(t, &ApplyUDSinkErr{
UserUDSinkErr: true,
Message: "mock sink message error",
}, gotErrList[1])
Expand Down Expand Up @@ -158,15 +158,15 @@ func Test_gRPCBasedUDSink_ApplyWithMockClient(t *testing.T) {
u := NewMockUDSgRPCBasedUDSink(mockClient)
gotErrList := u.ApplySink(ctx, testDatumList)
expectedErrList := []error{
ApplyUDSinkErr{
&ApplyUDSinkErr{
UserUDSinkErr: false,
Message: "gRPC client.SinkFn failed, failed to execute c.grpcClt.SinkFn(): mock SinkFn error",
InternalErr: InternalErr{
Flag: true,
MainCarDown: false,
},
},
ApplyUDSinkErr{
&ApplyUDSinkErr{
UserUDSinkErr: false,
Message: "gRPC client.SinkFn failed, failed to execute c.grpcClt.SinkFn(): mock SinkFn error",
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string, s string) ([]f
}

func (f myForwardInternalErrTest) ApplyTransform(_ context.Context, _ *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return nil, udfapplier.ApplyUDFErr{
return nil, &udfapplier.ApplyUDFErr{
UserUDFErr: false,
InternalErr: struct {
Flag bool
Expand Down
6 changes: 3 additions & 3 deletions pkg/sources/transformer/grpc_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage *
return true, nil
})
if !success {
return nil, rpc.ApplyUDFErr{
return nil, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err),
InternalErr: rpc.InternalErr{
Expand All @@ -123,7 +123,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage *
}
}
case sdkerr.NonRetryable:
return nil, rpc.ApplyUDFErr{
return nil, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err),
InternalErr: rpc.InternalErr{
Expand All @@ -132,7 +132,7 @@ func (u *GRPCBasedTransformer) ApplyTransform(ctx context.Context, readMessage *
},
}
default:
return nil, rpc.ApplyUDFErr{
return nil, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.SourceTransformFn failed, %s", err),
InternalErr: rpc.InternalErr{
Expand Down
8 changes: 4 additions & 4 deletions pkg/sources/transformer/grpc_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down Expand Up @@ -277,7 +277,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestGRPCBasedTransformer_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, rpc.ApplyUDFErr{
assert.ErrorIs(t, err, &rpc.ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: rpc.InternalErr{
Expand Down
4 changes: 2 additions & 2 deletions pkg/udf/forward/forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,7 +1601,7 @@ func (f myForwardInternalErrTest) WhereTo(_ []string, _ []string, s string) ([]f
}

func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage) ([]*isb.WriteMessage, error) {
return nil, udfapplier.ApplyUDFErr{
return nil, &udfapplier.ApplyUDFErr{
UserUDFErr: false,
InternalErr: struct {
Flag bool
Expand All @@ -1613,7 +1613,7 @@ func (f myForwardInternalErrTest) ApplyMap(_ context.Context, _ *isb.ReadMessage

func (f myForwardInternalErrTest) ApplyMapStream(_ context.Context, _ *isb.ReadMessage, writeMessagesCh chan<- isb.WriteMessage) error {
close(writeMessagesCh)
return udfapplier.ApplyUDFErr{
return &udfapplier.ApplyUDFErr{
UserUDFErr: false,
InternalErr: struct {
Flag bool
Expand Down
8 changes: 4 additions & 4 deletions pkg/udf/rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ type InternalErr struct {
}

// IsUserUDFErr is true if the problem is due to the user code in the UDF.
func (e ApplyUDFErr) IsUserUDFErr() bool {
func (e *ApplyUDFErr) IsUserUDFErr() bool {
return e.UserUDFErr
}

// IsInternalErr is true if this is a platform issue. This is a blocking error.
func (e ApplyUDFErr) IsInternalErr() bool {
func (e *ApplyUDFErr) IsInternalErr() bool {
return e.InternalErr.Flag
}

func (e ApplyUDFErr) Error() string {
func (e *ApplyUDFErr) Error() string {
return fmt.Sprint(e.Message)
}

// Is checks if the error is of the same type
func (e ApplyUDFErr) Is(target error) bool {
func (e *ApplyUDFErr) Is(target error) bool {
return target.Error() == e.Error()
}
10 changes: 5 additions & 5 deletions pkg/udf/rpc/grpc_aligned_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,14 @@ func createAlignedReduceRequest(windowRequest *window.TimedWindowRequest) *reduc
}

// convertToUdfError converts the error returned by the reduceFn to ApplyUDFErr
func convertToUdfError(err error) ApplyUDFErr {
func convertToUdfError(err error) error {
// if any error happens in reduce
// will exit and restart the numa container
udfErr, _ := sdkerr.FromError(err)
switch udfErr.ErrorKind() {
case sdkerr.Retryable:
// TODO: currently we don't handle retryable errors for reduce
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -220,7 +220,7 @@ func convertToUdfError(err error) ApplyUDFErr {
},
}
case sdkerr.NonRetryable:
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -229,7 +229,7 @@ func convertToUdfError(err error) ApplyUDFErr {
},
}
case sdkerr.Canceled:
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: context.Canceled.Error(),
InternalErr: InternalErr{
Expand All @@ -238,7 +238,7 @@ func convertToUdfError(err error) ApplyUDFErr {
},
}
default:
return ApplyUDFErr{
return &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.ReduceFn failed, %s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_aligned_reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func TestGRPCBasedUDF_AsyncReduceWithMockClient(t *testing.T) {
break readLoop
}
if err != nil {
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: err.Error(),
InternalErr: InternalErr{
Expand Down
6 changes: 3 additions & 3 deletions pkg/udf/rpc/grpc_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag
return true, nil
})
if !success {
return nil, ApplyUDFErr{
return nil, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -120,7 +120,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag
}
}
case sdkerr.NonRetryable:
return nil, ApplyUDFErr{
return nil, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: InternalErr{
Expand All @@ -129,7 +129,7 @@ func (u *GRPCBasedMap) ApplyMap(ctx context.Context, readMessage *isb.ReadMessag
},
}
default:
return nil, ApplyUDFErr{
return nil, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapFn failed, %s", err),
InternalErr: InternalErr{
Expand Down
6 changes: 3 additions & 3 deletions pkg/udf/rpc/grpc_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down Expand Up @@ -233,7 +233,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestGRPCBasedMap_BasicApplyWithMockClient(t *testing.T) {
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
},
)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_mapstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (u *GRPCBasedMapStream) ApplyMapStream(ctx context.Context, message *isb.Re
errs.Go(func() error {
err := u.client.MapStreamFn(ctx, d, responseCh)
if err != nil {
err = ApplyUDFErr{
err = &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("gRPC client.MapStreamFn failed, %s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_mapstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestGRPCBasedUDF_BasicApplyStreamWithMockClient(t *testing.T) {
},
ReadOffset: isb.SimpleStringOffset(func() string { return "0" }),
}, writeMessageCh)
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: fmt.Sprintf("%s", err),
InternalErr: InternalErr{
Expand Down
2 changes: 1 addition & 1 deletion pkg/udf/rpc/grpc_unaligned_reduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestGRPCBasedUDF_BasicSessionReduceWithMockClient(t *testing.T) {
break readLoop
}
if err != nil {
assert.ErrorIs(t, err, ApplyUDFErr{
assert.ErrorIs(t, err, &ApplyUDFErr{
UserUDFErr: false,
Message: err.Error(),
InternalErr: InternalErr{
Expand Down
19 changes: 17 additions & 2 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build test

/*
Copyright 2022 The Numaproj Authors.

Expand Down Expand Up @@ -342,6 +340,23 @@ func isWatermarkProgressing(ctx context.Context, client *daemonclient.DaemonClie
return true, nil
}

func (s *FunctionalSuite) TestFallbackSink() {

w := s.Given().Pipeline("@testdata/simple-fallback.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "simple-fallback"

// send a message to the pipeline
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("fallback-message")))

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

w.Expect().SinkContains("output", "fallback-message")
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
Loading
Loading