From 14ef45868650d90b38bc3e89c80a340a58e25d88 Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Wed, 14 Aug 2024 17:33:45 -0400 Subject: [PATCH] . Signed-off-by: Keran Yang --- pkg/sinker/examples/redis-sink/main.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/sinker/examples/redis-sink/main.go b/pkg/sinker/examples/redis-sink/main.go index dca584a5..862dbbf4 100644 --- a/pkg/sinker/examples/redis-sink/main.go +++ b/pkg/sinker/examples/redis-sink/main.go @@ -28,14 +28,24 @@ func (rds *redisTestSink) Sink(ctx context.Context, datumStreamCh <-chan sinksdk // We use redis hashes to store messages. // Each field of a hash is the content of a message and value of the field is the no. of occurrences of the message. var hashKey string - if os.Getenv("NUMAFLOW_MONO_VERTEX_NAME") != "" { + + monoVertexName := os.Getenv("NUMAFLOW_MONO_VERTEX_NAME") + pipelineName := os.Getenv("NUMAFLOW_PIPELINE_NAME") + vertexName := os.Getenv("NUMAFLOW_VERTEX_NAME") + + if !(monoVertexName != "" || (pipelineName != "" && vertexName != "")) { + log.Println("Error: Either mono vertex name or pipeline name and vertex name must be set in the environment variables.") + return result.Append(sinksdk.ResponseFailure(d.ID(), fmt.Sprintf("either mono vertex name or pipeline name and vertex name must be set in the environment variables."))) + } + + if monoVertexName != "" { // if the mono vertex name environment variable is set, we are in a mono vertex // in this case, we use the mono vertex name as the hash key - hashKey = os.Getenv("NUMAFLOW_MONO_VERTEX_NAME") + hashKey = monoVertexName } else { // if the mono vertex name environment variable is not set, we are in a pipeline // in this case, the name of a hash is pipelineName:sinkName. - hashKey = fmt.Sprintf("%s:%s", os.Getenv("NUMAFLOW_PIPELINE_NAME"), os.Getenv("NUMAFLOW_VERTEX_NAME")) + hashKey = fmt.Sprintf("%s:%s", pipelineName, vertexName) } err := client.HIncrBy(ctx, hashKey, string(d.Value()), 1).Err() if err != nil {