10
10
# Contributor(s):
11
11
# Mike Trinkala ([email protected] )
12
12
13
+
13
14
#
14
15
# ***** END LICENSE BLOCK *****/
15
16
@@ -27,6 +28,7 @@ import (
27
28
"github.com/Shopify/sarama"
28
29
"github.com/mozilla-services/heka/message"
29
30
"github.com/mozilla-services/heka/pipeline"
31
+ "github.com/mozilla-services/heka/plugins/tcp"
30
32
)
31
33
32
34
type KafkaInputConfig struct {
@@ -39,6 +41,10 @@ type KafkaInputConfig struct {
39
41
WaitForElection uint32 `toml:"wait_for_election"`
40
42
BackgroundRefreshFrequency uint32 `toml:"background_refresh_frequency"`
41
43
44
+ // TLS Config
45
+ UseTls bool `toml:"use_tls"`
46
+ Tls tcp.TlsConfig
47
+
42
48
// Broker Config
43
49
MaxOpenRequests int `toml:"max_open_reqests"`
44
50
DialTimeout uint32 `toml:"dial_timeout"`
@@ -146,6 +152,13 @@ func (k *KafkaInput) Init(config interface{}) (err error) {
146
152
k .saramaConfig .Metadata .Retry .Backoff = time .Duration (k .config .WaitForElection ) * time .Millisecond
147
153
k .saramaConfig .Metadata .RefreshFrequency = time .Duration (k .config .BackgroundRefreshFrequency ) * time .Millisecond
148
154
155
+ k .saramaConfig .Net .TLS .Enable = k .config .UseTls
156
+ if k .config .UseTls {
157
+ if k .saramaConfig .Net .TLS .Config , err = tcp .CreateGoTlsConfig (& k .config .Tls ); err != nil {
158
+ return fmt .Errorf ("TLS init error: %s" , err )
159
+ }
160
+ }
161
+
149
162
k .saramaConfig .Net .MaxOpenRequests = k .config .MaxOpenRequests
150
163
k .saramaConfig .Net .DialTimeout = time .Duration (k .config .DialTimeout ) * time .Millisecond
151
164
k .saramaConfig .Net .ReadTimeout = time .Duration (k .config .ReadTimeout ) * time .Millisecond
0 commit comments