Skip to content

Commit

Permalink
Don't require minimal QPS for failpoint injection period
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Apr 20, 2024
1 parent e246bb8 commit f285330
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 67 deletions.
61 changes: 25 additions & 36 deletions tests/robustness/failpoint/failpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ import (
)

const (
triggerTimeout = time.Minute
waitBetweenFailpointTriggers = time.Second
failpointInjectionsCount = 1
failpointInjectionsRetries = 3
triggerTimeout = time.Minute
)

var (
Expand Down Expand Up @@ -78,45 +75,37 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
return nil
}

func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
var err error
successes := 0
failures := 0
for successes < failpointInjectionsCount && failures < failpointInjectionsRetries {
time.Sleep(waitBetweenFailpointTriggers)

lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
}

lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
err = failpoint.Inject(ctx, t, lg, clus)
if err != nil {
select {
case <-ctx.Done():
return fmt.Errorf("Triggering failpoints timed out, err: %v", ctx.Err())
default:
}
lg.Info("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
failures++
continue
}

lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
}

successes++
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return nil, fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
}
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
start := time.Since(baseTime)
err = failpoint.Inject(ctx, t, lg, clus)
if err != nil {
lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
return nil, fmt.Errorf("failed triggering failpoint, err: %v", err)
}
if successes < failpointInjectionsCount || failures >= failpointInjectionsRetries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return nil, fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
}
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
end := time.Since(baseTime)

return &InjectionReport{
Start: start,
End: end,
Name: failpoint.Name(),
}, nil
}

return nil
// InjectionReport describes a single failpoint injection. Both timestamps are
// offsets measured from the base time passed to Inject.
type InjectionReport struct {
	// Start is the offset taken right before the failpoint was triggered.
	Start time.Duration
	// End is the offset taken once the post-injection cluster health check passed.
	End time.Duration
	// Name is the value returned by the failpoint's Name method.
	Name string
}

func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error {
Expand Down
15 changes: 10 additions & 5 deletions tests/robustness/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,32 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
defer cancel()
g := errgroup.Group{}
var operationReport, watchReport []report.ClientReport
finishTraffic := make(chan struct{})
failpointInjected := make(chan failpoint.InjectionReport, 1)

// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
baseTime := time.Now()
ids := identity.NewIDProvider()
g.Go(func() error {
defer close(finishTraffic)
err := failpoint.Inject(ctx, t, lg, clus, s.failpoint)
defer close(failpointInjected)
// Give some time for traffic to reach qps target before injecting failpoint.
time.Sleep(time.Second)
fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime)
if err != nil {
t.Error(err)
cancel()
}
// Give some time for traffic to reach qps target after injecting failpoint.
time.Sleep(time.Second)
lg.Info("Finished injecting failures")
if fr != nil {
failpointInjected <- *fr
}
return nil
})
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, failpointInjected, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
Expand Down
11 changes: 0 additions & 11 deletions tests/robustness/report/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,6 @@ type ClientReport struct {
Watch []model.WatchOperation
}

// SuccessfulOperations returns the number of key-value operations in the
// report whose response carries no error.
func (r ClientReport) SuccessfulOperations() int {
	var succeeded int
	for _, op := range r.KeyValue {
		// Unchecked assertion: panics if Output holds any other type.
		if op.Output.(model.MaybeEtcdResponse).Error != "" {
			continue
		}
		succeeded++
	}
	return succeeded
}
func (r ClientReport) WatchEventCount() int {
count := 0
for _, op := range r.Watch {
Expand Down
78 changes: 63 additions & 15 deletions tests/robustness/traffic/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
"golang.org/x/time/rate"

"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/failpoint"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

Expand All @@ -50,15 +52,14 @@ var (
}
)

func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport {
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.InjectionReport, baseTime time.Time, ids identity.Provider) []report.ClientReport {
mux := sync.Mutex{}
endpoints := clus.EndpointsGRPC()

lm := identity.NewLeaseIDStorage()
reports := []report.ClientReport{}
limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200)

startTime := time.Now()
cc, err := NewClient(endpoints, ids, baseTime)
if err != nil {
t.Fatal(err)
Expand All @@ -71,6 +72,9 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}
wg := sync.WaitGroup{}
nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
finish := make(chan struct{})
lg.Info("Start traffic")
startTime := time.Since(baseTime)
for i := 0; i < profile.ClientCount; i++ {
wg.Add(1)
c, nerr := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
Expand All @@ -87,8 +91,20 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
mux.Unlock()
}(c)
}
var fr *failpoint.InjectionReport
select {
case frp, ok := <-failpointInjected:
if !ok {
t.Fatalf("Failed to collect failpoint report")
}
fr = &frp
case <-ctx.Done():
t.Fatalf("Traffic finished before failure was injected: %s", ctx.Err())
}
close(finish)
wg.Wait()
endTime := time.Now()
lg.Info("Finished traffic")
endTime := time.Since(baseTime)

time.Sleep(time.Second)
// Ensure that last operation succeeds
Expand All @@ -98,23 +114,55 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}
reports = append(reports, cc.Report())

var totalOperations int
var successfulOperations int
for _, r := range reports {
totalOperations += len(r.KeyValue)
successfulOperations += r.SuccessfulOperations()
}
lg.Info("Recorded operations", zap.Int("operations", totalOperations), zap.Float64("successRate", float64(successfulOperations)/float64(totalOperations)))
totalStats := calculateStats(reports, startTime, endTime)
beforeFailpointStats := calculateStats(reports, startTime, fr.Start)
duringFailpointStats := calculateStats(reports, fr.Start, fr.End)
afterFailpointStats := calculateStats(reports, fr.End, endTime)

lg.Info("Reporting complete traffic", zap.Int("successes", totalStats.successes), zap.Int("failures", totalStats.failures), zap.Float64("successRate", totalStats.successRate()), zap.Duration("period", totalStats.period), zap.Float64("qps", totalStats.QPS()))
lg.Info("Reporting traffic before failure injection", zap.Int("successes", beforeFailpointStats.successes), zap.Int("failures", beforeFailpointStats.failures), zap.Float64("successRate", beforeFailpointStats.successRate()), zap.Duration("period", beforeFailpointStats.period), zap.Float64("qps", beforeFailpointStats.QPS()))
lg.Info("Reporting traffic during failure injection", zap.Int("successes", duringFailpointStats.successes), zap.Int("failures", duringFailpointStats.failures), zap.Float64("successRate", duringFailpointStats.successRate()), zap.Duration("period", duringFailpointStats.period), zap.Float64("qps", duringFailpointStats.QPS()))
lg.Info("Reporting traffic after failure injection", zap.Int("successes", afterFailpointStats.successes), zap.Int("failures", afterFailpointStats.failures), zap.Float64("successRate", afterFailpointStats.successRate()), zap.Duration("period", afterFailpointStats.period), zap.Float64("qps", afterFailpointStats.QPS()))

period := endTime.Sub(startTime)
qps := float64(successfulOperations) / period.Seconds()
lg.Info("Traffic from successful requests", zap.Float64("qps", qps), zap.Int("operations", successfulOperations), zap.Duration("period", period))
if qps < profile.MinimalQPS {
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", profile.MinimalQPS, qps)
if beforeFailpointStats.QPS() < profile.MinimalQPS {
t.Errorf("Requiring minimal %f qps before failpoint injection for test results to be reliable, got %f qps", profile.MinimalQPS, beforeFailpointStats.QPS())
}
// TODO: Validate QPS post failpoint injection to ensure that we sufficiently cover the period when the cluster recovers.
return reports
}

// calculateStats counts the key-value operations across all reports whose call
// time falls within [start, end] (both bounds inclusive) and splits them into
// successes and failures depending on whether the response carries an error.
// The returned period is simply end - start.
func calculateStats(reports []report.ClientReport, start, end time.Duration) (ts trafficStats) {
	ts.period = end - start
	from, to := start.Nanoseconds(), end.Nanoseconds()

	for _, clientReport := range reports {
		for _, op := range clientReport.KeyValue {
			if inWindow := op.Call >= from && op.Call <= to; !inWindow {
				continue
			}
			// Unchecked assertion: panics if Output holds any other type.
			if op.Output.(model.MaybeEtcdResponse).Error != "" {
				ts.failures++
				continue
			}
			ts.successes++
		}
	}
	return ts
}

// trafficStats aggregates the outcome of operations observed within a single
// time window.
type trafficStats struct {
	// successes and failures are the number of operations that completed
	// without and with an error, respectively.
	successes, failures int
	// period is the length of the window the counters refer to.
	period time.Duration
}

// successRate returns the fraction of recorded operations that succeeded.
// It returns 0 when no operations were recorded, instead of the NaN that a
// 0/0 floating-point division would produce.
func (ts *trafficStats) successRate() float64 {
	total := ts.successes + ts.failures
	if total == 0 {
		return 0
	}
	return float64(ts.successes) / float64(total)
}

// QPS returns the number of successful operations per second over the period.
// It returns 0 when the period is not positive: dividing by a zero period
// would yield NaN or Inf, and NaN compares false against any threshold, which
// would let a minimal-QPS check pass silently.
func (ts *trafficStats) QPS() float64 {
	if ts.period <= 0 {
		return 0
	}
	return float64(ts.successes) / ts.period.Seconds()
}

type Profile struct {
Name string
MinimalQPS float64
Expand Down

0 comments on commit f285330

Please sign in to comment.