Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Keran Yang <[email protected]>
  • Loading branch information
KeranYang committed Aug 15, 2024
1 parent 14ef458 commit a8d782e
Showing 1 changed file with 2 additions and 20 deletions.
22 changes: 2 additions & 20 deletions pkg/sinker/examples/redis-sink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"fmt"
"log"
"os"

Expand All @@ -28,32 +27,15 @@ func (rds *redisTestSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk
// We use redis hashes to store messages.
// Each field of a hash is the content of a message and value of the field is the no. of occurrences of the message.
var hashKey string

monoVertexName := os.Getenv("NUMAFLOW_MONO_VERTEX_NAME")
pipelineName := os.Getenv("NUMAFLOW_PIPELINE_NAME")
vertexName := os.Getenv("NUMAFLOW_VERTEX_NAME")

if !(monoVertexName != "" || (pipelineName != "" && vertexName != "")) {
log.Println("Error: Either mono vertex name or pipeline name and vertex name must be set in the environment variables.")
return result.Append(sinksdk.ResponseFailure(d.ID(), fmt.Sprintf("either mono vertex name or pipeline name and vertex name must be set in the environment variables.")))
}

if monoVertexName != "" {
// if the mono vertex name environment variable is set, we are in a mono vertex
// in this case, we use the mono vertex name as the hash key
hashKey = monoVertexName
} else {
// if the mono vertex name environment variable is not set, we are in a pipeline
// in this case, the name of a hash is pipelineName:sinkName.
hashKey = fmt.Sprintf("%s:%s", pipelineName, vertexName)
if hashKey = os.Getenv("SINK_HASH_KEY"); hashKey == "" {
log.Panicf("SINK_HASH_KEY environment variable is not set.")
}
err := client.HIncrBy(ctx, hashKey, string(d.Value()), 1).Err()
if err != nil {
log.Println("Set Error - ", err)
} else {
log.Printf("Incremented by 1 the no. of occurrences of %s under hash key %s\n", string(d.Value()), hashKey)
}

id := d.ID()
result = result.Append(sinksdk.ResponseOK(id))
}
Expand Down

0 comments on commit a8d782e

Please sign in to comment.