Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Aug 14, 2024
1 parent 30c29f9 commit 14ef458
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions pkg/sinker/examples/redis-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,24 @@ func (rds *redisTestSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk
// We use redis hashes to store messages.
// Each field of a hash is the content of a message and value of the field is the no. of occurrences of the message.
var hashKey string
if os.Getenv("NUMAFLOW_MONO_VERTEX_NAME") != "" {

monoVertexName := os.Getenv("NUMAFLOW_MONO_VERTEX_NAME")
pipelineName := os.Getenv("NUMAFLOW_PIPELINE_NAME")
vertexName := os.Getenv("NUMAFLOW_VERTEX_NAME")

if !(monoVertexName != "" || (pipelineName != "" && vertexName != "")) {
log.Println("Error: Either mono vertex name or pipeline name and vertex name must be set in the environment variables.")
return result.Append(sinksdk.ResponseFailure(d.ID(), fmt.Sprintf("either mono vertex name or pipeline name and vertex name must be set in the environment variables.")))
}

if monoVertexName != "" {
// if the mono vertex name environment variable is set, we are in a mono vertex
// in this case, we use the mono vertex name as the hash key
hashKey = os.Getenv("NUMAFLOW_MONO_VERTEX_NAME")
hashKey = monoVertexName
} else {
// if the mono vertex name environment variable is not set, we are in a pipeline
// in this case, the name of a hash is pipelineName:sinkName.
hashKey = fmt.Sprintf("%s:%s", os.Getenv("NUMAFLOW_PIPELINE_NAME"), os.Getenv("NUMAFLOW_VERTEX_NAME"))
hashKey = fmt.Sprintf("%s:%s", pipelineName, vertexName)
}
err := client.HIncrBy(ctx, hashKey, string(d.Value()), 1).Err()
if err != nil {
Expand Down

0 comments on commit 14ef458

Please sign in to comment.