
Commit 6bef99b

chore(ci): improve Docker Compose files for a more robust experience running integration tests (#182)
1 parent a6265f3 commit 6bef99b

5 files changed: +51 −106 lines

docker-compose.test.yml

Lines changed: 33 additions & 73 deletions
@@ -1,123 +1,83 @@
 ---
-version: '2'
+version: '3'
+
 services:
-  zookeeper-1:
+  zookeeper:
     image: confluentinc/cp-zookeeper:latest
     environment:
-      ZOOKEEPER_SERVER_ID: 1
       ZOOKEEPER_CLIENT_PORT: 22181
-      ZOOKEEPER_TICK_TIME: 2000
-      ZOOKEEPER_INIT_LIMIT: 5
-      ZOOKEEPER_SYNC_LIMIT: 2
-      ZOOKEEPER_SERVERS: localhost:22888:23888;localhost:32888:33888;localhost:42888:43888
-    network_mode: host
-    extra_hosts:
-      - "moby:127.0.0.1"
-    logging:
-      driver: none # change this to json-file if you want to debug kafka
-
-  zookeeper-2:
-    image: confluentinc/cp-zookeeper:latest
-    environment:
-      ZOOKEEPER_SERVER_ID: 2
-      ZOOKEEPER_CLIENT_PORT: 32181
-      ZOOKEEPER_TICK_TIME: 2000
-      ZOOKEEPER_INIT_LIMIT: 5
-      ZOOKEEPER_SYNC_LIMIT: 2
-      ZOOKEEPER_SERVERS: localhost:22888:23888;localhost:32888:33888;localhost:42888:43888
-    network_mode: host
-    extra_hosts:
-      - "moby:127.0.0.1"
-    logging:
-      driver: none # change this to json-file if you want to debug kafka
-
-  zookeeper-3:
-    image: confluentinc/cp-zookeeper:latest
-    environment:
-      ZOOKEEPER_SERVER_ID: 3
-      ZOOKEEPER_CLIENT_PORT: 42181
-      ZOOKEEPER_TICK_TIME: 2000
-      ZOOKEEPER_INIT_LIMIT: 5
-      ZOOKEEPER_SYNC_LIMIT: 2
-      ZOOKEEPER_SERVERS: localhost:22888:23888;localhost:32888:33888;localhost:42888:43888
-    network_mode: host
-    extra_hosts:
-      - "moby:127.0.0.1"
+    ports:
+      - 22181:22181
     logging:
       driver: none # change this to json-file if you want to debug kafka
 
   kafka-1:
     image: confluentinc/cp-kafka:latest
-    network_mode: host
+    hostname: kafka-1
     depends_on:
-      - zookeeper-1
-      - zookeeper-2
-      - zookeeper-3
+      - zookeeper
     environment:
       KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: localhost:22181,localhost:32181,localhost:42181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:19092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:22181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:19092,PLAINTEXT_HOST://localhost:19093
       KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
-    extra_hosts:
-      - "moby:127.0.0.1"
+    ports:
+      - 19093:19093 # From outside docker network
     logging:
       driver: none # change this to json-file if you want to debug kafka
 
   kafka-2:
     image: confluentinc/cp-kafka:latest
-    network_mode: host
+    hostname: kafka-2
     depends_on:
-      - zookeeper-1
-      - zookeeper-2
-      - zookeeper-3
+      - zookeeper
     environment:
       KAFKA_BROKER_ID: 2
-      KAFKA_ZOOKEEPER_CONNECT: localhost:22181,localhost:32181,localhost:42181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:22181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://localhost:29093
       KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
-    extra_hosts:
-      - "moby:127.0.0.1"
+    ports:
+      - 29092:29092
+      - 29093:29093 # From outside docker network
     logging:
-      driver: none # change this to json-file if you want to debug kafka
+      driver: none # change this to json-file if you want to debug kafka
 
   kafka-3:
     image: confluentinc/cp-kafka:latest
-    network_mode: host
+    hostname: kafka-3
     depends_on:
-      - zookeeper-1
-      - zookeeper-2
-      - zookeeper-3
+      - zookeeper
     environment:
       KAFKA_BROKER_ID: 3
-      KAFKA_ZOOKEEPER_CONNECT: localhost:22181,localhost:32181,localhost:42181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:39092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:22181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:39092,PLAINTEXT_HOST://localhost:39093
       KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
-    extra_hosts:
-      - "moby:127.0.0.1"
+    ports:
+      - 39092:39092
+      - 39093:39093 # From outside docker network
     logging:
-      driver: none # change this to json-file if you want to debug kafka
+      driver: none # change this to json-file if you want to debug kafka
 
   kafka_setup:
     build:
       context: .
       dockerfile: Dockerfile.setup
     depends_on:
-      - zookeeper-1
-      - zookeeper-2
-      - zookeeper-3
+      - zookeeper
       - kafka-1
       - kafka-2
       - kafka-3
-    network_mode: host
     logging:
-      driver: none # change this to json-file if you want to debug kafka setup
+      driver: none # change this to json-file if you want to debug kafka setup
 
   golang_tests:
     build:
       context: .
       dockerfile: Dockerfile.test
     depends_on:
       - kafka_setup
-    network_mode: host
     environment:
-      KAFKA_TEST_BROKERS: localhost:19092,localhost:29092,localhost:39092
+      KAFKA_TEST_BROKERS: kafka-1:19092,kafka-2:29092,kafka-3:39092
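The core of this change is the listener split that replaces network_mode: host. Each broker now advertises two listeners: a PLAINTEXT listener on its compose hostname (kafka-1:19092, kafka-2:29092, kafka-3:39092) for containers such as golang_tests, and a PLAINTEXT_HOST listener on localhost for clients on the host machine, reachable through the published ports. A minimal Go sketch of how a client could pick its bootstrap list under this layout; the localhost fallback ports are an assumption for host-side runs, not something this commit configures:

package main

import (
	"fmt"
	"os"
)

func main() {
	// Inside the compose network, golang_tests receives:
	//   KAFKA_TEST_BROKERS=kafka-1:19092,kafka-2:29092,kafka-3:39092
	brokers := os.Getenv("KAFKA_TEST_BROKERS")
	if brokers == "" {
		// Assumed host-side fallback: the PLAINTEXT_HOST listeners
		// published under "ports" in docker-compose.test.yml.
		brokers = "localhost:19093,localhost:29093,localhost:39093"
	}
	fmt.Println("bootstrap.servers =", brokers)
}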

Dockerfile.setup

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 FROM confluentinc/cp-kafka
 
 # Create the Kafka Topic & sleep forever
-CMD kafka-topics --create --bootstrap-server localhost:19092 --replication-factor 1 --partitions 3 --topic gotest && sleep infinity
+CMD bash -c "kafka-topics --delete --bootstrap-server kafka-1:19092 --topic gotest || true && sleep 1 && kafka-topics --create --bootstrap-server kafka-1:19092 --replication-factor 1 --partitions 3 --topic gotest && sleep infinity"
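The delete-then-create gives every run a fresh, empty gotest topic instead of reusing whatever a previous run left behind; the `|| true` tolerates the very first run, when there is nothing to delete yet. For illustration only, a hedged sketch of the same reset done from Go with confluent-kafka-go's admin client (the commit itself uses the kafka-topics CLI, not this code):

package main

import (
	"context"
	"log"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	admin, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "kafka-1:19092"})
	if err != nil {
		log.Fatal(err)
	}
	defer admin.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Best-effort delete, mirroring the `|| true` in the CMD: on the
	// first run the topic does not exist yet.
	_, _ = admin.DeleteTopics(ctx, []string{"gotest"})
	time.Sleep(time.Second) // mirror the `sleep 1` between delete and create

	results, err := admin.CreateTopics(ctx, []kafka.TopicSpecification{{
		Topic:             "gotest",
		NumPartitions:     3,
		ReplicationFactor: 1,
	}})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("topic setup: %v", results)
}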

kafka/consume.go

Lines changed: 3 additions & 3 deletions
@@ -62,7 +62,7 @@ type ConfluentConsumer struct {
 // permission on the group coordinator for managing commits, so it needs a consumer group in the broker.
 // In order to simplify, the default consumer group id is copied from the configured topic name, so make sure you have a
 // policy that gives permission to such consumer group.
-func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
+func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error) {
 	// See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 	kafkaConf := conf.baseKafkaConfig()
 	_ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. It is mandatory for detached consumers.
@@ -128,7 +128,7 @@ func NewDetachedConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
 // NewConsumer creates a ConfluentConsumer based on config.
 // - NOTE if the partition is set and the partition key is not set in config we have no way
 // of knowing where to assign the consumer to in the case of a rebalance
-func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
+func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (*ConfluentConsumer, error) {
 	// See Reference at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
 	kafkaConf := conf.baseKafkaConfig()
 	_ = kafkaConf.SetKey("enable.auto.offset.store", false) // manually StoreOffset after processing a message. Otherwise races may happen.
@@ -173,7 +173,7 @@ func NewConsumer(log logrus.FieldLogger, conf Config, opts ...ConfigOpt) (Consumer, error) {
 		}
 
 		logFields["kafka_partition_key"] = cc.conf.Consumer.PartitionKey
-		logFields["kafka_partition"] = cc.conf.Consumer.Partition
+		logFields["kafka_partition"] = *cc.conf.Consumer.Partition
 	}
 
 	cc.setupRebalanceHandler()
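Worth noting: both constructors now return the concrete *ConfluentConsumer instead of the Consumer interface, which removes the type assertion at call sites (see kafka/consume_test.go below) while implicit interface satisfaction keeps the value usable anywhere a Consumer is expected. A toy, self-contained sketch of the pattern; the names mirror the repo but the bodies are invented for illustration:

package main

import "fmt"

// Consumer stands in for the package's interface type.
type Consumer interface {
	FetchMessage() string
}

type ConfluentConsumer struct{}

func (c *ConfluentConsumer) FetchMessage() string { return "msg" }

// SeekToTime is a concrete-only method, analogous to helpers a test may need.
func (c *ConfluentConsumer) SeekToTime() {}

// Returning *ConfluentConsumer (not Consumer) mirrors this commit's change.
func NewConsumer() (*ConfluentConsumer, error) { return &ConfluentConsumer{}, nil }

func use(c Consumer) { fmt.Println(c.FetchMessage()) }

func main() {
	c, _ := NewConsumer()
	c.SeekToTime() // no type assertion needed to reach the full method set
	use(c)         // still satisfies the interface where required
}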

kafka/consume_test.go

Lines changed: 1 addition & 1 deletion
@@ -38,5 +38,5 @@ func consumer(t *testing.T) (*ConfluentConsumer, Config) {
 	c, err := NewConsumer(logger(), conf)
 	require.NoError(t, err)
 
-	return c.(*ConfluentConsumer), conf
+	return c, conf
 }

kafka/integration_test.go

Lines changed: 13 additions & 28 deletions
@@ -15,7 +15,6 @@ import (
 )
 
 func TestIntegration(t *testing.T) {
-
 	testBrokers := os.Getenv("KAFKA_TEST_BROKERS")
 	if testBrokers == "" {
 		t.Skipf("No local Kafka broker available to run tests")
@@ -36,16 +35,14 @@ func TestIntegration(t *testing.T) {
 		assert := assert.New(t)
 
 		ctx := context.Background()
-		offset := int64(0)
 
 		// create netlify kafka config
 		conf := Config{
 			Brokers: strings.Split(testBrokers, ","),
 			Topic:   "gotest",
 			Consumer: ConsumerConfig{
-				GroupID:       "gotest",
-				PartitionKey:  "test",
-				InitialOffset: &offset,
+				GroupID:      "gotest",
+				PartitionKey: "test",
 			},
 		}
 
@@ -65,7 +62,7 @@ func TestIntegration(t *testing.T) {
 		assert.NoError(err)
 		assert.Len(parts, 3)
 
-		c, err := NewConsumer(log, conf)
+		c, err := NewDetachedConsumer(log, conf)
 		assert.NoError(err)
 		assert.NotNil(c)
 
@@ -82,35 +79,27 @@ func TestIntegration(t *testing.T) {
 			}
 
 			t := time.Now()
-			err = p.Produce(ctx, m)
-			assert.NoError(err)
+			assert.NoError(p.Produce(ctx, m))
 
 			p := GetPartition(k, parts, PartitionerMurMur2)
 
-			err = c.AssignPartitionByKey(k, PartitionerMurMur2)
-			assert.NoError(err)
-
-			err = c.SeekToTime(t)
-			assert.NoError(err)
+			assert.NoError(c.AssignPartitionByKey(k, PartitionerMurMur2))
+			assert.NoError(c.SeekToTime(t))
 
 			m, err = c.FetchMessage(ctx)
 			assert.NoError(err)
 			assert.NotNil(m)
 			assert.Equal([]byte(k), m.Key, "Partition to read from: %d, Msg: %+v", p, m)
 			assert.Equal([]byte(v), m.Value, "Partition to read from: %d, Msg: %+v", p, m)
 
-			err = c.CommitMessage(m)
-			assert.NoError(err)
-
+			assert.NoError(c.CommitMessage(m))
 		}
 
 		// chaos 🙈🙊🙉
 		// force a rebalance event
 		chaosTest(testBrokers, assert)
 
-		err = c.Close()
-		assert.NoError(err)
-
+		assert.NoError(c.Close())
 	})
 
 	t.Run("ConsumerWithGroup", func(t *testing.T) {
@@ -143,20 +132,17 @@ func TestIntegration(t *testing.T) {
 			Value: []byte(val),
 		}
 
-		err = p.Produce(ctx, m)
-		assert.NoError(err)
+		assert.NoError(p.Produce(ctx, m))
 
-		c, err := NewConsumer(log, conf, WithConsumerGroupID("gotest"))
+		c, err := NewConsumer(log, conf) // Consumer attached to consumer group
 		assert.NoError(err)
 		assert.NotNil(c)
 
 		m, err = c.FetchMessage(ctx)
 		assert.NoError(err)
 		assert.Contains(string(m.Value), val)
-		assert.Equal(kafkalib.Offset(30), m.TopicPartition.Offset)
 
-		err = c.CommitMessage(m)
-		assert.NoError(err)
+		assert.NoError(c.CommitMessage(m))
 
 		// chaos 🙈🙊🙉
 		// force a rebalance event
@@ -171,15 +157,15 @@ func TestIntegration(t *testing.T) {
 		assert := assert.New(t)
 
 		ctx := context.Background()
-		offset := int64(1)
+		initialOffset := int64(1)
 
 		// create netlify kafka config
 		conf := Config{
 			Brokers: strings.Split(testBrokers, ","),
 			Topic:   "gotest",
 			Consumer: ConsumerConfig{
 				GroupID:       "gotest",
-				InitialOffset: &offset,
+				InitialOffset: &initialOffset,
 			},
 		}
 
@@ -246,5 +232,4 @@ func chaosTest(testBrokers string, assert *assert.Assertions) {
 		assert.NotNil(results)
 		a.Close()
 	}
-
 }
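A side effect worth calling out: the brittle assert.Equal(kafkalib.Offset(30), m.TopicPartition.Offset) assertion is gone. It tied the test to the exact number of messages already in the partition, so it only held for a particular run history; with Dockerfile.setup now recreating the topic and the assertion dropped, runs no longer depend on each other. The suite still gates on the environment, as the first hunk shows; a self-contained illustration of that guard (hypothetical test name, not part of this commit):

package kafka_test

import (
	"os"
	"strings"
	"testing"
)

// TestBrokerGuard mirrors the guard at the top of TestIntegration: the
// suite runs only when docker-compose (or a developer) supplies brokers
// via KAFKA_TEST_BROKERS, so a plain `go test ./...` without Kafka skips
// instead of failing.
func TestBrokerGuard(t *testing.T) {
	testBrokers := os.Getenv("KAFKA_TEST_BROKERS")
	if testBrokers == "" {
		t.Skipf("No local Kafka broker available to run tests")
	}
	t.Logf("running against %d broker(s)", len(strings.Split(testBrokers, ",")))
}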
