// A small Redis benchmark tool. In `load` mode it populates the keyspace with
// sorted sets via pipelined ZADD commands and reports throughput and latency
// percentiles from an HDR histogram.
package main

import (
	"flag"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"time"

	hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
	"github.com/mediocregopher/radix/v3"
	"golang.org/x/time/rate"
)
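
// Shared benchmark state, updated concurrently by the worker goroutines.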
var totalCommands uint64
var totalErrors uint64
var latencies *hdrhistogram.Histogram
var latenciesMutex sync.Mutex // guards latencies: hdrhistogram.Histogram is not safe for concurrent use

const Inf = rate.Limit(math.MaxFloat64)
const charset = "abcdefghijklmnopqrstuvwxyz"

// stringWithCharset returns a random string of the given length, built from the
// characters in charset.
func stringWithCharset(length int, charset string) string {
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}

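// Example invocation for the load mode (the binary name and flag values below are
// illustrative, not prescribed by this file):
//
//	zbench -mode=load -h 127.0.0.1 -p 12000 -c 50 -r 1000000 -d 10 -pipeline 10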
func main() {
	host := flag.String("h", "127.0.0.1", "Server hostname.")
	port := flag.Int("p", 12000, "Server port.")
	rps := flag.Int64("rps", 0, "Max rps. If 0 no limit is applied and the DB is stressed up to maximum.")
	password := flag.String("a", "", "Password for Redis Auth.")
	seed := flag.Int64("random-seed", 12345, "random seed to be used.")
	clients := flag.Uint64("c", 50, "number of clients.")
	keyspacelen := flag.Uint64("r", 1000000, "keyspace length.")
	numberRequests := flag.Uint64("n", 10000000, "Total number of requests. Only used in case of -mode=query.")
	debug := flag.Int("debug", 0, "Client debug level.")
	benchMode := flag.String("mode", "", "Benchmark mode. One of [load,query]. `load` will populate the db with sorted sets. `query` will run the zrangebylexscore command.")
	perKeyElmRangeStart := flag.Uint64("key-elements-min", 1, "Minimum number of elements per sorted set (the element count is drawn uniformly from the min-max range).")
	perKeyElmRangeEnd := flag.Uint64("key-elements-max", 10, "Maximum number of elements per sorted set (the element count is drawn uniformly from the min-max range).")
	perKeyElmDataSize := flag.Uint64("d", 10, "Data size of each sorted set element.")
	pipeline := flag.Uint64("pipeline", 1, "Redis pipeline value.")
	flag.Parse()
	if *benchMode != "load" && *benchMode != "query" {
		log.Fatal("Please specify a valid -mode option. Either `load` or `query`")
	}
	isLoad := *benchMode == "load"
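	// When -rps is non-zero, throttle the overall request rate with a token-bucket
	// limiter; the burst is sized to the number of clients.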
	var requestRate = Inf
	var requestBurst = 1
	useRateLimiter := false
	if *rps != 0 {
		requestRate = rate.Limit(*rps)
		requestBurst = int(*clients)
		useRateLimiter = true
	}

	var rateLimiter = rate.NewLimiter(requestRate, requestBurst)
	totalCmds := *numberRequests
	if isLoad {
		totalCmds = *keyspacelen
	}
	samplesPerClient := totalCmds / *clients
	client_update_tick := 1
	// latencies are recorded in microseconds (trackable range 1us-90s, 3 significant digits)
	latencies = hdrhistogram.New(1, 90000000, 3)
	opts := make([]radix.DialOpt, 0)
	if *password != "" {
		opts = append(opts, radix.DialAuthPass(*password))
	}
	connectionStr := fmt.Sprintf("%s:%d", *host, *port)
	stopChan := make(chan struct{})
	// a WaitGroup for the goroutines to tell us they've stopped
	wg := sync.WaitGroup{}
	fmt.Printf("Total clients: %d. Commands per client: %d Total commands: %d\n", *clients, samplesPerClient, totalCmds)
	fmt.Printf("Using random seed: %d\n", *seed)
	rand.Seed(*seed)
	// getStandaloneConn is not defined in this file (see the sketch after main)
	standalone := getStandaloneConn(connectionStr, opts, *clients)
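	// Split the workload evenly across the clients; the last client also takes any
	// remainder left by the integer division.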
	for client_id := 1; uint64(client_id) <= *clients; client_id++ {
		wg.Add(1)
		keyspace_client_start := uint64(client_id-1) * samplesPerClient
		keyspace_client_end := uint64(client_id) * samplesPerClient
		if uint64(client_id) == *clients {
			keyspace_client_end = totalCmds
		}
		go loadGoRoutine(standalone, keyspace_client_start, keyspace_client_end, samplesPerClient, *pipeline, *perKeyElmDataSize, *perKeyElmRangeStart, *perKeyElmRangeEnd, *debug, &wg, useRateLimiter, rateLimiter)
	}

	// listen for C-c
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	tick := time.NewTicker(time.Duration(client_update_tick) * time.Second)
	closed, _, duration, totalMessages, _ := updateCLI(tick, c, totalCmds)
	messageRate := float64(totalMessages) / duration.Seconds()
	latenciesMutex.Lock()
	p50IngestionMs := float64(latencies.ValueAtQuantile(50.0)) / 1000.0
	p95IngestionMs := float64(latencies.ValueAtQuantile(95.0)) / 1000.0
	p99IngestionMs := float64(latencies.ValueAtQuantile(99.0)) / 1000.0
	latenciesMutex.Unlock()

	fmt.Printf("\n")
	fmt.Printf("#################################################\n")
	fmt.Printf("Total Duration %.3f Seconds\n", duration.Seconds())
	fmt.Printf("Total Errors %d\n", atomic.LoadUint64(&totalErrors))
	fmt.Printf("Throughput summary: %.0f requests per second\n", messageRate)
	fmt.Printf("Latency summary (msec):\n")
	fmt.Printf(" %9s %9s %9s\n", "p50", "p95", "p99")
	fmt.Printf(" %9.3f %9.3f %9.3f\n", p50IngestionMs, p95IngestionMs, p99IngestionMs)

	if closed {
		return
	}

	// tell the goroutines to stop
	close(stopChan)
	// and wait for them all to finish
	wg.Wait()
}

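// getStandaloneConn is referenced in main but is not part of this file; it is
// presumably defined elsewhere in the package. A minimal sketch of what it could
// look like, assuming it only builds a radix connection pool with the given dial
// options (the body and error handling below are an assumption, not the author's
// code; the signature follows from the call site above):
//
//	func getStandaloneConn(addr string, opts []radix.DialOpt, clients uint64) *radix.Pool {
//		pool, err := radix.NewPool("tcp", addr, int(clients), radix.PoolConnFunc(
//			func(network, addr string) (radix.Conn, error) {
//				return radix.Dial(network, addr, opts...)
//			}))
//		if err != nil {
//			log.Fatalf("Error while creating connection pool: %v", err)
//		}
//		return pool
//	}
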
// loadGoRoutine is the per-client worker for the load mode: it issues pipelined
// ZADD commands, each creating a sorted set with a random number of elements and
// random member strings, and records the per-pipeline latency.
func loadGoRoutine(conn radix.Client, keyspace_client_start uint64, keyspace_client_end uint64, samplesPerClient uint64, pipeline uint64, perKeyElmDataSize uint64, perKeyElmRangeStart uint64, perKeyElmRangeEnd uint64, debug int, w *sync.WaitGroup, useRateLimiter bool, rateLimiter *rate.Limiter) {
	defer w.Done()
	var i uint64 = 0
	var keypos uint64 = keyspace_client_start
	cmds := make([]radix.CmdAction, pipeline)
	for i < samplesPerClient {
		if useRateLimiter {
			r := rateLimiter.ReserveN(time.Now(), int(pipeline))
			time.Sleep(r.Delay())
		}
		var j uint64 = 0
		for ; j < pipeline; j++ {
			cmdArgs := []string{fmt.Sprintf("zbench:%d", keypos)}
			// number of elements per key, drawn uniformly from [min, max)
			nElements := rand.Int63n(int64(perKeyElmRangeEnd-perKeyElmRangeStart)) + int64(perKeyElmRangeStart)
			var k int64 = 0
			for ; k < nElements; k++ {
				cmdArgs = append(cmdArgs, fmt.Sprintf("%f", rand.Float32()), stringWithCharset(int(perKeyElmDataSize), charset))
			}
			cmds[j] = radix.Cmd(nil, "ZADD", cmdArgs...)
			keypos++
		}
		startT := time.Now()
		err := conn.Do(radix.Pipeline(cmds...))
		endT := time.Now()
		if err != nil {
			log.Fatalf("Received an error with the following command(s): %v, error: %v", cmds, err)
		}
		duration := endT.Sub(startT)
		latenciesMutex.Lock()
		err = latencies.RecordValue(duration.Microseconds())
		latenciesMutex.Unlock()
		if err != nil {
			log.Fatalf("Received an error while recording latencies: %v", err)
		}
		atomic.AddUint64(&totalCommands, pipeline)
		i = i + pipeline
	}
}
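
// updateCLI prints a progress line once per tick and returns once the target
// number of commands has been reached or the process receives an interrupt.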
func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64) (bool, time.Time, time.Duration, uint64, []float64) {
	start := time.Now()
	prevTime := time.Now()
	prevMessageCount := uint64(0)
	messageRateTs := []float64{}
	fmt.Printf("%26s %7s %25s %25s %7s %25s %25s\n", "Test time", " ", "Total Commands", "Total Errors", "", "Command Rate", "p50 lat. (msec)")
	for {
		select {
		case <-tick.C:
			now := time.Now()
			took := now.Sub(prevTime)
			// read the shared counters atomically: the worker goroutines update them concurrently
			currentCmds := atomic.LoadUint64(&totalCommands)
			currentErrs := atomic.LoadUint64(&totalErrors)
			messageRate := float64(currentCmds-prevMessageCount) / took.Seconds()
			completionPercent := float64(currentCmds) / float64(message_limit) * 100.0
			completionPercentStr := fmt.Sprintf("[%3.1f%%]", completionPercent)
			errorPercent := 0.0
			if currentCmds > 0 {
				errorPercent = float64(currentErrs) / float64(currentCmds) * 100.0
			}

			latenciesMutex.Lock()
			p50 := float64(latencies.ValueAtQuantile(50.0)) / 1000.0
			latenciesMutex.Unlock()

			if prevMessageCount == 0 && currentCmds != 0 {
				start = time.Now()
			}
			if currentCmds != 0 {
				messageRateTs = append(messageRateTs, messageRate)
			}
			prevMessageCount = currentCmds
			prevTime = now

			fmt.Printf("%25.0fs %s %25d %25d [%3.1f%%] %25.2f %25.2f\t", time.Since(start).Seconds(), completionPercentStr, currentCmds, currentErrs, errorPercent, messageRate, p50)
			fmt.Printf("\r")
			if message_limit > 0 && currentCmds >= message_limit {
				return true, start, time.Since(start), currentCmds, messageRateTs
			}

		case <-c:
			fmt.Println("\nreceived Ctrl-c - shutting down")
			return true, start, time.Since(start), atomic.LoadUint64(&totalCommands), messageRateTs
		}
	}
}