Skip to content

Commit

Permalink
feat: watermark for sources (numaproj#159)
Browse files Browse the repository at this point in the history
* feat: watermark for sources
  • Loading branch information
whynowy authored Sep 13, 2022
1 parent 9a85860 commit 7d41129
Show file tree
Hide file tree
Showing 52 changed files with 940 additions and 717 deletions.
12 changes: 9 additions & 3 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3934,16 +3934,22 @@ spec:
type: array
watermark:
default:
propagate: false
disabled: false
description: Watermark enables watermark progression across the entire
pipeline. Updating this after the pipeline has been created will
have no impact and will be ignored. To make the pipeline honor any
changes to the setting, the pipeline should be recreated.
properties:
propagate:
disabled:
default: false
description: Propagate toggles the watermark propagation.
description: Disabled toggles the watermark propagation, defaults
to false.
type: boolean
maxDelay:
default: 0s
description: Maximum delay allowed for watermark calculation,
defaults to "0s", which means no delay.
type: string
type: object
type: object
status:
Expand Down
12 changes: 9 additions & 3 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8660,16 +8660,22 @@ spec:
type: array
watermark:
default:
propagate: false
disabled: false
description: Watermark enables watermark progression across the entire
pipeline. Updating this after the pipeline has been created will
have no impact and will be ignored. To make the pipeline honor any
changes to the setting, the pipeline should be recreated.
properties:
propagate:
disabled:
default: false
description: Propagate toggles the watermark propagation.
description: Disabled toggles the watermark propagation, defaults
to false.
type: boolean
maxDelay:
default: 0s
description: Maximum delay allowed for watermark calculation,
defaults to "0s", which means no delay.
type: string
type: object
type: object
status:
Expand Down
12 changes: 9 additions & 3 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8660,16 +8660,22 @@ spec:
type: array
watermark:
default:
propagate: false
disabled: false
description: Watermark enables watermark progression across the entire
pipeline. Updating this after the pipeline has been created will
have no impact and will be ignored. To make the pipeline honor any
changes to the setting, the pipeline should be recreated.
properties:
propagate:
disabled:
default: false
description: Propagate toggles the watermark propagation.
description: Disabled toggles the watermark propagation, defaults
to false.
type: boolean
maxDelay:
default: 0s
description: Maximum delay allowed for watermark calculation,
defaults to "0s", which means no delay.
type: string
type: object
type: object
status:
Expand Down
13 changes: 11 additions & 2 deletions controllers/vertex/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,17 @@ func (r *vertexReconciler) buildPodSpec(vertex *dfv1.Vertex, pl *dfv1.Pipeline,
}

podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, corev1.EnvVar{
Name: dfv1.EnvWatermarkOn,
Value: fmt.Sprintf("%t", pl.Spec.Watermark.Propagate),
Name: dfv1.EnvWatermarkDisabled,
Value: fmt.Sprintf("%t", pl.Spec.Watermark.Disabled),
})

maxDelay := "0s"
if x := pl.Spec.Watermark.MaxDelay; x != nil {
maxDelay = x.Duration.String()
}
podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, corev1.EnvVar{
Name: dfv1.EnvWatermarkMaxDelay,
Value: maxDelay,
})

return podSpec, nil
Expand Down
6 changes: 6 additions & 0 deletions controllers/vertex/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,8 @@ func Test_BuildPodSpec(t *testing.T) {
assert.Contains(t, envNames, dfv1.EnvISBSvcSentinelMaster)
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisUser)
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisURL)
assert.Contains(t, envNames, dfv1.EnvWatermarkDisabled)
assert.Contains(t, envNames, dfv1.EnvWatermarkMaxDelay)
argStr := strings.Join(spec.InitContainers[0].Args, " ")
assert.Contains(t, argStr, "--buffers=")
for _, b := range testObj.GetToBuffers() {
Expand Down Expand Up @@ -221,6 +223,8 @@ func Test_BuildPodSpec(t *testing.T) {
assert.Contains(t, envNames, dfv1.EnvISBSvcSentinelMaster)
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisUser)
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisURL)
assert.Contains(t, envNames, dfv1.EnvWatermarkDisabled)
assert.Contains(t, envNames, dfv1.EnvWatermarkMaxDelay)
argStr := strings.Join(spec.InitContainers[0].Args, " ")
assert.Contains(t, argStr, "--buffers=")
for _, b := range testObj.GetFromBuffers() {
Expand Down Expand Up @@ -296,6 +300,8 @@ func Test_BuildPodSpec(t *testing.T) {
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisSentinelPassword)
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisUser)
assert.Contains(t, envNames, dfv1.EnvISBSvcRedisURL)
assert.Contains(t, envNames, dfv1.EnvWatermarkDisabled)
assert.Contains(t, envNames, dfv1.EnvWatermarkMaxDelay)
udfEnvNames := []string{}
udfEnvValues := []string{}
for _, e := range spec.Containers[1].Env {
Expand Down
18 changes: 16 additions & 2 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3748,12 +3748,26 @@ Description
<tbody>
<tr>
<td>
<code>propagate</code></br> <em> bool </em>
<code>disabled</code></br> <em> bool </em>
</td>
<td>
<em>(Optional)</em>
<p>
Disabled toggles the watermark propagation, defaults to false.
</p>
</td>
</tr>
<tr>
<td>
<code>maxDelay</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
Propagate toggles the watermark propagation.
Maximum delay allowed for watermark calculation, defaults to “0s”, which
means no delay.
</p>
</td>
</tr>
Expand Down
File renamed without changes
8 changes: 8 additions & 0 deletions docs/sources/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ When posting data to the HTTP Source, an optional HTTP header `x-numaflow-id` ca
curl -kq -X POST -H "x-numaflow-id: ${id}" -d "hello world" ${http-source-url}
```

## x-numaflow-event-time

By default, the time at which the data arrives at the HTTP source is used as the event time. It can be overridden by setting the HTTP header `x-numaflow-event-time` to the number of seconds elapsed since January 1, 1970 UTC.

```sh
curl -kq -X POST -H "x-numaflow-event-time: 1663006726" -d "hello world" ${http-source-url}
```

## Auth

A `Bearer` token can be configured to prevent the HTTP Source from being accessed by unexpected clients. To do so, a Kubernetes Secret needs to be created to store the token, and valid clients also need to include the token in their HTTP request headers.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/numaproj/numaflow

go 1.18
go 1.19

require (
github.com/Masterminds/sprig/v3 v3.2.2
Expand Down
2 changes: 1 addition & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ edit_uri: edit/main/docs/
strict: true
theme:
name: material
favicon: assets/numa.svg
favicon: assets/numaproj.svg
font:
text: Roboto
code: Roboto Mono
Expand Down
6 changes: 4 additions & 2 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ const (
KeyReplica = "numaflow.numaproj.io/replica"

// ID key in the header of sources like http
KeyMetaID = "x-numaflow-id"
KeyMetaID = "x-numaflow-id"
KeyMetaEventTime = "x-numaflow-event-time"

DefaultISBSvcName = "default"

Expand Down Expand Up @@ -86,7 +87,8 @@ const (
EnvDebug = "NUMAFLOW_DEBUG"

// Watermark
EnvWatermarkOn = "NUMAFLOW_WATERMARK_ON"
EnvWatermarkDisabled = "NUMAFLOW_WATERMARK_DISABLED"
EnvWatermarkMaxDelay = "NUMAFLOW_WATERMARK_MAX_DELAY"

PathVarRun = "/var/run/numaflow"
VertexMetricsPort = 2469
Expand Down
Loading

0 comments on commit 7d41129

Please sign in to comment.