Don't require minimal for failpoint injection period #17825

Merged
merged 1 commit on Apr 22, 2024
61 changes: 25 additions & 36 deletions tests/robustness/failpoint/failpoint.go
@@ -29,10 +29,7 @@ import (
)

const (
triggerTimeout = time.Minute
waitBetweenFailpointTriggers = time.Second
failpointInjectionsCount = 1
failpointInjectionsRetries = 3
Contributor

Why are the retries removed? Were they ever useful?

Member Author

It was introduced as a band-aid for flakes. It reduced the problem a little, but there were still failpoint issues that retries didn't help with. Things improved over time, especially after we discovered the issue with process execution.

Hard to say whether removing it will expose any issues; we should see them in CI.

Member Author
@serathius Apr 22, 2024

Checked logs from one of the robustness test runs https://github.com/etcd-io/etcd/actions/runs/8781749612 and didn't find any "Failed to trigger failpoint" logs.

triggerTimeout = time.Minute
)

var (
@@ -78,45 +75,37 @@ func Validate(clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
return nil
}

func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint) error {
func Inject(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, failpoint Failpoint, baseTime time.Time) (*InjectionReport, error) {
ctx, cancel := context.WithTimeout(ctx, triggerTimeout)
defer cancel()
var err error
successes := 0
failures := 0
for successes < failpointInjectionsCount && failures < failpointInjectionsRetries {
time.Sleep(waitBetweenFailpointTriggers)

lg.Info("Verifying cluster health before failpoint", zap.String("failpoint", failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
}

lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
err = failpoint.Inject(ctx, t, lg, clus)
if err != nil {
select {
case <-ctx.Done():
return fmt.Errorf("Triggering failpoints timed out, err: %v", ctx.Err())
default:
}
lg.Info("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
failures++
continue
}

lg.Info("Verifying cluster health after failpoint", zap.String("failpoint", failpoint.Name()))
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
}

successes++
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return nil, fmt.Errorf("failed to verify cluster health before failpoint injection, err: %v", err)
}
lg.Info("Triggering failpoint", zap.String("failpoint", failpoint.Name()))
start := time.Since(baseTime)
err = failpoint.Inject(ctx, t, lg, clus)
Member

I think we should keep the original retry here, just in case the failpoint HTTP call times out.

Member Author

I'd prefer to retry HTTP calls rather than whole failpoints. A failpoint looks simple and retryable because it's just a single Inject function, but underneath it's a multi-stage stateful process, with some stages that aren't retryable, or at least not from the beginning. For example, a simple KILL failpoint first needs to kill the process, wait for it to exit, and then start it back up again. If we fail on the wait, can we really retry and kill it again? What about a failure on start?

Failpoint injection has enough nuances that it isn't easy to blindly retry, and trying to make failpoints externally retryable makes the internal code unnecessarily complicated. I'd prefer that failpoints be able to decide how to handle their internal failures and whether they can retry on their own.
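To illustrate the split being described, here is a minimal, hypothetical sketch (not etcd's actual failpoint client; the gofail-style endpoint URL, payload handling, and helper names are assumptions): only the idempotent HTTP call that enables the failpoint is retried, while the stateful kill/wait/start sequence is left to the failpoint itself.

```go
// Hypothetical sketch: retry only the idempotent HTTP call that enables a
// failpoint, not the whole multi-stage failpoint (kill, wait, restart, ...).
package failpointsketch

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"
)

// enableFailpoint PUTs a gofail-style rule to a member's failpoint endpoint.
// The URL and payload format are illustrative only.
func enableFailpoint(ctx context.Context, url, payload string) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, strings.NewReader(payload))
	if err != nil {
		return err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 != 2 {
		return fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	return nil
}

// enableFailpointWithRetry retries the HTTP call a few times with a short
// per-call timeout. Repeating an idempotent PUT on timeout is safe; blindly
// re-running a kill/wait/start failpoint from the beginning is not.
func enableFailpointWithRetry(ctx context.Context, url, payload string, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		err = enableFailpoint(callCtx, url, payload)
		cancel()
		if err == nil {
			return nil
		}
	}
	return fmt.Errorf("enabling failpoint failed after %d attempts: %w", attempts, err)
}
```

The point of the sketch is only the placement of the retry loop: it wraps the one step that is safe to repeat, while the failpoint decides how to handle failures in its stateful steps.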

Member

Thanks for the comment. It sounds good to me.

if err != nil {
lg.Error("Failed to trigger failpoint", zap.String("failpoint", failpoint.Name()), zap.Error(err))
return nil, fmt.Errorf("failed triggering failpoint, err: %v", err)
}
if successes < failpointInjectionsCount || failures >= failpointInjectionsRetries {
t.Errorf("failed to trigger failpoints enough times, err: %v", err)
if err = verifyClusterHealth(ctx, t, clus); err != nil {
return nil, fmt.Errorf("failed to verify cluster health after failpoint injection, err: %v", err)
}
lg.Info("Finished triggering failpoint", zap.String("failpoint", failpoint.Name()))
end := time.Since(baseTime)

return &InjectionReport{
Start: start,
End: end,
Name: failpoint.Name(),
}, nil
}

return nil
type InjectionReport struct {
Start, End time.Duration
Name string
}

func verifyClusterHealth(ctx context.Context, _ *testing.T, clus *e2e.EtcdProcessCluster) error {
15 changes: 10 additions & 5 deletions tests/robustness/main_test.go
@@ -110,27 +110,32 @@ func (s testScenario) run(ctx context.Context, t *testing.T, lg *zap.Logger, clu
defer cancel()
g := errgroup.Group{}
var operationReport, watchReport []report.ClientReport
finishTraffic := make(chan struct{})
failpointInjected := make(chan failpoint.InjectionReport, 1)

// using baseTime time-measuring operation to get monotonic clock reading
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
baseTime := time.Now()
ids := identity.NewIDProvider()
g.Go(func() error {
Contributor

Also, not for this PR, but we should probably get rid of the errgroup.Group; it's probably a good idea to just use a wait group and goroutines.

Member Author

Why? This code was changed to use errgroup to make error handling cleaner.

Contributor

Right, but IIUC errgroups are only useful if you return an error, and we always return nil AFAICT.

Member Author

Ok, makes sense.
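For reference, a self-contained, hypothetical sketch of the suggested alternative (plain goroutines plus sync.WaitGroup instead of errgroup.Group, assuming failures are surfaced through channels or t.Error rather than returned errors), not this PR's code:

```go
// Hypothetical sketch: coordinating a failpoint goroutine and a traffic
// goroutine with sync.WaitGroup when neither returns a meaningful error.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	failpointInjected := make(chan string, 1)

	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(failpointInjected)
		// Failures would be reported via t.Error/cancel, not a return value.
		failpointInjected <- "injected"
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		// Traffic goroutine: consume the injection report, then finish.
		if report, ok := <-failpointInjected; ok {
			fmt.Println("failpoint", report)
		}
	}()

	wg.Wait()
}
```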

defer close(finishTraffic)
err := failpoint.Inject(ctx, t, lg, clus, s.failpoint)
defer close(failpointInjected)
// Give some time for traffic to reach qps target before injecting failpoint.
time.Sleep(time.Second)
fr, err := failpoint.Inject(ctx, t, lg, clus, s.failpoint, baseTime)
if err != nil {
t.Error(err)
cancel()
}
// Give some time for traffic to reach qps target after injecting failpoint.
time.Sleep(time.Second)
lg.Info("Finished injecting failures")
if fr != nil {
failpointInjected <- *fr
}
return nil
})
maxRevisionChan := make(chan int64, 1)
g.Go(func() error {
defer close(maxRevisionChan)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, finishTraffic, baseTime, ids)
operationReport = traffic.SimulateTraffic(ctx, t, lg, clus, s.profile, s.traffic, failpointInjected, baseTime, ids)
maxRevision := operationsMaxRevision(operationReport)
maxRevisionChan <- maxRevision
lg.Info("Finished simulating traffic", zap.Int64("max-revision", maxRevision))
11 changes: 0 additions & 11 deletions tests/robustness/report/client.go
@@ -36,17 +36,6 @@ type ClientReport struct {
Watch []model.WatchOperation
}

func (r ClientReport) SuccessfulOperations() int {
count := 0
for _, op := range r.KeyValue {
resp := op.Output.(model.MaybeEtcdResponse)
if resp.Error == "" {
count++
}
}
return count
}

func (r ClientReport) WatchEventCount() int {
count := 0
for _, op := range r.Watch {
78 changes: 63 additions & 15 deletions tests/robustness/traffic/traffic.go
@@ -24,7 +24,9 @@ import (
"golang.org/x/time/rate"

"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/failpoint"
"go.etcd.io/etcd/tests/v3/robustness/identity"
"go.etcd.io/etcd/tests/v3/robustness/model"
"go.etcd.io/etcd/tests/v3/robustness/report"
)

@@ -50,15 +52,14 @@
}
)

func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, finish <-chan struct{}, baseTime time.Time, ids identity.Provider) []report.ClientReport {
func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2e.EtcdProcessCluster, profile Profile, traffic Traffic, failpointInjected <-chan failpoint.InjectionReport, baseTime time.Time, ids identity.Provider) []report.ClientReport {
mux := sync.Mutex{}
endpoints := clus.EndpointsGRPC()

lm := identity.NewLeaseIDStorage()
reports := []report.ClientReport{}
limiter := rate.NewLimiter(rate.Limit(profile.MaximalQPS), 200)

startTime := time.Now()
cc, err := NewClient(endpoints, ids, baseTime)
if err != nil {
t.Fatal(err)
@@ -71,6 +72,9 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}
wg := sync.WaitGroup{}
nonUniqueWriteLimiter := NewConcurrencyLimiter(profile.MaxNonUniqueRequestConcurrency)
finish := make(chan struct{})
lg.Info("Start traffic")
startTime := time.Since(baseTime)
for i := 0; i < profile.ClientCount; i++ {
wg.Add(1)
c, nerr := NewClient([]string{endpoints[i%len(endpoints)]}, ids, baseTime)
@@ -87,8 +91,20 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
mux.Unlock()
}(c)
}
var fr *failpoint.InjectionReport
select {
case frp, ok := <-failpointInjected:
if !ok {
t.Fatalf("Failed to collect failpoint report")
}
fr = &frp
case <-ctx.Done():
t.Fatalf("Traffic finished before failure was injected: %s", ctx.Err())
}
close(finish)
wg.Wait()
endTime := time.Now()
lg.Info("Finished traffic")
endTime := time.Since(baseTime)

time.Sleep(time.Second)
// Ensure that last operation succeeds
@@ -98,23 +114,55 @@ func SimulateTraffic(ctx context.Context, t *testing.T, lg *zap.Logger, clus *e2
}
reports = append(reports, cc.Report())

var totalOperations int
var successfulOperations int
for _, r := range reports {
totalOperations += len(r.KeyValue)
successfulOperations += r.SuccessfulOperations()
}
lg.Info("Recorded operations", zap.Int("operations", totalOperations), zap.Float64("successRate", float64(successfulOperations)/float64(totalOperations)))
totalStats := calculateStats(reports, startTime, endTime)
beforeFailpointStats := calculateStats(reports, startTime, fr.Start)
duringFailpointStats := calculateStats(reports, fr.Start, fr.End)
afterFailpointStats := calculateStats(reports, fr.End, endTime)

lg.Info("Reporting complete traffic", zap.Int("successes", totalStats.successes), zap.Int("failures", totalStats.failures), zap.Float64("successRate", totalStats.successRate()), zap.Duration("period", totalStats.period), zap.Float64("qps", totalStats.QPS()))
lg.Info("Reporting traffic before failure injection", zap.Int("successes", beforeFailpointStats.successes), zap.Int("failures", beforeFailpointStats.failures), zap.Float64("successRate", beforeFailpointStats.successRate()), zap.Duration("period", beforeFailpointStats.period), zap.Float64("qps", beforeFailpointStats.QPS()))
lg.Info("Reporting traffic during failure injection", zap.Int("successes", duringFailpointStats.successes), zap.Int("failures", duringFailpointStats.failures), zap.Float64("successRate", duringFailpointStats.successRate()), zap.Duration("period", duringFailpointStats.period), zap.Float64("qps", duringFailpointStats.QPS()))
lg.Info("Reporting traffic after failure injection", zap.Int("successes", afterFailpointStats.successes), zap.Int("failures", afterFailpointStats.failures), zap.Float64("successRate", afterFailpointStats.successRate()), zap.Duration("period", afterFailpointStats.period), zap.Float64("qps", afterFailpointStats.QPS()))

period := endTime.Sub(startTime)
qps := float64(successfulOperations) / period.Seconds()
lg.Info("Traffic from successful requests", zap.Float64("qps", qps), zap.Int("operations", successfulOperations), zap.Duration("period", period))
if qps < profile.MinimalQPS {
t.Errorf("Requiring minimal %f qps for test results to be reliable, got %f qps", profile.MinimalQPS, qps)
if beforeFailpointStats.QPS() < profile.MinimalQPS {
t.Errorf("Requiring minimal %f qps before failpoint injection for test results to be reliable, got %f qps", profile.MinimalQPS, beforeFailpointStats.QPS())
}
// TODO: Validate QPS post failpoint injection to ensure that we sufficiently cover the period when the cluster recovers.
return reports
}

func calculateStats(reports []report.ClientReport, start, end time.Duration) (ts trafficStats) {
ts.period = end - start

for _, r := range reports {
for _, op := range r.KeyValue {
if op.Call < start.Nanoseconds() || op.Call > end.Nanoseconds() {
continue
}
resp := op.Output.(model.MaybeEtcdResponse)
if resp.Error == "" {
ts.successes++
} else {
ts.failures++
}
}
}
return ts
}

type trafficStats struct {
successes, failures int
period time.Duration
}

func (ts *trafficStats) successRate() float64 {
return float64(ts.successes) / float64(ts.successes+ts.failures)
}

func (ts *trafficStats) QPS() float64 {
return float64(ts.successes) / ts.period.Seconds()
}

type Profile struct {
Name string
MinimalQPS float64