-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathoptions.go
205 lines (180 loc) · 5.26 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package kubemq
import (
"errors"
"strings"
"time"
)
// kubeMQAuthTokenHeader holds the key name "authorization".
// NOTE(review): presumably the header/metadata key under which the auth token
// is sent to the server; its use is not visible in this file - confirm at the call site.
const kubeMQAuthTokenHeader = "authorization"
// Option is a single configuration setting that can be applied to an Options
// value. Values are produced by the With* functions in this file.
type Option interface {
// apply is given the Options value that this option should modify.
apply(*Options)
}
// TransportType identifies the transport a client uses to talk to the server.
type TransportType int

// Supported transport types. TransportTypeGRPC is the zero value of
// TransportType.
const (
	TransportTypeGRPC = TransportType(iota)
	TransportTypeRest
)
// Options collects every client setting that the With* option functions in
// this file can modify. GetDefaultOptions returns the default values and
// Validate checks the fields required by the selected transport.
type Options struct {
// host is the server host set by WithAddress; Validate requires it to be
// non-empty for the GRPC transport.
host string
// port is the server port set by WithAddress; Validate requires it to be
// positive for the GRPC transport.
port int
// isSecured is set to true by WithCredentials and WithCertificate.
isSecured bool
// certFile is the certificate file given to WithCredentials.
certFile string
// certData is the certificate data given to WithCertificate.
certData string
// serverOverrideDomain is documented as testing-only: when non-empty it
// overrides the virtual host name of authority in requests.
serverOverrideDomain string
// authToken is the KubeMQ JWT auth token set by WithAuthToken.
authToken string
// clientId is the client id set by WithClientId (default "ClientId").
clientId string
// receiveBufferSize is the length of the buffered channel used in
// subscriptions (default 10).
receiveBufferSize int
// defaultChannel is the default channel for outbound requests.
defaultChannel string
// defaultCacheTTL is the default cache time to live for query requests
// that set a CacheKey (default 15 minutes).
defaultCacheTTL time.Duration
// transportType selects the transport; Validate accepts TransportTypeGRPC
// and TransportTypeRest.
transportType TransportType
// restUri is the uri set by WithUri; Validate requires it to be non-empty
// for the Rest transport.
restUri string
// webSocketUri is derived by WithUri from the same uri with "http"
// replaced by "ws".
webSocketUri string
// autoReconnect enables automatic reconnection on lost connectivity.
autoReconnect bool
// reconnectInterval is the reconnection interval (default 5 seconds).
reconnectInterval time.Duration
// maxReconnect is the max number of reconnects before an error is returned
// (default 0, documented as "never").
maxReconnect int
// checkConnection is set by WithCheckConnection.
// NOTE(review): presumably triggers a connectivity check when the client
// is created - confirm against the client constructor.
checkConnection bool
}
// funcOptions wraps a plain function so that it can be used as an Option;
// its apply method calls fn.
type funcOptions struct {
// fn is the function invoked with the target Options when the option is applied.
fn func(*Options)
}
// apply runs the wrapped function against o, letting it modify the options
// in place.
func (f *funcOptions) apply(o *Options) {
	f.fn(o)
}
// newFuncOption wraps f in a funcOptions value so that it can be used as an
// Option.
func newFuncOption(f func(*Options)) *funcOptions {
	return &funcOptions{fn: f}
}
// WithAddress returns an Option that sets the host and port address of the
// KubeMQ server.
func WithAddress(host string, port int) Option {
	setAddress := func(opts *Options) {
		opts.host, opts.port = host, port
	}
	return newFuncOption(setAddress)
}
// WithUri returns an Option that sets the REST uri of the KubeMQ server and
// derives the matching WebSocket uri from it.
//
// A leading "http://" becomes "ws://" and a leading "https://" becomes
// "wss://". A uri with neither prefix is stored unchanged as the WebSocket
// uri, so an "http" substring inside the host or path is never rewritten.
func WithUri(uri string) Option {
	// Only the scheme may be swapped; replacing the first "http" found anywhere
	// in the string would corrupt scheme-less uris such as "host:9090/http-api".
	wsURI := uri
	switch {
	case strings.HasPrefix(uri, "https://"):
		wsURI = "wss://" + strings.TrimPrefix(uri, "https://")
	case strings.HasPrefix(uri, "http://"):
		wsURI = "ws://" + strings.TrimPrefix(uri, "http://")
	}
	return newFuncOption(func(o *Options) {
		o.restUri = uri
		o.webSocketUri = wsURI
	})
}
// WithCredentials returns an Option that marks the connection as secured and
// sets the certificate file from which the client TLS credentials are taken.
// serverOverrideDomain is for testing only. If set to a non-empty string, it
// overrides the virtual host name of authority (e.g. the :authority header
// field) in requests.
func WithCredentials(certFile, serverOverrideDomain string) Option {
	useCertFile := func(opts *Options) {
		opts.certFile = certFile
		opts.serverOverrideDomain = serverOverrideDomain
		opts.isSecured = true
	}
	return newFuncOption(useCertFile)
}
// WithCertificate returns an Option that marks the connection as secured and
// sets the certificate data from which the client TLS credentials are taken.
// serverOverrideDomain is for testing only. If set to a non-empty string, it
// overrides the virtual host name of authority (e.g. the :authority header
// field) in requests.
func WithCertificate(certData, serverOverrideDomain string) Option {
	useCertData := func(opts *Options) {
		opts.certData = certData
		opts.serverOverrideDomain = serverOverrideDomain
		opts.isSecured = true
	}
	return newFuncOption(useCertData)
}
// WithAuthToken returns an Option that sets the KubeMQ JWT auth token to be
// used for the KubeMQ connection.
func WithAuthToken(token string) Option {
	setToken := func(opts *Options) {
		opts.authToken = token
	}
	return newFuncOption(setToken)
}
// WithClientId returns an Option that sets the client id to be used in all
// function calls made with this client. The client id is mandatory.
func WithClientId(id string) Option {
	setClientID := func(opts *Options) {
		opts.clientId = id
	}
	return newFuncOption(setClientID)
}
// WithReceiveBufferSize returns an Option that sets the length of the
// buffered channel to be used in all subscriptions.
func WithReceiveBufferSize(size int) Option {
	setBufferSize := func(opts *Options) {
		opts.receiveBufferSize = size
	}
	return newFuncOption(setBufferSize)
}
// WithDefaultChannel returns an Option that sets the default channel for any
// outbound requests.
func WithDefaultChannel(channel string) Option {
	setChannel := func(opts *Options) {
		opts.defaultChannel = channel
	}
	return newFuncOption(setChannel)
}
// WithDefaultCacheTTL returns an Option that sets the default cache time to
// live for any query requests that have a CacheKey value set.
func WithDefaultCacheTTL(ttl time.Duration) Option {
	setCacheTTL := func(opts *Options) {
		opts.defaultCacheTTL = ttl
	}
	return newFuncOption(setCacheTTL)
}
// WithAutoReconnect returns an Option that turns automatic reconnection, in
// case of lost connectivity to the server, on or off.
func WithAutoReconnect(value bool) Option {
	setAutoReconnect := func(opts *Options) {
		opts.autoReconnect = value
	}
	return newFuncOption(setAutoReconnect)
}
// WithReconnectInterval returns an Option that sets the reconnection interval
// duration. The default is 5 seconds.
func WithReconnectInterval(duration time.Duration) Option {
	setInterval := func(opts *Options) {
		opts.reconnectInterval = duration
	}
	return newFuncOption(setInterval)
}
// WithMaxReconnects returns an Option that sets the max number of reconnects
// before an error is returned. The default is 0 (never).
func WithMaxReconnects(value int) Option {
	setMaxReconnects := func(opts *Options) {
		opts.maxReconnect = value
	}
	return newFuncOption(setMaxReconnects)
}
// WithTransportType returns an Option that sets the client transport type,
// currently GRPC or Rest.
func WithTransportType(transportType TransportType) Option {
	setTransport := func(opts *Options) {
		opts.transportType = transportType
	}
	return newFuncOption(setTransport)
}
// WithCheckConnection returns an Option that sets whether server connectivity
// is checked on client create.
func WithCheckConnection(value bool) Option {
	setCheck := func(opts *Options) {
		opts.checkConnection = value
	}
	return newFuncOption(setCheck)
}
// GetDefaultOptions returns a new Options value holding the defaults:
// client id "ClientId", receive buffer size 10, cache TTL of 15 minutes and
// a reconnect interval of 5 seconds. Every other field is left at its zero
// value (empty strings, 0, false), which for transportType is 0.
func GetDefaultOptions() *Options {
	defaults := Options{
		clientId:          "ClientId",
		receiveBufferSize: 10,
		defaultCacheTTL:   15 * time.Minute,
		reconnectInterval: 5 * time.Second,
	}
	return &defaults
}
// Validate checks that the settings required by the selected transport type
// are present.
//
// For TransportTypeGRPC the host must be non-empty and the port must be in
// the range 1-65535. For TransportTypeRest the REST uri must be non-empty.
// Any other transport type is rejected. It returns nil when the options are
// valid and a descriptive error otherwise.
func (o *Options) Validate() error {
	switch o.transportType {
	case TransportTypeGRPC:
		if o.host == "" {
			return errors.New("invalid host")
		}
		// Port numbers are 16-bit: 0, negatives and anything above 65535
		// can never be dialed, so reject them up front.
		if o.port <= 0 || o.port > 65535 {
			return errors.New("invalid port")
		}
	case TransportTypeRest:
		if o.restUri == "" {
			return errors.New("invalid address uri")
		}
	default:
		return errors.New("no transport type was set")
	}
	return nil
}