Skip to content

Commit 2276fe8

Browse files
authored
Configurable Pulsar TLS parameters (#51)
* configuration Pulsar TLS parameters, consolidate pulsar.Client creation in a function * upversion apache-go-client to pick brokerTlsUrl redirect fix
1 parent 3b58380 commit 2276fe8

File tree

8 files changed

+113
-87
lines changed

8 files changed

+113
-87
lines changed

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ go 1.13
44

55
require (
66
github.com/DataDog/zstd v1.4.4 // indirect
7-
github.com/apache/pulsar-client-go v0.0.0-20200214184451-fc390a6a37f3
7+
github.com/apache/pulsar-client-go v0.1.1-0.20200425133951-6edc8f4ef954
88
github.com/dgrijalva/jwt-go v3.2.0+incompatible
99
github.com/ghodss/yaml v1.0.0
1010
github.com/golang/snappy v0.0.1 // indirect
@@ -18,5 +18,4 @@ require (
1818
github.com/xdg/stringprep v1.0.0 // indirect
1919
go.mongodb.org/mongo-driver v1.2.0
2020
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 // indirect
21-
golang.org/x/text v0.3.2 // indirect
2221
)

go.sum

Lines changed: 22 additions & 48 deletions
Large diffs are not rendered by default.

scripts/test_coverage.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ ALL_PKGS=""
1212

1313
cd $DIR/../src
1414
for d in */ ; do
15-
if [[ ${d} != "unit-test/" && ${d} != "e2e/" ]] # exclude unit-test for test coverage
15+
if [[ ${d} != "unit-test/" && ${d} != "e2e/" && ${d} != "docs/" ]] # exclude unit-test for test coverage
1616
then
1717
pkg=${d%/}
1818
ALL_PKGS=${ALL_PKGS}","${BASE_PKG_DIR}${pkg}

src/db/pulsardb.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
"fmt"
87
"strings"
98
"sync"
109
"time"
1110

1211
"github.com/apache/pulsar-client-go/pulsar"
1312
"github.com/kafkaesque-io/pulsar-beam/src/model"
13+
"github.com/kafkaesque-io/pulsar-beam/src/pulsardriver"
1414
"github.com/kafkaesque-io/pulsar-beam/src/util"
1515

1616
log "github.com/sirupsen/logrus"
@@ -52,27 +52,13 @@ func (s *PulsarHandler) Init() error {
5252
s.topicName = util.GetConfig().DbName
5353
tokenStr := util.GetConfig().DbPassword
5454

55-
s.logger.Infof("pulsar URL: %s token: %s", pulsarURL, tokenStr)
56-
clientOpt := pulsar.ClientOptions{
57-
URL: pulsarURL,
58-
OperationTimeout: 30 * time.Second,
59-
ConnectionTimeout: 30 * time.Second,
60-
}
61-
62-
if tokenStr != "" {
63-
clientOpt.Authentication = pulsar.NewAuthenticationToken(tokenStr)
64-
}
65-
66-
if strings.HasPrefix(pulsarURL, "pulsar+ssl://") {
67-
trustStore := util.GetConfig().TrustStore //"/etc/ssl/certs/ca-bundle.crt"
68-
if trustStore == "" {
69-
return fmt.Errorf("this is fatal that we are missing trustStore while pulsar+ssl is required")
70-
}
71-
clientOpt.TLSTrustCertsFilePath = trustStore
55+
s.logger.Infof("database pulsar URL: %s", pulsarURL)
56+
if log.GetLevel() == log.DebugLevel {
57+
s.logger.Debugf("database pulsar token string is %s", tokenStr)
7258
}
7359

7460
var err error
75-
s.client, err = pulsar.NewClient(clientOpt)
61+
s.client, err = pulsardriver.NewPulsarClient(pulsarURL, tokenStr)
7662
if err != nil {
7763
// this would be a serious problem so that we return with error
7864
return err

src/pulsardriver/pulsar-client.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,23 +61,7 @@ func (c *PulsarClient) GetClient(url, tokenStr string) (pulsar.Client, error) {
6161
return c.client, nil
6262
}
6363

64-
clientOpt := pulsar.ClientOptions{
65-
URL: url,
66-
OperationTimeout: time.Duration(clientOpsTimeout) * time.Second,
67-
ConnectionTimeout: time.Duration(clientConnectTimeout) * time.Second,
68-
}
69-
70-
if tokenStr != "" {
71-
clientOpt.Authentication = pulsar.NewAuthenticationToken(tokenStr)
72-
}
73-
74-
if strings.HasPrefix(url, "pulsar+ssl://") {
75-
trustStore := util.AssignString(util.GetConfig().TrustStore, "/etc/ssl/certs/ca-bundle.crt")
76-
clientOpt.TLSTrustCertsFilePath = trustStore
77-
}
78-
79-
driver, err := pulsar.NewClient(clientOpt)
80-
64+
driver, err := NewPulsarClient(url, tokenStr)
8165
if err != nil {
8266
log.Errorf("failed instantiate pulsar client %v", err)
8367
return nil, fmt.Errorf("Could not instantiate Pulsar client: %v", err)
@@ -110,3 +94,40 @@ func (c *PulsarClient) Reconnect() (pulsar.Client, error) {
11094
c.Close()
11195
return c.GetClient(c.pulsarURL, c.token)
11296
}
97+
98+
// NewPulsarClient always creates a new pulsar.Client connection
99+
func NewPulsarClient(url, tokenStr string) (pulsar.Client, error) {
100+
clientOpt := pulsar.ClientOptions{
101+
URL: url,
102+
OperationTimeout: time.Duration(clientOpsTimeout) * time.Second,
103+
ConnectionTimeout: time.Duration(clientConnectTimeout) * time.Second,
104+
}
105+
106+
if tokenStr != "" {
107+
clientOpt.Authentication = pulsar.NewAuthenticationToken(tokenStr)
108+
}
109+
110+
if strings.HasPrefix(url, "pulsar+ssl://") {
111+
trustStore := util.GetConfig().TrustStore //"/etc/ssl/certs/ca-bundle.crt"
112+
if trustStore == "" {
113+
return nil, fmt.Errorf("this is fatal that we are missing trustStore while pulsar+ssl is required")
114+
}
115+
clientOpt.TLSTrustCertsFilePath = trustStore
116+
}
117+
118+
// default is false for these two configuration parameters
119+
clientOpt.TLSAllowInsecureConnection = util.StringToBool(util.GetConfig().PulsarTLSAllowInsecureConnection)
120+
clientOpt.TLSValidateHostname = util.StringToBool(util.GetConfig().PulsarTLSValidateHostname)
121+
122+
driver, err := pulsar.NewClient(clientOpt)
123+
124+
if err != nil {
125+
log.Errorf("failed instantiate pulsar client %v", err)
126+
return nil, fmt.Errorf("Could not instantiate Pulsar client: %v", err)
127+
}
128+
if log.GetLevel() == log.DebugLevel {
129+
log.Debugf("pulsar client url %s\n token %s", url, tokenStr)
130+
}
131+
132+
return driver, nil
133+
}

src/unit-test/util_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,28 @@ func TestConcurrencyTTLCache(t *testing.T) {
369369
assert(t, 14 == cache.Count(), "some objects have expired")
370370
assert(t, !object1.isClosed, "object1 has not expired yet")
371371
}
372+
373+
func TestStringToBoo(t *testing.T) {
374+
// true cases
375+
assert(t, StringToBool("true"), "string true yields boolean true")
376+
assert(t, StringToBool("True"), "string True yields boolean true")
377+
assert(t, StringToBool(" tRue"), "string tRue with space yields boolean true")
378+
assert(t, StringToBool("yes"), "string true yields boolean true")
379+
assert(t, StringToBool("1"), "string true yields boolean true")
380+
assert(t, StringToBool("enable"), "string true yields boolean true")
381+
assert(t, StringToBool(" Enabled "), "string Enabled with space yields boolean true")
382+
assert(t, StringToBool("ok"), "string ok yields boolean true")
383+
assert(t, StringToBool("Ok"), "string Ok yields boolean true")
384+
385+
// false cases
386+
assert(t, !StringToBool(" "), "string space yields boolean false")
387+
assert(t, !StringToBool(""), "string empty string yields boolean false")
388+
assert(t, !StringToBool(" t rue"), "string t rue with space yields boolean false")
389+
assert(t, !StringToBool("no"), "string no yields boolean false")
390+
assert(t, !StringToBool("10"), "string 10 yields boolean false")
391+
assert(t, !StringToBool("0"), "string 0 yields boolean false")
392+
assert(t, !StringToBool("notok"), "string notok yields boolean false")
393+
assert(t, !StringToBool("disable"), "string disable yields boolean false")
394+
assert(t, !StringToBool("adsfasdf"), "string any string yields boolean false")
395+
396+
}

src/util/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ type Configuration struct {
4444
SuperRoles string `json:"SuperRoles"`
4545
PulsarBrokerURL string `json:"PulsarBrokerURL"`
4646

47+
// Configure whether the Pulsar client accept untrusted TLS certificate from broker (default: false)
48+
// Set to `true` to enable
49+
PulsarTLSAllowInsecureConnection string `json:"PulsarTLSAllowInsecureConnection"`
50+
51+
// Configure whether the Pulsar client verify the validity of the host name from broker (default: false)
52+
// Set to `true` to enable
53+
PulsarTLSValidateHostname string `json:"PulsarTLSValidateHostname"`
54+
4755
// Webhook consumers pool checked interval to stop deleted consumers and start new ones
4856
// default value 180s
4957
PbDbInterval string `json:"PbDbInterval"`

src/util/util.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,16 @@ func GetEnvInt(env string, defaultNum int) int {
110110
}
111111
return defaultNum
112112
}
113+
114+
// StringToBool format various strings to boolean
115+
// strconv.ParseBool only covers `true` and `false` cases
116+
func StringToBool(str string) bool {
117+
s := strings.ToLower(strings.TrimSpace(str))
118+
119+
// `1` is true because the default Golang boolean is initialized as false
120+
if s == "true" || s == "yes" || s == "enable" || s == "enabled" || s == "1" || s == "ok" {
121+
return true
122+
}
123+
124+
return false
125+
}

0 commit comments

Comments
 (0)