@@ -17,12 +17,14 @@ import (
17
17
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
18
18
)
19
19
20
+ // MetricSync syncs container and pod metrics to the database
20
21
type MetricSync struct {
21
22
metricsClientset * metricsv.Clientset
22
23
db * database.DB
23
24
logger * logging.Logger
24
25
}
25
26
27
+ // NewMetricSync creates new MetricSync initialized with metricsClientset, database and logger
26
28
func NewMetricSync (metricsClientset * metricsv.Clientset , db * database.DB , logger * logging.Logger ) * MetricSync {
27
29
return & MetricSync {
28
30
metricsClientset : metricsClientset ,
@@ -31,6 +33,31 @@ func NewMetricSync(metricsClientset *metricsv.Clientset, db *database.DB, logger
31
33
}
32
34
}
33
35
36
+ // podMetricUpsertStmt returns database upsert statement to upsert pod metrics
37
+ func (ms * MetricSync ) podMetricUpsertStmt () string {
38
+ return fmt .Sprintf (
39
+ "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s" ,
40
+ "pod_metric" ,
41
+ "reference_id, timestamp, cpu, memory, storage" ,
42
+ ":reference_id, :timestamp, :cpu, :memory, :storage" ,
43
+ "timestamp=VALUES(timestamp), cpu=VALUES(cpu), memory=VALUES(memory), storage=VALUES(storage)" ,
44
+ )
45
+ }
46
+
47
+ // containerMetricUpsertStmt returns database upsert statement to upsert container metrics
48
+ func (ms * MetricSync ) containerMetricUpsertStmt () string {
49
+ return fmt .Sprintf (
50
+ "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s" ,
51
+ "container_metric" ,
52
+ "container_reference_id, pod_reference_id, timestamp, cpu, memory, storage" ,
53
+ ":container_reference_id, :pod_reference_id, :timestamp, :cpu, :memory, :storage" ,
54
+ "timestamp=VALUES(timestamp), cpu=VALUES(cpu), memory=VALUES(memory), storage=VALUES(storage)" ,
55
+ )
56
+ }
57
+
58
+ // Run starts syncing the metrics to the database. Therefore, it gets a list of all pods
59
+ // and the belonging containers together with their metrics from the API every minute.
60
+ // The pod metrics are the container metrics summed up by pod.
34
61
func (ms * MetricSync ) Run (ctx context.Context ) error {
35
62
36
63
ms .logger .Info ("Starting sync" )
@@ -85,23 +112,23 @@ func (ms *MetricSync) Run(ctx context.Context) error {
85
112
select {
86
113
case <- ctx .Done ():
87
114
return ctx .Err ()
88
- case <- time .After (time .Second * 5 ):
89
- //case <-time.After(time.Minute):
115
+ case <- time .After (time .Minute ):
90
116
}
91
117
}
92
118
})
93
119
94
120
g .Go (func () error {
95
- return ms .db .UpsertStreamedWithStatement (ctx , upsertPodMetrics , ms .podMetricUpsertStmt (), 5 )
121
+ return ms .db .UpsertStreamed (ctx , upsertPodMetrics , database . WithStatement ( ms .podMetricUpsertStmt (), 5 ) )
96
122
})
97
123
98
124
g .Go (func () error {
99
- return ms .db .UpsertStreamedWithStatement (ctx , upsertContainerMetrics , ms .containerMetricUpsertStmt (), 6 )
125
+ return ms .db .UpsertStreamed (ctx , upsertContainerMetrics , database . WithStatement ( ms .containerMetricUpsertStmt (), 6 ) )
100
126
})
101
127
102
128
return g .Wait ()
103
129
}
104
130
131
+ // Clean deletes metrics from the database if the belonging pod is deleted
105
132
func (ms * MetricSync ) Clean (ctx context.Context , deleteChannel <- chan contracts.KDelete ) error {
106
133
107
134
g , ctx := errgroup .WithContext (ctx )
@@ -130,42 +157,24 @@ func (ms *MetricSync) Clean(ctx context.Context, deleteChannel <-chan contracts.
130
157
})
131
158
132
159
g .Go (func () error {
133
- return ms .db .DeleteStreamedByField (ctx , & schema.PodMetric {}, "reference_id" , deletesPod )
160
+ return ms .db .DeleteStreamed (ctx , & schema.PodMetric {}, deletesPod , database . ByColumn ( "reference_id" ) )
134
161
})
135
162
136
163
g .Go (func () error {
137
- return ms .db .DeleteStreamedByField (ctx , & schema.ContainerMetric {}, "pod_reference_id" , deletesContainer )
164
+ return ms .db .DeleteStreamed (ctx , & schema.ContainerMetric {}, deletesContainer , database . ByColumn ( "pod_reference_id" ) )
138
165
})
139
166
140
167
return g .Wait ()
141
168
}
142
169
143
- func (ms * MetricSync ) podMetricUpsertStmt () string {
144
- return fmt .Sprintf (
145
- "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s" ,
146
- "pod_metric" ,
147
- "reference_id, timestamp, cpu, memory, storage" ,
148
- ":reference_id, :timestamp, :cpu, :memory, :storage" ,
149
- "timestamp=VALUES(timestamp), cpu=VALUES(cpu), memory=VALUES(memory), storage=VALUES(storage)" ,
150
- )
151
- }
152
-
153
- func (ms * MetricSync ) containerMetricUpsertStmt () string {
154
- return fmt .Sprintf (
155
- "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s" ,
156
- "container_metric" ,
157
- "container_reference_id, pod_reference_id, timestamp, cpu, memory, storage" ,
158
- ":container_reference_id, :pod_reference_id, :timestamp, :cpu, :memory, :storage" ,
159
- "timestamp=VALUES(timestamp), cpu=VALUES(cpu), memory=VALUES(memory), storage=VALUES(storage)" ,
160
- )
161
- }
162
-
170
+ // NodeMetricSync syncs node metrics to the database
163
171
type NodeMetricSync struct {
164
172
metricsClientset * metricsv.Clientset
165
173
db * database.DB
166
174
logger * logging.Logger
167
175
}
168
176
177
+ // NewNodeMetricSync creates new NodeMetricSync initialized with metricsClientset, database and logger
169
178
func NewNodeMetricSync (metricClientset * metricsv.Clientset , db * database.DB , logger * logging.Logger ) * NodeMetricSync {
170
179
return & NodeMetricSync {
171
180
metricsClientset : metricClientset ,
@@ -174,6 +183,19 @@ func NewNodeMetricSync(metricClientset *metricsv.Clientset, db *database.DB, log
174
183
}
175
184
}
176
185
186
+ // nodeMetricUpsertStmt returns database upsert statement to upsert node metrics
187
+ func (nms * NodeMetricSync ) nodeMetricUpsertStmt () string {
188
+ return fmt .Sprintf (
189
+ "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s" ,
190
+ "node_metric" ,
191
+ "node_id, timestamp, cpu, memory, storage" ,
192
+ ":node_id, :timestamp, :cpu, :memory, :storage" ,
193
+ "timestamp=VALUES(timestamp), cpu=VALUES(cpu), memory=VALUES(memory), storage=VALUES(storage)" ,
194
+ )
195
+ }
196
+
197
+ // Run starts syncing the metrics to the database. Therefore, it gets a list of all nodes
198
+ // and the belonging metrics
177
199
func (nms * NodeMetricSync ) Run (ctx context.Context ) error {
178
200
179
201
g , ctx := errgroup .WithContext (ctx )
@@ -207,12 +229,13 @@ func (nms *NodeMetricSync) Run(ctx context.Context) error {
207
229
})
208
230
209
231
g .Go (func () error {
210
- return nms .db .UpsertStreamedWithStatement (ctx , upsertNodeMetrics , nms .nodeMetricUpsertStmt (), 5 )
232
+ return nms .db .UpsertStreamed (ctx , upsertNodeMetrics , database . WithStatement ( nms .nodeMetricUpsertStmt (), 5 ) )
211
233
})
212
234
213
235
return g .Wait ()
214
236
}
215
237
238
+ // Clean deletes metrics from the database if the belonging node is deleted
216
239
func (nms * NodeMetricSync ) Clean (ctx context.Context , deleteChannel <- chan contracts.KDelete ) error {
217
240
218
241
g , ctx := errgroup .WithContext (ctx )
@@ -238,18 +261,8 @@ func (nms *NodeMetricSync) Clean(ctx context.Context, deleteChannel <-chan contr
238
261
})
239
262
240
263
g .Go (func () error {
241
- return nms .db .DeleteStreamedByField (ctx , & schema.NodeMetric {}, "node_id" , deletes )
264
+ return nms .db .DeleteStreamed (ctx , & schema.NodeMetric {}, deletes , database . ByColumn ( "node_id" ) )
242
265
})
243
266
244
267
return g .Wait ()
245
268
}
246
-
247
- func (nms * NodeMetricSync ) nodeMetricUpsertStmt () string {
248
- return fmt .Sprintf (
249
- "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s" ,
250
- "node_metric" ,
251
- "node_id, timestamp, cpu, memory, storage" ,
252
- ":node_id, :timestamp, :cpu, :memory, :storage" ,
253
- "timestamp=VALUES(timestamp), cpu=VALUES(cpu), memory=VALUES(memory), storage=VALUES(storage)" ,
254
- )
255
- }
0 commit comments