Skip to content
17 changes: 17 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,23 @@ func (s *FunctionalSuite) TestFallbackSink() {
w.Expect().RedisSinkContains("simple-fallback-output", "fallback-message")
}

func (s *FunctionalSuite) TestOnSuccessSink() {

w := s.Given().Pipeline("@testdata/simple-on-success.yaml").
When().
CreatePipelineAndWait()
defer w.DeletePipelineAndWait()
pipelineName := "simple-on-success"

// wait for all the pods to come up
w.Expect().VertexPodsRunning()

// send a message to the pipeline
w.SendMessageTo(pipelineName, "in", NewHttpPostRequest().WithBody([]byte("on-success-message")))

w.Expect().RedisSinkContains("simple-on-success-output", "on-success-message")
}

func (s *FunctionalSuite) TestExponentialBackoffRetryStrategyForPipeline() {
w := s.Given().Pipeline("@testdata/simple-pipeline-with-retry-strategy.yaml").
When().
Expand Down
40 changes: 40 additions & 0 deletions test/e2e/testdata/simple-on-success.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
name: simple-on-success
spec:
vertices:
- name: in
limits:
readBatchSize: 1
source:
http: {}
- name: udf
scale:
min: 1
udf:
container:
image: quay.io/numaio/numaflow-go/map-cat:stable # A UDF which simply cats the message
imagePullPolicy: IfNotPresent
- name: output
scale:
min: 1
sink:
udsink:
container:
image: quay.io/numaio/numaflow-go/on-success-log:stable
imagePullPolicy: IfNotPresent
onSuccess:
udsink:
container:
image: quay.io/numaio/numaflow-rs/redis-sink:stable
imagePullPolicy: IfNotPresent
env:
- name: SINK_HASH_KEY
# The key is set in the format of "pipeline_name-vertex_name"
value: "simple-on-success-output"
edges:
- from: in
to: udf
- from: udf
to: output