forked from meshery/meshery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmeshery_controllers.go
400 lines (364 loc) · 13 KB
/
meshery_controllers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
package models
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/layer5io/meshkit/broker/nats"
"github.com/layer5io/meshkit/database"
"github.com/layer5io/meshkit/logger"
"github.com/layer5io/meshkit/models/controllers"
"github.com/layer5io/meshkit/utils"
mesherykube "github.com/layer5io/meshkit/utils/kubernetes"
"github.com/spf13/viper"
)
// Constants used when deploying Meshery Operator and connecting to Meshery Broker.
const (
// ChartRepo is the Helm chart repository URL set as HelmChartRepo in the
// operator deployment config.
ChartRepo = "https://meshery.github.io/meshery.io/charts"
// MesheryServerBrokerConnection is the connection name used when the server
// connects to a Meshery Broker.
MesheryServerBrokerConnection = "meshery-server"
)
// MesheryController identifies one of the controllers tracked for a context.
// Its values are the keys of the per-context controller handler map.
type MesheryController int
// The controllers tracked for each context.
const (
MesheryBroker MesheryController = iota
Meshsync
MesheryOperator
)
// MesheryControllersHelper keeps per-context state for the Meshery controllers:
// the controller handlers, the operator status and the MeshSync data handlers.
type MesheryControllersHelper struct {
// maps each context with the controller handlers
// this map will be used as the source of truth
ctxControllerHandlersMap map[string]map[MesheryController]controllers.IMesheryController
// maps each context with its operator status
ctxOperatorStatusMap map[string]controllers.MesheryControllerStatus
// maps each context with a meshsync data handler
ctxMeshsyncDataHandlerMap map[string]MeshsyncDataHandler
// mu is held by the Update*/Deploy* goroutines while they read or modify the maps above
mu sync.Mutex
// log receives the warnings, errors and informational messages emitted by the helper
log logger.Handler
// oprDepConfig is passed to every Meshery Operator handler that is created
oprDepConfig controllers.OperatorDeploymentConfig
// dbHandler is handed to every MeshSync data handler that is created
dbHandler *database.Handler
}
// GetControllerHandlersForEachContext returns the map that associates each
// context ID with its controller handlers. The helper's own map is returned
// (not a copy) and the helper's mutex is not acquired.
func (mch *MesheryControllersHelper) GetControllerHandlersForEachContext() map[string]map[MesheryController]controllers.IMesheryController {
	handlersByCtx := mch.ctxControllerHandlersMap
	return handlersByCtx
}
// GetMeshSyncDataHandlersForEachContext returns the map that associates each
// context ID with its MeshSync data handler. The helper's own map is returned
// (not a copy) and the helper's mutex is not acquired.
func (mch *MesheryControllersHelper) GetMeshSyncDataHandlersForEachContext() map[string]MeshsyncDataHandler {
	dataHandlersByCtx := mch.ctxMeshsyncDataHandlerMap
	return dataHandlersByCtx
}
// GetOperatorsStatusMap returns the map that associates each context ID with
// the last recorded Meshery Operator status. The helper's own map is returned
// (not a copy) and the helper's mutex is not acquired.
func (mch *MesheryControllersHelper) GetOperatorsStatusMap() map[string]controllers.MesheryControllerStatus {
	statusByCtx := mch.ctxOperatorStatusMap
	return statusByCtx
}
// NewMesheryControllersHelper builds a MesheryControllersHelper with empty
// per-context maps and the given logger, operator deployment config and
// database handler.
func NewMesheryControllersHelper(log logger.Handler, operatorDepConfig controllers.OperatorDeploymentConfig, dbHandler *database.Handler) *MesheryControllersHelper {
	helper := new(MesheryControllersHelper)

	// Collaborators supplied by the caller.
	helper.log = log
	helper.oprDepConfig = operatorDepConfig
	helper.dbHandler = dbHandler

	// Per-context state starts out empty.
	helper.ctxControllerHandlersMap = make(map[string]map[MesheryController]controllers.IMesheryController)
	helper.ctxOperatorStatusMap = make(map[string]controllers.MesheryControllerStatus)
	helper.ctxMeshsyncDataHandlerMap = make(map[string]MeshsyncDataHandler)

	return helper
}
// UpdateMeshsynDataHandlers initializes a MeshSync data handler for every
// context that has controller handlers attached but no data handler yet.
// For each such context it resolves the broker's public endpoint, connects to
// the broker, creates the data handler, runs it, and only then records it in
// the map. The presence of a handler for a context in the map therefore
// indicates that the MeshSync data for that context is being handled.
//
// The work happens in a new goroutine that holds the helper's mutex for its
// whole duration; the method itself returns the receiver immediately so calls
// can be chained.
func (mch *MesheryControllersHelper) UpdateMeshsynDataHandlers() *MesheryControllersHelper {
	// only checking those contexts whose MesheryControllers are active
	go func(mch *MesheryControllersHelper) {
		mch.mu.Lock()
		defer mch.mu.Unlock()

		// The database handler is dereferenced below; without it no data
		// handler can be created, so bail out instead of panicking.
		if mch.dbHandler == nil {
			mch.log.Info("MeshSync data handlers not initialized: database handler is not set")
			return
		}

		for ctxID, controllerHandlers := range mch.ctxControllerHandlersMap {
			if _, ok := mch.ctxMeshsyncDataHandlerMap[ctxID]; ok {
				// a listener is already attached for this context
				continue
			}

			// A missing map entry yields a nil interface; calling a method on
			// it would panic inside this goroutine.
			brokerController := controllerHandlers[MesheryBroker]
			if brokerController == nil {
				mch.log.Info(fmt.Sprintf("Meshery Broker handler not attached for Kubernetes context (%v)", ctxID))
				continue
			}

			// brokerStatus := brokerController.GetStatus()
			// do something if broker is being deployed , maybe try again after sometime
			brokerEndpoint, err := brokerController.GetPublicEndpoint()
			if brokerEndpoint == "" {
				if err != nil {
					mch.log.Warn(err)
				}
				mch.log.Info(fmt.Sprintf("Meshery Broker unreachable for Kubernetes context (%v)", ctxID))
				continue
			}

			// The connection has not been attempted yet, so this must not
			// claim success; "Connected" is logged only after nats.New succeeds.
			mch.log.Info(fmt.Sprintf("Connecting to Meshery Broker (%s) for Kubernetes context (%s)", brokerEndpoint, ctxID))
			brokerHandler, err := nats.New(nats.Options{
				// URLS: []string{"localhost:4222"},
				URLS:           []string{brokerEndpoint},
				ConnectionName: MesheryServerBrokerConnection,
				Username:       "",
				Password:       "",
				ReconnectWait:  2 * time.Second,
				MaxReconnect:   60,
			})
			if err != nil {
				mch.log.Warn(err)
				mch.log.Info(fmt.Sprintf("MeshSync not configured for Kubernetes context (%v) due to '%v'", ctxID, err.Error()))
				continue
			}
			mch.log.Info(fmt.Sprintf("Connected to Meshery Broker (%v) for Kubernetes context (%v)", brokerEndpoint, ctxID))

			msDataHandler := NewMeshsyncDataHandler(brokerHandler, *mch.dbHandler, mch.log)
			err = msDataHandler.Run()
			if err != nil {
				// NOTE(review): the broker connection opened above is not
				// closed on this path — confirm whether it should be released.
				mch.log.Warn(err)
				mch.log.Info(fmt.Sprintf("Unable to connect MeshSync for Kubernetes context (%s) due to: %s", ctxID, err.Error()))
				continue
			}

			mch.ctxMeshsyncDataHandlerMap[ctxID] = *msDataHandler
			mch.log.Info(fmt.Sprintf("MeshSync connected for Kubernetes context (%s)", ctxID))
		}
	}(mch)

	return mch
}
// UpdateCtxControllerHandlers rebuilds the per-context controller handler map
// from ctxs. A context gets broker, operator and MeshSync handlers only if
//  1. its kubeconfig can be generated, and
//  2. a Kubernetes client can be created from that kubeconfig.
//
// Contexts failing either step are logged and left out of the map.
//
// The work happens in a new goroutine that holds the helper's mutex; the
// method itself returns the receiver immediately so calls can be chained.
func (mch *MesheryControllersHelper) UpdateCtxControllerHandlers(ctxs []K8sContext) *MesheryControllersHelper {
	go func(mch *MesheryControllersHelper) {
		mch.mu.Lock()
		defer mch.mu.Unlock()

		// resetting this value as a specific controller handler instance does not have any significance opposed to
		// a MeshsyncDataHandler instance where it signifies whether or not a listener is attached
		mch.ctxControllerHandlersMap = make(map[string]map[MesheryController]controllers.IMesheryController)

		for _, ctx := range ctxs {
			ctxID := ctx.ID

			// Do not hand an empty config to the client constructor when
			// generation fails: the resulting client would not belong to
			// this context.
			cfg, err := ctx.GenerateKubeConfig()
			if err != nil {
				mch.log.Warn(fmt.Errorf("generating kubeconfig for Kubernetes context (%s): %w", ctxID, err))
				continue
			}

			client, err := mesherykube.New(cfg)
			// means that the config is invalid
			if err != nil {
				// invalid configs are not added to the map
				mch.log.Warn(fmt.Errorf("creating Kubernetes client for context (%s): %w", ctxID, err))
				continue
			}

			mch.ctxControllerHandlersMap[ctxID] = map[MesheryController]controllers.IMesheryController{
				MesheryBroker:   controllers.NewMesheryBrokerHandler(client),
				MesheryOperator: controllers.NewMesheryOperatorHandler(client, mch.oprDepConfig),
				Meshsync:        controllers.NewMeshsyncHandler(client),
			}
		}
	}(mch)

	return mch
}
// UpdateOperatorsStatusMap resets the operator status map and records a status
// for every context that currently has controller handlers attached. Contexts
// that ot reports as undeployed are recorded as controllers.Undeployed; for all
// others the operator handler is asked for its status.
//
// It should be called after UpdateCtxControllerHandlers. The work happens in a
// new goroutine that holds the helper's mutex; the receiver is returned
// immediately so calls can be chained.
func (mch *MesheryControllersHelper) UpdateOperatorsStatusMap(ot *OperatorTracker) *MesheryControllersHelper {
	refresh := func(helper *MesheryControllersHelper) {
		helper.mu.Lock()
		defer helper.mu.Unlock()

		helper.ctxOperatorStatusMap = make(map[string]controllers.MesheryControllerStatus)
		for id, handlers := range helper.ctxControllerHandlersMap {
			if ot.IsUndeployed(id) {
				helper.ctxOperatorStatusMap[id] = controllers.Undeployed
				continue
			}
			helper.ctxOperatorStatusMap[id] = handlers[MesheryOperator].GetStatus()
		}
	}
	go refresh(mch)

	return mch
}
// OperatorTracker remembers, per context ID, whether the operator has been
// marked as undeployed. When DisableOperator is set, every context is reported
// as undeployed and updates are ignored.
type OperatorTracker struct {
	// ctxIDtoDeploymentStatus maps a context ID to its "undeployed" flag.
	ctxIDtoDeploymentStatus map[string]bool
	// mx guards ctxIDtoDeploymentStatus.
	mx sync.Mutex
	// DisableOperator turns the tracker into a constant "undeployed" answer.
	DisableOperator bool
}

// NewOperatorTracker returns a tracker with an empty status map; disabled is
// stored as DisableOperator.
func NewOperatorTracker(disabled bool) *OperatorTracker {
	tracker := &OperatorTracker{DisableOperator: disabled}
	tracker.ctxIDtoDeploymentStatus = map[string]bool{}
	return tracker
}

// Undeployed records the undeployed flag for ctxID. It does nothing when the
// operator is disabled.
func (ot *OperatorTracker) Undeployed(ctxID string, undeployed bool) {
	if ot.DisableOperator {
		// no-op when operator is disabled
		return
	}

	ot.mx.Lock()
	defer ot.mx.Unlock()

	// Lazily create the map so a zero-value tracker is usable.
	if ot.ctxIDtoDeploymentStatus == nil {
		ot.ctxIDtoDeploymentStatus = map[string]bool{}
	}
	ot.ctxIDtoDeploymentStatus[ctxID] = undeployed
}

// IsUndeployed reports the undeployed flag recorded for ctxID, or false when
// none was recorded. It always reports true when the operator is disabled so
// that operators stay in the undeployed state across all contexts.
func (ot *OperatorTracker) IsUndeployed(ctxID string) bool {
	if ot.DisableOperator {
		return true
	}

	ot.mx.Lock()
	defer ot.mx.Unlock()

	// Lazily create the map; a lookup in the fresh map yields false.
	if ot.ctxIDtoDeploymentStatus == nil {
		ot.ctxIDtoDeploymentStatus = map[string]bool{}
	}
	flag := ot.ctxIDtoDeploymentStatus[ctxID]
	return flag
}
// DeployUndeployedOperators looks at the recorded Meshery Operator status of
// every context with controller handlers attached and deploys the operator for
// those whose status is controllers.NotDeployed. Deployment errors are logged.
//
// Nothing is done when the operator is disabled in ot. Otherwise the work
// happens in a new goroutine that holds the helper's mutex; the receiver is
// returned immediately so calls can be chained.
func (mch *MesheryControllersHelper) DeployUndeployedOperators(ot *OperatorTracker) *MesheryControllersHelper {
	// never deploy operators while they are disabled
	if ot.DisableOperator {
		return mch
	}

	deploy := func(helper *MesheryControllersHelper) {
		helper.mu.Lock()
		defer helper.mu.Unlock()

		for id, handlers := range helper.ctxControllerHandlersMap {
			status, recorded := helper.ctxOperatorStatusMap[id]
			if !recorded || status != controllers.NotDeployed {
				continue
			}
			if deployErr := handlers[MesheryOperator].Deploy(false); deployErr != nil {
				helper.log.Error(deployErr)
			}
		}
	}
	go deploy(mch)

	return mch
}
// NewOperatorDeploymentConfig assembles the operator deployment config. The
// release version is read from the "BUILD" setting; when that is empty,
// "Not Set" or "edge-latest", the latest release tag is looked up instead, and
// the version is left empty if the lookup fails. Helm overrides are computed
// on demand from adapterTracker, and ChartRepo is used as the chart repository.
func NewOperatorDeploymentConfig(adapterTracker AdaptersTrackerInterface) controllers.OperatorDeploymentConfig {
	// get meshery release version
	releaseVersion := viper.GetString("BUILD")

	switch releaseVersion {
	case "", "Not Set", "edge-latest":
		// if unable to fetch latest release tag, meshkit helm functions handle
		// this automatically fetch the latest one
		if _, latest, err := CheckLatestVersion(releaseVersion); err == nil {
			releaseVersion = latest
		} else {
			// mch.log.Error(fmt.Errorf("Couldn't check release tag: %s. Will use latest version", err))
			releaseVersion = ""
		}
	}

	helmOverrides := func(delete bool) map[string]interface{} {
		return setOverrideValues(delete, adapterTracker)
	}

	return controllers.OperatorDeploymentConfig{
		MesheryReleaseVersion: releaseVersion,
		GetHelmOverrides:      helmOverrides,
		HelmChartRepo:         ChartRepo,
	}
}
// CheckLatestVersion compares serverVersion with the latest Meshery release
// tag and returns (isOutdated, latestVersion, error).
//
// isOutdated points to true when serverVersion differs from the latest tag.
// When the release tags cannot be fetched, or no tags are returned, the result
// is (nil, "", err) with the error wrapped by ErrCreateOperatorDeploymentConfig.
func CheckLatestVersion(serverVersion string) (*bool, string, error) {
	// Inform user of the latest release version
	versions, err := utils.GetLatestReleaseTagsSorted("meshery", "meshery")
	// The error must be checked before the slice is indexed: on failure the
	// slice may be empty and indexing it would panic.
	if err != nil {
		return nil, "", ErrCreateOperatorDeploymentConfig(err)
	}
	if len(versions) == 0 {
		return nil, "", ErrCreateOperatorDeploymentConfig(fmt.Errorf("no release tags found for meshery/meshery"))
	}

	// The last element of the returned tags is treated as the latest release.
	latestVersion := versions[len(versions)-1]

	// Compare current running Meshery server version to the latest available Meshery release on GitHub.
	isOutdated := latestVersion != serverVersion
	return &isOutdated, latestVersion, nil
}
// setOverrideValues builds the Helm override values for the operator chart.
// Every chart component starts out disabled except meshery-operator; components
// matching a currently installed adapter (the part of its location before the
// first ":") are enabled so that they are not uninstalled. When delete is true
// meshery-operator itself is disabled.
func setOverrideValues(delete bool, adapterTracker AdaptersTrackerInterface) map[string]interface{} {
	enabled := func(on bool) map[string]interface{} {
		return map[string]interface{}{"enabled": on}
	}

	adapters := adapterTracker.GetAdapters(context.TODO())

	overrideValues := map[string]interface{}{
		"fullnameOverride": "meshery-operator",
	}
	disabledByDefault := []string{
		"meshery",
		"meshery-istio",
		"meshery-cilium",
		"meshery-linkerd",
		"meshery-consul",
		"meshery-kuma",
		"meshery-nsm",
		"meshery-nginx-sm",
		"meshery-traefik-mesh",
		"meshery-app-mesh",
	}
	for _, component := range disabledByDefault {
		overrideValues[component] = enabled(false)
	}
	overrideValues["meshery-operator"] = enabled(true)

	// Enable every known key that corresponds to an installed adapter.
	for _, installed := range adapters {
		if installed.Name == "" {
			continue
		}
		key := strings.SplitN(installed.Location, ":", 2)[0]
		if _, known := overrideValues[key]; known {
			overrideValues[key] = enabled(true)
		}
	}

	// Applied last so that a delete always wins for the operator entry.
	if delete {
		overrideValues["meshery-operator"] = enabled(false)
	}

	return overrideValues
}
// SetOverrideValuesForMesheryDeploy builds the Helm override values used when
// deploying or undeploying a single adapter. Every adapter component starts
// out disabled; components matching an entry of adapters (the part of its
// location before the first ":") are enabled so that they are not uninstalled.
// Finally the component matching adapter is set to install.
func SetOverrideValuesForMesheryDeploy(adapters []Adapter, adapter Adapter, install bool) map[string]interface{} {
	enabled := func(on bool) map[string]interface{} {
		return map[string]interface{}{"enabled": on}
	}
	componentOf := func(location string) string {
		return strings.SplitN(location, ":", 2)[0]
	}

	overrideValues := make(map[string]interface{})
	components := []string{
		"meshery-istio",
		"meshery-cilium",
		"meshery-linkerd",
		"meshery-consul",
		"meshery-kuma",
		"meshery-nsm",
		"meshery-nginx-sm",
		"meshery-traefik-mesh",
		"meshery-app-mesh",
	}
	for _, component := range components {
		overrideValues[component] = enabled(false)
	}

	// Keep the adapters that are already installed enabled.
	for _, installed := range adapters {
		if installed.Name == "" {
			continue
		}
		key := componentOf(installed.Location)
		if _, known := overrideValues[key]; known {
			overrideValues[key] = enabled(true)
		}
	}

	// based on deploy/undeploy action change the status of adapter override
	target := componentOf(adapter.Location)
	if _, known := overrideValues[target]; known {
		overrideValues[target] = enabled(install)
	}

	return overrideValues
}