Skip to content

Commit

Permalink
chore: enable testing of mono vertex
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Aug 14, 2024
1 parent 7a3bad8 commit 30c29f9
Showing 1 changed file with 12 additions and 4 deletions.
16 changes: 12 additions & 4 deletions pkg/sinker/examples/redis-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,22 @@ func (rds *redisTestSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk
_ = d.Watermark()

// We use redis hashes to store messages.
// The name of a hash is pipelineName:sinkName.
// Each field of a hash is the content of a message and value of the field is the no. of occurrences of the message.
hkey := fmt.Sprintf("%s:%s", os.Getenv("NUMAFLOW_PIPELINE_NAME"), os.Getenv("NUMAFLOW_VERTEX_NAME"))
err := client.HIncrBy(ctx, hkey, string(d.Value()), 1).Err()
var hashKey string
if os.Getenv("NUMAFLOW_MONO_VERTEX_NAME") != "" {
// if the mono vertex name environment variable is set, we are in a mono vertex
// in this case, we use the mono vertex name as the hash key
hashKey = os.Getenv("NUMAFLOW_MONO_VERTEX_NAME")
} else {
// if the mono vertex name environment variable is not set, we are in a pipeline
// in this case, the name of a hash is pipelineName:sinkName.
hashKey = fmt.Sprintf("%s:%s", os.Getenv("NUMAFLOW_PIPELINE_NAME"), os.Getenv("NUMAFLOW_VERTEX_NAME"))
}
err := client.HIncrBy(ctx, hashKey, string(d.Value()), 1).Err()
if err != nil {
log.Println("Set Error - ", err)
} else {
log.Printf("Incremented by 1 the no. of occurrences of %s under hash key %s\n", string(d.Value()), hkey)
log.Printf("Incremented by 1 the no. of occurrences of %s under hash key %s\n", string(d.Value()), hashKey)
}

id := d.ID()
Expand Down

0 comments on commit 30c29f9

Please sign in to comment.