|
1 |
| -// Package kafkatest provides a package intended for running tests |
2 |
| -// that require a Kafka backend. |
3 | 1 | package kafkatest
|
4 | 2 |
|
5 | 3 | import (
|
6 |
| - "crypto/rand" |
7 |
| - "fmt" |
8 |
| - "log" |
9 |
| - "net" |
10 |
| - "os" |
11 |
| - "strconv" |
12 |
| - "strings" |
13 |
| - "time" |
14 |
| - |
15 |
| - "github.com/Shopify/sarama" |
16 |
| - "gopkg.in/retry.v1" |
| 4 | + _ "github.com/Shopify/sarama" |
17 | 5 | )
|
18 |
| - |
19 |
| -var ErrDisabled = fmt.Errorf("kafka tests are disabled") |
20 |
| - |
21 |
| -// New connects to a Kafka instance and returns a Kafka |
22 |
| -// instance that uses it. |
23 |
| -// |
24 |
| -// The following environment variables can be used to |
25 |
| -// configure the connection parameters: |
26 |
| -// |
27 |
| -// - $KAFKA_DISABLE |
28 |
| -// A boolean as parsed by strconv.ParseBool. If this is true, |
29 |
| -// New will return ErrDisabled. |
30 |
| -// - $KAFKA_ADDRS |
31 |
| -// A comma-separate list of Kafka broker addresses in host:port |
32 |
| -// form. If this is empty, localhost:9092 will be used. |
33 |
| -// The list of address can be discovered by calling Client.Addrs. |
34 |
| -// - $KAFKA_USERNAME, $KAFKA_PASWORD |
35 |
| -// The username and password to use for SASL authentication. |
36 |
| -// When $KAFKA_USERNAME is non-empty, SASL will be |
37 |
| -// enabled. |
38 |
| -// - $KAFKA_USE_TLS |
39 |
| -// A boolean as parsed by strconv.ParseBool. If this |
40 |
| -// is true, a secure TLS connection will be used. |
41 |
| -// |
42 |
| -// - $KAFKA_TIMEOUT |
43 |
| -// The maximum duration to wait when trying to connect |
44 |
| -// to Kakfa. Defaults to "30s". |
45 |
| -// |
46 |
| -// The returned Kafka instance must be closed after use. |
47 |
| -func New() (*Kafka, error) { |
48 |
| - disabled, err := boolVar("KAFKA_DISABLE") |
49 |
| - if err != nil { |
50 |
| - return nil, fmt.Errorf("bad value for $KAFKA_DISABLE: %v", err) |
51 |
| - } |
52 |
| - if disabled { |
53 |
| - return nil, ErrDisabled |
54 |
| - } |
55 |
| - addrsStr := os.Getenv("KAFKA_ADDRS") |
56 |
| - if addrsStr == "" { |
57 |
| - addrsStr = "localhost:9092" |
58 |
| - } |
59 |
| - _, err = net.Dial("tcp", addrsStr) |
60 |
| - log.Printf("in kafkatest: net.Dial %q: %v", addrsStr, err) |
61 |
| - addrs := strings.Split(addrsStr, ",") |
62 |
| - useTLS, err := boolVar("KAFKA_USE_TLS") |
63 |
| - if err != nil { |
64 |
| - return nil, fmt.Errorf("bad value for KAFKA_USE_TLS: %v", err) |
65 |
| - } |
66 |
| - client := &Kafka{ |
67 |
| - addrs: addrs, |
68 |
| - useTLS: useTLS, |
69 |
| - saslUser: os.Getenv("KAFKA_USERNAME"), |
70 |
| - saslPassword: os.Getenv("KAFKA_PASSWORD"), |
71 |
| - } |
72 |
| - // The cluster might not be available immediately, so try |
73 |
| - // for a while before giving up. |
74 |
| - retryLimit := 30 * time.Second |
75 |
| - if limit := os.Getenv("KAFKA_TIMEOUT"); limit != "" { |
76 |
| - retryLimit, err = time.ParseDuration(limit) |
77 |
| - if err != nil { |
78 |
| - return nil, fmt.Errorf("bad value for KAFKA_TIMEOUT: %v", err) |
79 |
| - } |
80 |
| - } |
81 |
| - retryStrategy := retry.LimitTime(retryLimit, retry.Exponential{ |
82 |
| - Initial: time.Millisecond, |
83 |
| - MaxDelay: time.Second, |
84 |
| - }) |
85 |
| - for a := retry.Start(retryStrategy, nil); a.Next(); { |
86 |
| - admin, err := sarama.NewClusterAdmin(addrs, client.Config()) |
87 |
| - if err == nil { |
88 |
| - client.admin = admin |
89 |
| - break |
90 |
| - } |
91 |
| - if !a.More() { |
92 |
| - return nil, fmt.Errorf("cannot connect to Kafka cluster at %q after %v: %v", addrs, retryLimit, err) |
93 |
| - } |
94 |
| - } |
95 |
| - return client, nil |
96 |
| -} |
97 |
| - |
98 |
| -// Kafka represents a connection to a Kafka cluster. |
99 |
| -type Kafka struct { |
100 |
| - addrs []string |
101 |
| - useTLS bool |
102 |
| - saslUser string |
103 |
| - saslPassword string |
104 |
| - admin sarama.ClusterAdmin |
105 |
| - topics []string |
106 |
| -} |
107 |
| - |
108 |
| -// Config returns a sarama configuration that will |
109 |
| -// use connection parameters defined in the environment |
110 |
| -// variables described in New. |
111 |
| -func (k *Kafka) Config() *sarama.Config { |
112 |
| - cfg := sarama.NewConfig() |
113 |
| - k.InitConfig(cfg) |
114 |
| - return cfg |
115 |
| -} |
116 |
| - |
117 |
| -// InitConfig is similar to Config, except that instead of |
118 |
| -// returning a new configuration, it configures an existing |
119 |
| -// one. |
120 |
| -func (k *Kafka) InitConfig(cfg *sarama.Config) { |
121 |
| - if cfg.Version == sarama.MinVersion { |
122 |
| - cfg.Version = sarama.V1_0_0_0 |
123 |
| - } |
124 |
| - cfg.Net.TLS.Enable = k.useTLS |
125 |
| - if k.saslUser != "" { |
126 |
| - cfg.Net.SASL.Enable = true |
127 |
| - cfg.Net.SASL.User = k.saslUser |
128 |
| - cfg.Net.SASL.Password = k.saslPassword |
129 |
| - } |
130 |
| -} |
131 |
| - |
132 |
| -// Addrs returns the configured Kakfa broker addresses. |
133 |
| -func (k *Kafka) Addrs() []string { |
134 |
| - return k.addrs |
135 |
| -} |
136 |
| - |
137 |
| -// NewTopic creates a new Kafka topic with a random name and |
138 |
| -// single partition. It returns the topic's name. The topic will be deleted |
139 |
| -// when c.Close is called. |
140 |
| -// |
141 |
| -// NewTopic panics if the topic cannot be created. |
142 |
| -func (k *Kafka) NewTopic() string { |
143 |
| - if k.admin == nil { |
144 |
| - panic("cannot create topic with closed kafkatest.Kafka instance") |
145 |
| - } |
146 |
| - topic := randomName("kafkatest-") |
147 |
| - if err := k.admin.CreateTopic(topic, &sarama.TopicDetail{ |
148 |
| - NumPartitions: 1, |
149 |
| - ReplicationFactor: 1, |
150 |
| - }, false); err != nil { |
151 |
| - panic(fmt.Errorf("cannot create topic %q: %v", topic, err)) |
152 |
| - } |
153 |
| - k.topics = append(k.topics, topic) |
154 |
| - return topic |
155 |
| -} |
156 |
| - |
157 |
| -// Close closes the client connection and removes any topics |
158 |
| -// created by Topic. This method may be called more than once. |
159 |
| -func (k *Kafka) Close() error { |
160 |
| - if k.admin == nil { |
161 |
| - return nil |
162 |
| - } |
163 |
| - for ; len(k.topics) != 0; k.topics = k.topics[1:] { |
164 |
| - if err := k.admin.DeleteTopic(k.topics[0]); err != nil { |
165 |
| - return fmt.Errorf("cannot delete topic %q: %v", k.topics[0], err) |
166 |
| - } |
167 |
| - } |
168 |
| - k.admin.Close() |
169 |
| - k.admin = nil |
170 |
| - return nil |
171 |
| -} |
172 |
| - |
173 |
| -func boolVar(envVar string) (bool, error) { |
174 |
| - s := os.Getenv(envVar) |
175 |
| - if s == "" { |
176 |
| - return false, nil |
177 |
| - } |
178 |
| - b, err := strconv.ParseBool(s) |
179 |
| - if err != nil { |
180 |
| - return false, fmt.Errorf("invalid boolean value %q (possible values are: 1, t, T, TRUE, true, True, 0, f, F, FALSE)", s) |
181 |
| - } |
182 |
| - return b, nil |
183 |
| -} |
184 |
| - |
185 |
| -func randomName(prefix string) string { |
186 |
| - buf := make([]byte, 8) |
187 |
| - if _, err := rand.Read(buf); err != nil { |
188 |
| - panic(err) |
189 |
| - } |
190 |
| - return fmt.Sprintf("%s%x", prefix, buf) |
191 |
| -} |
0 commit comments