diff --git a/pkg/mapper/examples/cat_sleep/main.go b/pkg/mapper/examples/cat_sleep/main.go index 89dc6609..7fc63a5e 100644 --- a/pkg/mapper/examples/cat_sleep/main.go +++ b/pkg/mapper/examples/cat_sleep/main.go @@ -3,16 +3,33 @@ package main import ( "context" "log" + "os" + "strconv" "time" "github.com/numaproj/numaflow-go/pkg/mapper" ) +const DEFAULT_SLEEP_SECONDS = 10 + type CatSleep struct { } func (e *CatSleep) Map(ctx context.Context, keys []string, d mapper.Datum) mapper.Messages { - time.Sleep(10 * time.Second) + + sleepSeconds := DEFAULT_SLEEP_SECONDS + secondsString := os.Getenv("SLEEP_SECONDS") + if secondsString == "" { + log.Printf("SLEEP_SECONDS environment variable not set, using default %d seconds\n", DEFAULT_SLEEP_SECONDS) + } else { + val, err := strconv.Atoi(secondsString) + if err != nil { + log.Printf("SLEEP_SECONDS environment variable %q not an int, using default %d seconds\n", secondsString, DEFAULT_SLEEP_SECONDS) + } else { + sleepSeconds = val + } + } + time.Sleep(time.Duration(sleepSeconds) * time.Second) return mapper.MessagesBuilder().Append(mapper.NewMessage(d.Value()).WithKeys(keys)) }