From a8d782e1c208cf91fc41072d27d1f6c53a7f498f Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Thu, 15 Aug 2024 11:12:17 -0400 Subject: [PATCH] address comments Signed-off-by: Keran Yang --- pkg/sinker/examples/redis-sink/main.go | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/pkg/sinker/examples/redis-sink/main.go b/pkg/sinker/examples/redis-sink/main.go index 862dbbf4..38a12b46 100644 --- a/pkg/sinker/examples/redis-sink/main.go +++ b/pkg/sinker/examples/redis-sink/main.go @@ -2,7 +2,6 @@ package main import ( "context" - "fmt" "log" "os" @@ -28,24 +27,8 @@ func (rds *redisTestSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk // We use redis hashes to store messages. // Each field of a hash is the content of a message and value of the field is the no. of occurrences of the message. var hashKey string - - monoVertexName := os.Getenv("NUMAFLOW_MONO_VERTEX_NAME") - pipelineName := os.Getenv("NUMAFLOW_PIPELINE_NAME") - vertexName := os.Getenv("NUMAFLOW_VERTEX_NAME") - - if !(monoVertexName != "" || (pipelineName != "" && vertexName != "")) { - log.Println("Error: Either mono vertex name or pipeline name and vertex name must be set in the environment variables.") - return result.Append(sinksdk.ResponseFailure(d.ID(), fmt.Sprintf("either mono vertex name or pipeline name and vertex name must be set in the environment variables."))) - } - - if monoVertexName != "" { - // if the mono vertex name environment variable is set, we are in a mono vertex - // in this case, we use the mono vertex name as the hash key - hashKey = monoVertexName - } else { - // if the mono vertex name environment variable is not set, we are in a pipeline - // in this case, the name of a hash is pipelineName:sinkName. - hashKey = fmt.Sprintf("%s:%s", pipelineName, vertexName) + if hashKey = os.Getenv("SINK_HASH_KEY"); hashKey == "" { + log.Panicf("SINK_HASH_KEY environment variable is not set.") } err := client.HIncrBy(ctx, hashKey, string(d.Value()), 1).Err() if err != nil { @@ -53,7 +36,6 @@ func (rds *redisTestSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk } else { log.Printf("Incremented by 1 the no. of occurrences of %s under hash key %s\n", string(d.Value()), hashKey) } - id := d.ID() result = result.Append(sinksdk.ResponseOK(id)) }