Skip to content

Commit 8f51613

Browse files
add MemberDowngradeUpgrade failpoint
Signed-off-by: Siyuan Zhang <[email protected]>
1 parent ebb2b06 commit 8f51613

File tree

5 files changed

+254
-180
lines changed

5 files changed

+254
-180
lines changed

tests/e2e/cluster_downgrade_test.go

Lines changed: 4 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@ package e2e
1616

1717
import (
1818
"context"
19-
"encoding/json"
2019
"fmt"
21-
"strings"
2220
"testing"
2321
"time"
2422

2523
"github.com/coreos/go-semver/semver"
2624
"github.com/stretchr/testify/assert"
2725
"github.com/stretchr/testify/require"
28-
"go.uber.org/zap"
2926

3027
"go.etcd.io/etcd/api/v3/version"
3128
"go.etcd.io/etcd/client/pkg/v3/fileutil"
@@ -38,7 +35,6 @@ import (
3835
"go.etcd.io/etcd/server/v3/storage/datadir"
3936
"go.etcd.io/etcd/tests/v3/framework/config"
4037
"go.etcd.io/etcd/tests/v3/framework/e2e"
41-
"go.etcd.io/etcd/tests/v3/framework/testutils"
4238
)
4339

4440
func TestDowngradeUpgradeClusterOf1(t *testing.T) {
@@ -78,15 +74,14 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
7874

7975
lastClusterVersion := semver.New(lastVersionStr)
8076
lastClusterVersion.Patch = 0
81-
lastClusterVersionStr := lastClusterVersion.String()
8277

8378
e2e.BeforeTest(t)
8479

8580
t.Logf("Create cluster with version %s", currentVersionStr)
8681
var snapshotCount uint64 = 10
8782
epc := newCluster(t, clusterSize, snapshotCount)
8883
for i := 0; i < len(epc.Procs); i++ {
89-
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
84+
e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
9085
Cluster: currentVersionStr,
9186
Server: version.Version,
9287
Storage: currentVersionStr,
@@ -112,35 +107,10 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
112107
require.NoError(t, err)
113108
beforeMembers, beforeKV := getMembersAndKeys(t, cc)
114109

115-
t.Logf("etcdctl downgrade enable %s", lastVersionStr)
116-
downgradeEnable(t, epc, lastVersion)
117-
118-
t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
119-
for i := 0; i < len(epc.Procs); i++ {
120-
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
121-
Cluster: lastClusterVersionStr,
122-
Server: version.Version,
123-
Storage: lastClusterVersionStr,
124-
})
125-
e2e.AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
126-
}
127-
128-
t.Log("Cluster is ready for downgrade")
110+
e2e.DowngradeEnable(t, epc, lastVersion)
129111
t.Logf("Starting downgrade process to %q", lastVersionStr)
130-
for i := 0; i < len(epc.Procs); i++ {
131-
t.Logf("Downgrading member %d by running %s binary", i, lastReleaseBinary)
132-
stopEtcd(t, epc.Procs[i])
133-
startEtcd(t, epc.Procs[i], lastReleaseBinary)
134-
}
135-
136-
t.Log("All members downgraded, validating downgrade")
112+
e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), currentVersion, lastClusterVersion)
137113
e2e.AssertProcessLogs(t, leader(t, epc), "the cluster has been downgraded")
138-
for i := 0; i < len(epc.Procs); i++ {
139-
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
140-
Cluster: lastClusterVersionStr,
141-
Server: lastVersionStr,
142-
})
143-
}
144114

145115
t.Log("Downgrade complete")
146116
afterMembers, afterKV := getMembersAndKeys(t, cc)
@@ -165,23 +135,7 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
165135
beforeMembers, beforeKV = getMembersAndKeys(t, cc)
166136

167137
t.Logf("Starting upgrade process to %q", currentVersionStr)
168-
for i := 0; i < len(epc.Procs); i++ {
169-
t.Logf("Upgrading member %d", i)
170-
stopEtcd(t, epc.Procs[i])
171-
startEtcd(t, epc.Procs[i], currentEtcdBinary)
172-
// NOTE: The leader has monitor to the cluster version, which will
173-
// update cluster version. We don't need to check the transient
174-
// version just in case that it might be flaky.
175-
}
176-
177-
t.Log("All members upgraded, validating upgrade")
178-
for i := 0; i < len(epc.Procs); i++ {
179-
validateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
180-
Cluster: currentVersionStr,
181-
Server: version.Version,
182-
Storage: currentVersionStr,
183-
})
184-
}
138+
e2e.DowngradeUpgradeMembers(t, nil, epc, len(epc.Procs), lastClusterVersion, currentVersion)
185139
t.Log("Upgrade complete")
186140

187141
afterMembers, afterKV = getMembersAndKeys(t, cc)
@@ -206,48 +160,6 @@ func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdPr
206160
return epc
207161
}
208162

209-
func startEtcd(t *testing.T, ep e2e.EtcdProcess, execPath string) {
210-
ep.Config().ExecPath = execPath
211-
err := ep.Restart(context.TODO())
212-
if err != nil {
213-
t.Fatalf("could not start etcd process cluster (%v)", err)
214-
}
215-
}
216-
217-
func downgradeEnable(t *testing.T, epc *e2e.EtcdProcessCluster, ver *semver.Version) {
218-
c := epc.Etcdctl()
219-
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
220-
err := c.DowngradeEnable(context.TODO(), ver.String())
221-
require.NoError(t, err)
222-
})
223-
}
224-
225-
func stopEtcd(t *testing.T, ep e2e.EtcdProcess) {
226-
err := ep.Stop()
227-
require.NoError(t, err)
228-
}
229-
230-
func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess, expect version.Versions) {
231-
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
232-
for {
233-
result, err := getMemberVersionByCurl(cfg, member)
234-
if err != nil {
235-
cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err), zap.String("member", member.Config().Name))
236-
time.Sleep(time.Second)
237-
continue
238-
}
239-
cfg.Logger.Info("Comparing versions", zap.String("member", member.Config().Name), zap.Any("got", result), zap.Any("want", expect))
240-
if err := compareMemberVersion(expect, result); err != nil {
241-
cfg.Logger.Warn("Versions didn't match retrying", zap.Error(err), zap.String("member", member.Config().Name))
242-
time.Sleep(time.Second)
243-
continue
244-
}
245-
cfg.Logger.Info("Versions match", zap.String("member", member.Config().Name))
246-
break
247-
}
248-
})
249-
}
250-
251163
func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
252164
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
253165
defer cancel()
@@ -269,36 +181,6 @@ func leader(t *testing.T, epc *e2e.EtcdProcessCluster) e2e.EtcdProcess {
269181
return nil
270182
}
271183

272-
func compareMemberVersion(expect version.Versions, target version.Versions) error {
273-
if expect.Server != "" && expect.Server != target.Server {
274-
return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
275-
}
276-
277-
if expect.Cluster != "" && expect.Cluster != target.Cluster {
278-
return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
279-
}
280-
281-
if expect.Storage != "" && expect.Storage != target.Storage {
282-
return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
283-
}
284-
return nil
285-
}
286-
287-
func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdProcess) (version.Versions, error) {
288-
args := e2e.CURLPrefixArgsCluster(cfg, member, "GET", e2e.CURLReq{Endpoint: "/version"})
289-
lines, err := e2e.RunUtilCompletion(args, nil)
290-
if err != nil {
291-
return version.Versions{}, err
292-
}
293-
294-
data := strings.Join(lines, "\n")
295-
result := version.Versions{}
296-
if err := json.Unmarshal([]byte(data), &result); err != nil {
297-
return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
298-
}
299-
return result, nil
300-
}
301-
302184
func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
303185
ctx, cancel := context.WithCancel(context.Background())
304186
defer cancel()

tests/framework/e2e/curl.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -123,10 +123,8 @@ func CURLPut(clus *EtcdProcessCluster, req CURLReq) error {
123123
}
124124

125125
func CURLGet(clus *EtcdProcessCluster, req CURLReq) error {
126-
ctx, cancel := context.WithTimeout(context.Background(), req.timeoutDuration())
127-
defer cancel()
128-
129-
return SpawnWithExpectsContext(ctx, CURLPrefixArgsCluster(clus.Cfg, clus.Procs[rand.Intn(clus.Cfg.ClusterSize)], "GET", req), nil, req.Expected)
126+
member := clus.Procs[rand.Intn(clus.Cfg.ClusterSize)]
127+
return CURLGetFromMember(clus, member, req)
130128
}
131129

132130
func CURLGetFromMember(clus *EtcdProcessCluster, member EtcdProcess, req CURLReq) error {

tests/framework/e2e/downgrade.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
// Copyright 2025 The etcd Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package e2e
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"math/rand"
22+
"strings"
23+
"testing"
24+
"time"
25+
26+
"github.com/coreos/go-semver/semver"
27+
"github.com/stretchr/testify/require"
28+
"go.uber.org/zap"
29+
30+
"go.etcd.io/etcd/api/v3/version"
31+
"go.etcd.io/etcd/server/v3/etcdserver"
32+
"go.etcd.io/etcd/tests/v3/framework/testutils"
33+
)
34+
35+
func DowngradeEnable(t *testing.T, epc *EtcdProcessCluster, ver *semver.Version) {
36+
t.Logf("etcdctl downgrade enable %s", ver.String())
37+
c := epc.Etcdctl()
38+
testutils.ExecuteWithTimeout(t, 20*time.Second, func() {
39+
err := c.DowngradeEnable(context.TODO(), ver.String())
40+
require.NoError(t, err)
41+
})
42+
43+
t.Log("Downgrade enabled, validating if cluster is ready for downgrade")
44+
for i := 0; i < len(epc.Procs); i++ {
45+
ValidateVersion(t, epc.Cfg, epc.Procs[i], version.Versions{
46+
Cluster: ver.String(),
47+
Server: offsetMinor(ver, 1).String(),
48+
Storage: ver.String(),
49+
})
50+
AssertProcessLogs(t, epc.Procs[i], "The server is ready to downgrade")
51+
}
52+
53+
t.Log("Cluster is ready for downgrade")
54+
}
55+
56+
func DowngradeUpgradeMembers(t *testing.T, lg *zap.Logger, clus *EtcdProcessCluster, numberOfMembersToChange int, currentVersion, targetVersion *semver.Version) error {
57+
if lg == nil {
58+
lg = clus.lg
59+
}
60+
isDowngrade := targetVersion.LessThan(*currentVersion)
61+
opString := "upgrading"
62+
newExecPath := BinPath.Etcd
63+
if isDowngrade {
64+
opString = "downgrading"
65+
newExecPath = BinPath.EtcdLastRelease
66+
}
67+
membersToChange := rand.Perm(len(clus.Procs))[:numberOfMembersToChange]
68+
lg.Info(fmt.Sprintf("Test %s members", opString), zap.Any("members", membersToChange))
69+
70+
// Need to wait health interval for cluster to prepare for downgrade/upgrade
71+
time.Sleep(etcdserver.HealthInterval)
72+
73+
for _, memberID := range membersToChange {
74+
member := clus.Procs[memberID]
75+
if member.Config().ExecPath == newExecPath {
76+
return fmt.Errorf("member:%s is already running with the %s target binary - %s", member.Config().Name, opString, member.Config().ExecPath)
77+
}
78+
lg.Info(fmt.Sprintf("%s member", opString), zap.String("member", member.Config().Name))
79+
if err := member.Stop(); err != nil {
80+
return err
81+
}
82+
member.Config().ExecPath = newExecPath
83+
lg.Info("Restarting member", zap.String("member", member.Config().Name))
84+
err := member.Start(context.TODO())
85+
if err != nil {
86+
return err
87+
}
88+
}
89+
lg.Info("Validating versions")
90+
for _, memberID := range membersToChange {
91+
member := clus.Procs[memberID]
92+
if isDowngrade || numberOfMembersToChange == len(clus.Procs) {
93+
ValidateVersion(t, clus.Cfg, member, version.Versions{
94+
Cluster: targetVersion.String(),
95+
Server: targetVersion.String(),
96+
})
97+
} else {
98+
ValidateVersion(t, clus.Cfg, member, version.Versions{
99+
Cluster: currentVersion.String(),
100+
Server: targetVersion.String(),
101+
})
102+
}
103+
}
104+
return nil
105+
}
106+
107+
func ValidateVersion(t *testing.T, cfg *EtcdProcessClusterConfig, member EtcdProcess, expect version.Versions) {
108+
testutils.ExecuteWithTimeout(t, 30*time.Second, func() {
109+
for {
110+
result, err := getMemberVersionByCurl(cfg, member)
111+
if err != nil {
112+
cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err), zap.String("member", member.Config().Name))
113+
time.Sleep(time.Second)
114+
continue
115+
}
116+
cfg.Logger.Info("Comparing versions", zap.String("member", member.Config().Name), zap.Any("got", result), zap.Any("want", expect))
117+
if err := compareMemberVersion(expect, result); err != nil {
118+
cfg.Logger.Warn("Versions didn't match retrying", zap.Error(err), zap.String("member", member.Config().Name))
119+
time.Sleep(time.Second)
120+
continue
121+
}
122+
cfg.Logger.Info("Versions match", zap.String("member", member.Config().Name))
123+
break
124+
}
125+
})
126+
}
127+
128+
// offsetMinor returns the version with offset from the original minor, with the same major.
129+
func offsetMinor(v *semver.Version, offset int) *semver.Version {
130+
var minor int64
131+
if offset >= 0 {
132+
minor = v.Minor + int64(offset)
133+
} else {
134+
diff := int64(-offset)
135+
if diff < v.Minor {
136+
minor = v.Minor - diff
137+
}
138+
}
139+
return &semver.Version{Major: v.Major, Minor: minor}
140+
}
141+
142+
func majorMinorVersionsEqual(v1, v2 string) bool {
143+
ver1 := semver.New(v1)
144+
ver2 := semver.New(v2)
145+
return ver1.Major == ver2.Major && ver1.Minor == ver2.Minor
146+
}
147+
148+
func compareMemberVersion(expect version.Versions, target version.Versions) error {
149+
if expect.Server != "" && !majorMinorVersionsEqual(expect.Server, target.Server) {
150+
return fmt.Errorf("expect etcdserver version %v, but got %v", expect.Server, target.Server)
151+
}
152+
153+
if expect.Cluster != "" && !majorMinorVersionsEqual(expect.Cluster, target.Cluster) {
154+
return fmt.Errorf("expect etcdcluster version %v, but got %v", expect.Cluster, target.Cluster)
155+
}
156+
157+
if expect.Storage != "" && !majorMinorVersionsEqual(expect.Storage, target.Storage) {
158+
return fmt.Errorf("expect storage version %v, but got %v", expect.Storage, target.Storage)
159+
}
160+
return nil
161+
}
162+
163+
func getMemberVersionByCurl(cfg *EtcdProcessClusterConfig, member EtcdProcess) (version.Versions, error) {
164+
args := CURLPrefixArgsCluster(cfg, member, "GET", CURLReq{Endpoint: "/version"})
165+
lines, err := RunUtilCompletion(args, nil)
166+
if err != nil {
167+
return version.Versions{}, err
168+
}
169+
170+
data := strings.Join(lines, "\n")
171+
result := version.Versions{}
172+
if err := json.Unmarshal([]byte(data), &result); err != nil {
173+
return version.Versions{}, fmt.Errorf("failed to unmarshal (%v): %w", data, err)
174+
}
175+
return result, nil
176+
}

0 commit comments

Comments
 (0)