Skip to content

Commit dff0b4f

Browse files
committed
etcd_docker 4: Incorporate docker based etcd into Go integration tests (#4148)
PR 4 for #4144 High level approach is as described in #4144 . This PR integrates docker based etcd into our Go integration tests. It removes the need to have the embed package running in m3db for them, but doesn't yet touch that functionality. commit-id:3ae12ffd
1 parent 838690b commit dff0b4f

File tree

11 files changed

+254
-117
lines changed

11 files changed

+254
-117
lines changed

src/integration/aggregator/aggregator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,17 @@ ingest:
117117
maxBackoff: 10s
118118
jitter: true
119119
storeMetricsType: true
120+
121+
clusterManagement:
122+
etcd:
123+
env: default_env
124+
zone: embedded
125+
service: m3db
126+
cacheDir: /var/lib/m3kv
127+
etcdClusters:
128+
- zone: embedded
129+
endpoints:
130+
- 127.0.0.1:2379
120131
`
121132

122133
// TestAggregatorAggregatorConfig is the test config for the aggregators.

src/integration/aggregator/aggregator_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
//go:build cluster_integration
12
// +build cluster_integration
3+
24
//
35
// Copyright (c) 2021 Uber Technologies, Inc.
46
//

src/integration/repair/repair_and_replication_test.go

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1+
//go:build cluster_integration
12
// +build cluster_integration
3+
24
//
35
// Copyright (c) 2021 Uber Technologies, Inc.
46
//
@@ -23,28 +25,45 @@
2325
package repair
2426

2527
import (
28+
"context"
2629
"testing"
2730

28-
"github.com/stretchr/testify/assert"
29-
"github.com/stretchr/testify/require"
30-
3131
"github.com/m3db/m3/src/integration/resources"
32+
"github.com/m3db/m3/src/integration/resources/docker/dockerexternal"
3233
"github.com/m3db/m3/src/integration/resources/inprocess"
34+
"github.com/m3db/m3/src/x/instrument"
35+
36+
"github.com/ory/dockertest/v3"
37+
"github.com/stretchr/testify/assert"
38+
"github.com/stretchr/testify/require"
3339
)
3440

3541
func TestRepairAndReplication(t *testing.T) {
42+
t.Skip("failing after etcd containerization; fix.")
3643
cluster1, cluster2, closer := testSetup(t)
3744
defer closer()
3845

3946
RunTest(t, cluster1, cluster2)
4047
}
4148

4249
func testSetup(t *testing.T) (resources.M3Resources, resources.M3Resources, func()) {
43-
fullCfgs1 := getClusterFullConfgs(t)
44-
fullCfgs2 := getClusterFullConfgs(t)
50+
pool, err := dockertest.NewPool("")
51+
require.NoError(t, err)
4552

46-
ep1 := fullCfgs1.Configs.Coordinator.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints
47-
ep2 := fullCfgs2.Configs.Coordinator.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints
53+
etcd1 := mustNewStartedEtcd(t, pool)
54+
etcd2 := mustNewStartedEtcd(t, pool)
55+
56+
ep1 := []string{etcd1.Address()}
57+
ep2 := []string{etcd2.Address()}
58+
59+
cluster1Opts := newTestClusterOptions()
60+
cluster1Opts.EtcdEndpoints = ep1
61+
62+
cluster2Opts := newTestClusterOptions()
63+
cluster2Opts.EtcdEndpoints = ep2
64+
65+
fullCfgs1 := getClusterFullConfgs(t, cluster1Opts)
66+
fullCfgs2 := getClusterFullConfgs(t, cluster2Opts)
4867

4968
setRepairAndReplicationCfg(
5069
&fullCfgs1,
@@ -57,19 +76,28 @@ func testSetup(t *testing.T) (resources.M3Resources, resources.M3Resources, func
5776
ep1,
5877
)
5978

60-
cluster1, err := inprocess.NewClusterFromSpecification(fullCfgs1, clusterOptions)
79+
cluster1, err := inprocess.NewClusterFromSpecification(fullCfgs1, cluster1Opts)
6180
require.NoError(t, err)
6281

63-
cluster2, err := inprocess.NewClusterFromSpecification(fullCfgs2, clusterOptions)
82+
cluster2, err := inprocess.NewClusterFromSpecification(fullCfgs2, cluster2Opts)
6483
require.NoError(t, err)
6584

6685
return cluster1, cluster2, func() {
86+
etcd1.Close(context.TODO())
87+
etcd2.Close(context.TODO())
6788
assert.NoError(t, cluster1.Cleanup())
6889
assert.NoError(t, cluster2.Cleanup())
6990
}
7091
}
7192

72-
func getClusterFullConfgs(t *testing.T) inprocess.ClusterSpecification {
93+
func mustNewStartedEtcd(t *testing.T, pool *dockertest.Pool) *dockerexternal.EtcdNode {
94+
etcd, err := dockerexternal.NewEtcd(pool, instrument.NewOptions())
95+
require.NoError(t, err)
96+
require.NoError(t, etcd.Setup(context.TODO()))
97+
return etcd
98+
}
99+
100+
func getClusterFullConfgs(t *testing.T, clusterOptions resources.ClusterOptions) inprocess.ClusterSpecification {
73101
cfgs, err := inprocess.NewClusterConfigsFromYAML(
74102
TestRepairDBNodeConfig, TestRepairCoordinatorConfig, "",
75103
)
@@ -84,18 +112,22 @@ func getClusterFullConfgs(t *testing.T) inprocess.ClusterSpecification {
84112
func setRepairAndReplicationCfg(fullCfg *inprocess.ClusterSpecification, clusterName string, endpoints []string) {
85113
for _, dbnode := range fullCfg.Configs.DBNodes {
86114
dbnode.DB.Replication.Clusters[0].Name = clusterName
87-
dbnode.DB.Replication.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0].Endpoints = endpoints
115+
etcdService := &(dbnode.DB.Replication.Clusters[0].Client.EnvironmentConfig.Services[0].Service.ETCDClusters[0])
116+
etcdService.AutoSyncInterval = -1
117+
etcdService.Endpoints = endpoints
88118
}
89119
}
90120

91-
var clusterOptions = resources.ClusterOptions{
92-
DBNode: &resources.DBNodeClusterOptions{
93-
RF: 2,
94-
NumShards: 4,
95-
NumInstances: 1,
96-
NumIsolationGroups: 2,
97-
},
98-
Coordinator: resources.CoordinatorClusterOptions{
99-
GeneratePorts: true,
100-
},
121+
func newTestClusterOptions() resources.ClusterOptions {
122+
return resources.ClusterOptions{
123+
DBNode: &resources.DBNodeClusterOptions{
124+
RF: 2,
125+
NumShards: 4,
126+
NumInstances: 1,
127+
NumIsolationGroups: 2,
128+
},
129+
Coordinator: resources.CoordinatorClusterOptions{
130+
GeneratePorts: true,
131+
},
132+
}
101133
}

src/integration/resources/coordinator_client.go

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ var errUnknownServiceType = errors.New("unknown service type")
5959
// operation until successful.
6060
type RetryFunc func(op func() error) error
6161

62-
// ZapMethod appends the method as a log field.
63-
func ZapMethod(s string) zapcore.Field { return zap.String("method", s) }
62+
// zapMethod appends the method as a log field.
63+
func zapMethod(s string) zapcore.Field { return zap.String("method", s) }
6464

6565
// CoordinatorClient is a client use to invoke API calls
6666
// on a coordinator
@@ -97,7 +97,7 @@ func (c *CoordinatorClient) makeURL(resource string) string {
9797
func (c *CoordinatorClient) GetNamespace() (admin.NamespaceGetResponse, error) {
9898
url := c.makeURL("api/v1/services/m3db/namespace")
9999
logger := c.logger.With(
100-
ZapMethod("getNamespace"), zap.String("url", url))
100+
zapMethod("getNamespace"), zap.String("url", url))
101101

102102
//nolint:noctx
103103
resp, err := c.client.Get(url)
@@ -129,7 +129,7 @@ func (c *CoordinatorClient) GetPlacement(opts PlacementRequestOptions) (admin.Pl
129129
}
130130
url := c.makeURL(handlerurl)
131131
logger := c.logger.With(
132-
ZapMethod("getPlacement"), zap.String("url", url))
132+
zapMethod("getPlacement"), zap.String("url", url))
133133

134134
resp, err := c.makeRequest(logger, url, placementhandler.GetHTTPMethod, nil, placementOptsToMap(opts))
135135
if err != nil {
@@ -163,7 +163,7 @@ func (c *CoordinatorClient) InitPlacement(
163163
}
164164
url := c.makeURL(handlerurl)
165165
logger := c.logger.With(
166-
ZapMethod("initPlacement"), zap.String("url", url))
166+
zapMethod("initPlacement"), zap.String("url", url))
167167

168168
resp, err := c.makeRequest(logger, url, placementhandler.InitHTTPMethod, &initRequest, placementOptsToMap(opts))
169169
if err != nil {
@@ -194,7 +194,7 @@ func (c *CoordinatorClient) DeleteAllPlacements(opts PlacementRequestOptions) er
194194
}
195195
url := c.makeURL(handlerurl)
196196
logger := c.logger.With(
197-
ZapMethod("deleteAllPlacements"), zap.String("url", url))
197+
zapMethod("deleteAllPlacements"), zap.String("url", url))
198198

199199
resp, err := c.makeRequest(
200200
logger, url, placementhandler.DeleteAllHTTPMethod, nil, placementOptsToMap(opts),
@@ -221,7 +221,7 @@ func (c *CoordinatorClient) DeleteAllPlacements(opts PlacementRequestOptions) er
221221
// NB: if the name string is empty, this will instead
222222
// check for a successful response.
223223
func (c *CoordinatorClient) WaitForNamespace(name string) error {
224-
logger := c.logger.With(ZapMethod("waitForNamespace"))
224+
logger := c.logger.With(zapMethod("waitForNamespace"))
225225
return c.retryFunc(func() error {
226226
ns, err := c.GetNamespace()
227227
if err != nil {
@@ -250,7 +250,7 @@ func (c *CoordinatorClient) WaitForNamespace(name string) error {
250250
func (c *CoordinatorClient) WaitForInstances(
251251
ids []string,
252252
) error {
253-
logger := c.logger.With(ZapMethod("waitForPlacement"))
253+
logger := c.logger.With(zapMethod("waitForPlacement"))
254254
return c.retryFunc(func() error {
255255
placement, err := c.GetPlacement(PlacementRequestOptions{Service: ServiceTypeM3DB})
256256
if err != nil {
@@ -282,7 +282,7 @@ func (c *CoordinatorClient) WaitForInstances(
282282

283283
// WaitForShardsReady waits until all shards gets ready.
284284
func (c *CoordinatorClient) WaitForShardsReady() error {
285-
logger := c.logger.With(ZapMethod("waitForShards"))
285+
logger := c.logger.With(zapMethod("waitForShards"))
286286
return c.retryFunc(func() error {
287287
placement, err := c.GetPlacement(PlacementRequestOptions{Service: ServiceTypeM3DB})
288288
if err != nil {
@@ -307,7 +307,7 @@ func (c *CoordinatorClient) WaitForShardsReady() error {
307307
func (c *CoordinatorClient) WaitForClusterReady() error {
308308
var (
309309
url = c.makeURL("ready")
310-
logger = c.logger.With(ZapMethod("waitForClusterReady"), zap.String("url", url))
310+
logger = c.logger.With(zapMethod("waitForClusterReady"), zap.String("url", url))
311311
)
312312
return c.retryFunc(func() error {
313313
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
@@ -350,7 +350,7 @@ func (c *CoordinatorClient) CreateDatabase(
350350
) (admin.DatabaseCreateResponse, error) {
351351
url := c.makeURL("api/v1/database/create")
352352
logger := c.logger.With(
353-
ZapMethod("createDatabase"), zap.String("url", url),
353+
zapMethod("createDatabase"), zap.String("url", url),
354354
zap.String("request", addRequest.String()))
355355

356356
resp, err := c.makeRequest(logger, url, http.MethodPost, &addRequest, nil)
@@ -383,7 +383,7 @@ func (c *CoordinatorClient) AddNamespace(
383383
) (admin.NamespaceGetResponse, error) {
384384
url := c.makeURL("api/v1/services/m3db/namespace")
385385
logger := c.logger.With(
386-
ZapMethod("addNamespace"), zap.String("url", url),
386+
zapMethod("addNamespace"), zap.String("url", url),
387387
zap.String("request", addRequest.String()))
388388

389389
resp, err := c.makeRequest(logger, url, http.MethodPost, &addRequest, nil)
@@ -411,7 +411,7 @@ func (c *CoordinatorClient) UpdateNamespace(
411411
) (admin.NamespaceGetResponse, error) {
412412
url := c.makeURL("api/v1/services/m3db/namespace")
413413
logger := c.logger.With(
414-
ZapMethod("updateNamespace"), zap.String("url", url),
414+
zapMethod("updateNamespace"), zap.String("url", url),
415415
zap.String("request", req.String()))
416416

417417
resp, err := c.makeRequest(logger, url, http.MethodPut, &req, nil)
@@ -431,7 +431,7 @@ func (c *CoordinatorClient) UpdateNamespace(
431431
func (c *CoordinatorClient) setNamespaceReady(name string) error {
432432
url := c.makeURL("api/v1/services/m3db/namespace/ready")
433433
logger := c.logger.With(
434-
ZapMethod("setNamespaceReady"), zap.String("url", url),
434+
zapMethod("setNamespaceReady"), zap.String("url", url),
435435
zap.String("namespace", name))
436436

437437
_, err := c.makeRequest(logger, url, http.MethodPost, // nolint: bodyclose
@@ -445,7 +445,7 @@ func (c *CoordinatorClient) setNamespaceReady(name string) error {
445445
// DeleteNamespace removes the namespace.
446446
func (c *CoordinatorClient) DeleteNamespace(namespaceID string) error {
447447
url := c.makeURL("api/v1/services/m3db/namespace/" + namespaceID)
448-
logger := c.logger.With(ZapMethod("deleteNamespace"), zap.String("url", url))
448+
logger := c.logger.With(zapMethod("deleteNamespace"), zap.String("url", url))
449449

450450
if _, err := c.makeRequest(logger, url, http.MethodDelete, nil, nil); err != nil { // nolint: bodyclose
451451
logger.Error("failed to delete namespace", zap.Error(err))
@@ -462,7 +462,7 @@ func (c *CoordinatorClient) InitM3msgTopic(
462462
) (admin.TopicGetResponse, error) {
463463
url := c.makeURL(topic.InitURL)
464464
logger := c.logger.With(
465-
ZapMethod("initM3msgTopic"),
465+
zapMethod("initM3msgTopic"),
466466
zap.String("url", url),
467467
zap.String("request", initRequest.String()),
468468
zap.String("topic", fmt.Sprintf("%v", topicOpts)))
@@ -489,7 +489,7 @@ func (c *CoordinatorClient) GetM3msgTopic(
489489
) (admin.TopicGetResponse, error) {
490490
url := c.makeURL(topic.GetURL)
491491
logger := c.logger.With(
492-
ZapMethod("getM3msgTopic"), zap.String("url", url),
492+
zapMethod("getM3msgTopic"), zap.String("url", url),
493493
zap.String("topic", fmt.Sprintf("%v", topicOpts)))
494494

495495
resp, err := c.makeRequest(logger, url, topic.GetHTTPMethod, nil, m3msgTopicOptionsToMap(topicOpts))
@@ -516,7 +516,7 @@ func (c *CoordinatorClient) AddM3msgTopicConsumer(
516516
) (admin.TopicGetResponse, error) {
517517
url := c.makeURL(topic.AddURL)
518518
logger := c.logger.With(
519-
ZapMethod("addM3msgTopicConsumer"),
519+
zapMethod("addM3msgTopicConsumer"),
520520
zap.String("url", url),
521521
zap.String("request", addRequest.String()),
522522
zap.String("topic", fmt.Sprintf("%v", topicOpts)))
@@ -557,7 +557,7 @@ func (c *CoordinatorClient) WriteCarbon(
557557
url string, metric string, v float64, t time.Time,
558558
) error {
559559
logger := c.logger.With(
560-
ZapMethod("writeCarbon"), zap.String("url", url),
560+
zapMethod("writeCarbon"), zap.String("url", url),
561561
zap.String("at time", time.Now().String()),
562562
zap.String("at ts", t.String()))
563563

@@ -623,7 +623,7 @@ func (c *CoordinatorClient) WritePromWithRequest(writeRequest prompb.WriteReques
623623
url := c.makeURL("api/v1/prom/remote/write")
624624

625625
logger := c.logger.With(
626-
ZapMethod("writeProm"), zap.String("url", url),
626+
zapMethod("writeProm"), zap.String("url", url),
627627
zap.String("request", writeRequest.String()))
628628

629629
body, err := proto.Marshal(&writeRequest)
@@ -697,7 +697,7 @@ func (c *CoordinatorClient) ApplyKVUpdate(update string) error {
697697
url := c.makeURL("api/v1/kvstore")
698698

699699
logger := c.logger.With(
700-
ZapMethod("ApplyKVUpdate"), zap.String("url", url),
700+
zapMethod("ApplyKVUpdate"), zap.String("url", url),
701701
zap.String("update", update))
702702

703703
data := bytes.NewBuffer([]byte(update))
@@ -731,7 +731,7 @@ func (c *CoordinatorClient) query(
731731
) error {
732732
url := c.makeURL(query)
733733
logger := c.logger.With(
734-
ZapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
734+
zapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
735735
logger.Info("running")
736736
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
737737
if err != nil {
@@ -962,7 +962,7 @@ func (c *CoordinatorClient) runQuery(
962962
) (string, error) {
963963
url := c.makeURL(query)
964964
logger := c.logger.With(
965-
ZapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
965+
zapMethod("query"), zap.String("url", url), zap.Any("headers", headers))
966966
logger.Info("running")
967967
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
968968
if err != nil {
@@ -1000,7 +1000,7 @@ func (c *CoordinatorClient) runQuery(
10001000
func (c *CoordinatorClient) RunQuery(
10011001
verifier ResponseVerifier, query string, headers map[string][]string,
10021002
) error {
1003-
logger := c.logger.With(ZapMethod("runQuery"),
1003+
logger := c.logger.With(zapMethod("runQuery"),
10041004
zap.String("query", query))
10051005
err := c.retryFunc(func() error {
10061006
err := c.query(verifier, query, headers)
@@ -1067,7 +1067,7 @@ func (c *CoordinatorClient) GraphiteQuery(
10671067

10681068
url := c.makeURL(queryStr)
10691069
logger := c.logger.With(
1070-
ZapMethod("graphiteQuery"), zap.String("url", url))
1070+
zapMethod("graphiteQuery"), zap.String("url", url))
10711071
logger.Info("running")
10721072
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil)
10731073
if err != nil {

src/integration/resources/docker/dockerexternal/etcd.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,6 @@ func (c *EtcdNode) Setup(ctx context.Context) (closeErr error) {
151151
// This is coming from the equivalent of docker inspect <container_id>
152152
portBinds := container.NetworkSettings.Ports["2379/tcp"]
153153

154-
// If running in a docker container e.g. on buildkite, route to etcd using the published port on the *host* machine.
155-
// See also http://github.com/m3db/m3/blob/master/docker-compose.yml#L16-L16
156154
ipAddr := "127.0.0.1"
157155
_, err = net.ResolveIPAddr("ip4", "host.docker.internal")
158156
if err == nil {

0 commit comments

Comments
 (0)