From ff638c0c0e9258b53ff257119c04a917f0e73646 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 19 Apr 2024 22:25:40 +0200 Subject: [PATCH] Don't require qps requirements failpoint injection period Signed-off-by: Marek Siarkowicz --- tests/robustness/failpoint/failpoint.go | 61 ++++++++----------- tests/robustness/main_test.go | 13 ++-- tests/robustness/report/client.go | 11 ---- tests/robustness/traffic/traffic.go | 80 ++++++++++++++++++++----- 4 files changed, 98 insertions(+), 67 deletions(-) diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index d52b64f4acb1..c81fb5fe9338 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -29,10 +29,7 @@ import ( ) const ( - triggerTimeout = time.Minute - waitBetweenFailpointTriggers = time.Second - failpointInjectionsCount = 1 - failpointInjectionsRetries = 3 + triggerTimeout = time.Minute ) var ( @@ -78,45 +75,37 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error { return nil } -func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint) error { +func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) { ctx, cancel := context.WithTimeout(ctx, triggerTimeout) defer cancel() var err error - successes := 0 - failures := 0 - for successes < failpointInjectionsCount && failures < failpointInjectionsRetries { - time.Sleep(waitBetweenFailpointTriggers) - - lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", failpoint.Name())) - if err = verifyClusterHealth(ctx, t, clus); err != nil { - return fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err) - } - - lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name())) - err = failpoint.Inject(ctx, t, lg, clus) - if err != nil { - select { - case <-ctx.Done(): - return fmt.Errorf("Triggering failpoints timed out, err: %v", ctx.Err()) - default: - } - lg.Info("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err)) - failures++ - continue - } - lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", failpoint.Name())) - if err = verifyClusterHealth(ctx, t, clus); err != nil { - return fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err) - } - - successes++ + if err = verifyClusterHealth(ctx, t, clus); err != nil { + return nil, fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err) + } + lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name())) + start := time.Since(baseTime) + err = failpoint.Inject(ctx, t, lg, clus) + if err != nil { + lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err)) + return nil, fmt.Errorf("failed triggering failpoint, err: %v", err) } - if successes < failpointInjectionsCount || failures >= failpointInjectionsRetries { - t.Errorf("failed to trigger failpoints enough times, err: %v", err) + if err = verifyClusterHealth(ctx, t, clus); err != nil { + return nil, fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err) } + lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name())) + end := time.Since(baseTime) + + return &InjectionReport{ + Start: start, + End: end, + Name: failpoint.Name(), + }, nil +} - return nil +type InjectionReport struct { + Start, End time.Duration + Name string } func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error { diff --git a/tests/robustness/main_test.go b/tests/robustness/main_test.go index 8f3ad5f6a86e..4f3d7ad22917 100644 --- a/tests/robustness/main_test.go +++ b/tests/robustness/main_test.go @@ -110,27 +110,30 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu defer cancel() g := errgroup.Group{} var operationReport, watchReport []report.ClientReport - finishTraffic := make(chan struct{}) + failpointInjected := make(chan failpoint.InjectionReport, 1) // using baseTime time-measuring operation to get monotonic clock reading // see https://github.com/golang/go/blob/master/src/time/time.go#L17 baseTime := time.Now() ids := identity.NewIDProvider() g.Go(func() error { - defer close(finishTraffic) - err := failpoint.Inject(ctx, t, lg, clus, s.failpoint) + defer close(failpointInjected) + time.Sleep(time.Second) + fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime) if err != nil { t.Error(err) cancel() } time.Sleep(time.Second) - lg.Info("Finished injecting failures") + if fr != nil { + failpointInjected <- *fr + } return nil }) maxRevisionChan := make(chan int64, 1) g.Go(func() error { defer close(maxRevisionChan) - operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids) + operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, failpointInjected, baseTime, ids) maxRevision := operationsMaxRevision(operationReport) maxRevisionChan <- maxRevision lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision)) diff --git a/tests/robustness/report/client.go b/tests/robustness/report/client.go index 477e3c4d33b0..cf0b24f52d66 100644 --- a/tests/robustness/report/client.go +++ b/tests/robustness/report/client.go @@ -36,17 +36,6 @@ type ClientReport struct { Watch []model.WatchOperation } -func (r ClientReport) SuccessfulOperations() int { - count := 0 - for _, op := range r.KeyValue { - resp := op.Output.(model.MaybeEtcdResponse) - if resp.Error == "" { - count++ - } - } - return count -} - func (r ClientReport) WatchEventCount() int { count := 0 for _, op := range r.Watch { diff --git a/tests/robustness/traffic/traffic.go b/tests/robustness/traffic/traffic.go index 23ca9e653b28..2816a683487f 100644 --- a/tests/robustness/traffic/traffic.go +++ b/tests/robustness/traffic/traffic.go @@ -24,7 +24,9 @@ import ( "golang.org/x/time/rate" "go.etcd.io/etcd/tests/v3/framework/e2e" + "go.etcd.io/etcd/tests/v3/robustness/failpoint" "go.etcd.io/etcd/tests/v3/robustness/identity" + "go.etcd.io/etcd/tests/v3/robustness/model" "go.etcd.io/etcd/tests/v3/robustness/report" ) @@ -50,7 +52,7 @@ var ( } ) -func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport { +func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.InjectionReport, baseTime time.Time, ids identity.Provider) []report.ClientReport { mux := sync.Mutex{} endpoints := clus.EndpointsGRPC() @@ -58,7 +60,6 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 reports := []report.ClientReport{} limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200) - startTime := time.Now() cc, err := NewClient(endpoints, ids, baseTime) if err != nil { t.Fatal(err) @@ -71,6 +72,9 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 } wg := sync.WaitGroup{} nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency) + finish := make(chan struct{}) + lg.Info("Start traffic") + startTime := time.Since(baseTime) for i := 0; i < profile.ClientCount; i++ { wg.Add(1) c, nerr := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime) @@ -87,8 +91,20 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 mux.Unlock() }(c) } + var fr *failpoint.InjectionReport + select { + case frp, ok := <-failpointInjected: + if !ok { + t.Fatalf("Failed to collect failpoint report") + } + fr = &frp + case <-ctx.Done(): + t.Fatalf("Traffic finished before failure was injected: %s", ctx.Err()) + } + close(finish) wg.Wait() - endTime := time.Now() + lg.Info("Finished traffic") + endTime := time.Since(baseTime) time.Sleep(time.Second) // Ensure that last operation succeeds @@ -98,23 +114,57 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2 } reports = append(reports, cc.Report()) - var totalOperations int - var successfulOperations int - for _, r := range reports { - totalOperations += len(r.KeyValue) - successfulOperations += r.SuccessfulOperations() - } - lg.Info("Recorded operations", zap.Int("operations", totalOperations), zap.Float64("successRate", float64(successfulOperations)/float64(totalOperations))) + totalStats := calculateStats(reports, startTime, endTime) + beforeFailpointStats := calculateStats(reports, startTime, fr.Start) + duringFailpointStats := calculateStats(reports, fr.Start, fr.End) + afterFailpointStats := calculateStats(reports, fr.End, endTime) + + lg.Info("Reporting complete traffic", zap.Int("successes", totalStats.successes), zap.Int("failures", totalStats.failures), zap.Float64("successRate", totalStats.successRate()), zap.Duration("period", totalStats.period), zap.Float64("qps", totalStats.QPS())) + lg.Info("Reporting traffic before failure injection", zap.Int("successes", beforeFailpointStats.successes), zap.Int("failures", beforeFailpointStats.failures), zap.Float64("successRate", beforeFailpointStats.successRate()), zap.Duration("period", beforeFailpointStats.period), zap.Float64("qps", beforeFailpointStats.QPS())) + lg.Info("Reporting traffic during failure injection", zap.Int("successes", duringFailpointStats.successes), zap.Int("failures", duringFailpointStats.failures), zap.Float64("successRate", duringFailpointStats.successRate()), zap.Duration("period", duringFailpointStats.period), zap.Float64("qps", duringFailpointStats.QPS())) + lg.Info("Reporting traffic after failure injection", zap.Int("successes", afterFailpointStats.successes), zap.Int("failures", afterFailpointStats.failures), zap.Float64("successRate", afterFailpointStats.successRate()), zap.Duration("period", afterFailpointStats.period), zap.Float64("qps", afterFailpointStats.QPS())) - period := endTime.Sub(startTime) - qps := float64(successfulOperations) / period.Seconds() - lg.Info("Traffic from successful requests", zap.Float64("qps", qps), zap.Int("operations", successfulOperations), zap.Duration("period", period)) - if qps < profile.MinimalQPS { - t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", profile.MinimalQPS, qps) + if beforeFailpointStats.QPS() < profile.MinimalQPS { + t.Errorf("Requiring minimal %f qps before failpoint injection for test results to be reliable, got %f qps", profile.MinimalQPS, beforeFailpointStats.QPS()) + } + if afterFailpointStats.QPS() < profile.MinimalQPS { + t.Errorf("Requiring minimal %f qps after failpoint injection for test results to be reliable, got %f qps", profile.MinimalQPS, afterFailpointStats.QPS()) } return reports } +func calculateStats(reports []report.ClientReport, start, end time.Duration) (ts trafficStats) { + ts.period = end - start + + for _, r := range reports { + for _, op := range r.KeyValue { + if op.Call < start.Nanoseconds() || op.Call > end.Nanoseconds() { + continue + } + resp := op.Output.(model.MaybeEtcdResponse) + if resp.Error == "" { + ts.successes++ + } else { + ts.failures++ + } + } + } + return ts +} + +type trafficStats struct { + successes, failures int + period time.Duration +} + +func (ts *trafficStats) successRate() float64 { + return float64(ts.successes) / float64(ts.successes+ts.failures) +} + +func (ts *trafficStats) QPS() float64 { + return float64(ts.successes) / ts.period.Seconds() +} + type Profile struct { Name string MinimalQPS float64