Skip to content

Commit 800c92c

Browse files
committed
prometheusUsageClient
1 parent 59e9554 commit 800c92c

File tree

3 files changed

+241
-19
lines changed

3 files changed

+241
-19
lines changed

Diff for: pkg/framework/plugins/nodeutilization/lownodeutilization_test.go

+55-19
Original file line numberDiff line numberDiff line change
@@ -25,27 +25,28 @@ import (
2525
"testing"
2626
"time"
2727

28-
"sigs.k8s.io/descheduler/pkg/api"
29-
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
30-
frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing"
31-
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
32-
3328
v1 "k8s.io/api/core/v1"
3429
policy "k8s.io/api/policy/v1"
3530
"k8s.io/apimachinery/pkg/api/resource"
3631
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/client-go/informers"
3733
"k8s.io/client-go/kubernetes/fake"
34+
fakeclientset "k8s.io/client-go/kubernetes/fake"
3835
core "k8s.io/client-go/testing"
3936
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
4037
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
38+
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
4139

40+
"sigs.k8s.io/descheduler/pkg/api"
4241
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
4342
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
43+
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
44+
frameworktesting "sigs.k8s.io/descheduler/pkg/framework/testing"
45+
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
4446
"sigs.k8s.io/descheduler/pkg/utils"
4547
"sigs.k8s.io/descheduler/test"
4648

4749
promapi "github.com/prometheus/client_golang/api"
48-
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
4950
"github.com/prometheus/common/config"
5051
"github.com/prometheus/common/model"
5152
)
@@ -1370,7 +1371,7 @@ func TestLowNodeUtilizationWithTaints(t *testing.T) {
13701371
}
13711372
}
13721373

1373-
func TestLowNodeUtilizationWithMetrics(t *testing.T) {
1374+
func TestLowNodeUtilizationWithMetricsReal(t *testing.T) {
13741375
return
13751376
roundTripper := &http.Transport{
13761377
Proxy: http.ProxyFromEnvironment,
@@ -1382,29 +1383,64 @@ func TestLowNodeUtilizationWithMetrics(t *testing.T) {
13821383
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
13831384
}
13841385

1385-
AuthToken := "eyJhbGciOiJSUzI1NiIsImtpZCI6IkNoTW9tT2w2cWtzR2V0dURZdjBqdnBSdmdWM29lWmc3dWpfNW0yaDc2NHMifQ.eyJhdWQiOlsiaHR0cHM6Ly9rdWJlcm5ldGVzLmRlZmF1bHQuc3ZjIl0sImV4cCI6MTcyODk5MjY3NywiaWF0IjoxNzI4OTg5MDc3LCJpc3MiOiJodHRwczovL2t1YmVybmV0ZXMuZGVmYXVsdC5zdmMiLCJqdGkiOiJkNDY3ZjVmMy0xNGVmLTRkMjItOWJkNC1jMGM1Mzk3NzYyZDgiLCJrdWJlcm5ldGVzLmlvIjp7Im5hbWVzcGFjZSI6Im9wZW5zaGlmdC1tb25pdG9yaW5nIiwic2VydmljZWFjY291bnQiOnsibmFtZSI6InByb21ldGhldXMtazhzIiwidWlkIjoiNjY4NDllMGItYTAwZC00NjUzLWE5NTItNThiYTE1MTk4NTlkIn19LCJuYmYiOjE3Mjg5ODkwNzcsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpvcGVuc2hpZnQtbW9uaXRvcmluZzpwcm9tZXRoZXVzLWs4cyJ9.J1i6-oRAC9J8mqrlZPKGA-CU5PbUzhm2QxAWFnu65-NXR3e252mesybwtjkwxUtTLKrsYHQXwEsG5rGcQsvMcGK9RC9y5z33DFj8tPPwOGLJYJ-s5cTImTqKtWRXzTlcrsrUYTYApfrOsEyXwyfDow4PCslZjR3cd5FMRbvXNqHLg26nG_smApR4wc6kXy7xxlRuGhxu-dUiscQP56njboOK61JdTG8F3FgOayZnKk1jGeVdIhXClqGWJyokk-ZM3mMK1MxzGXY0tLbe37V4B7g3NDiH651BUcicfDSky46yfcAYxMDbZgpK2TByWApAllN0wixz2WsFfyBVu_Q5xtZ9Gi9BUHSa5ioRiBK346W4Bdmr9ala5ldIXDa59YE7UB34DsCHyqvzRx_Sj76hLzy2jSOk7RsL0fM8sDoJL4ROdi-3Jtr5uPY593I8H8qeQvFS6PQfm0bUZqVKrrLoCK_uk9guH4a6K27SlD-Utk3dpsjbmrwcjBxm-zd_LE9YyQ734My00Pcy9D5eNio3gESjGsHqGFc_haq4ZCiVOCkbdmABjpPEL6K7bs1GMZbHt1CONL0-LzymM8vgGNj0grjpG8-5AF8ZuSqR7pbZSV_NO2nKkmrwpILCw0Joqp6V3C9pP9nXWHIDyVMxMK870zxzt_qCoPRLCAujQQn6e0U"
1386-
client, err := promapi.NewClient(promapi.Config{
1387-
Address: "https://prometheus-k8s-openshift-monitoring.apps.jchaloup-20241015-3.group-b.devcluster.openshift.com",
1386+
AuthToken := "XXXX"
1387+
promClient, err := promapi.NewClient(promapi.Config{
1388+
Address: "https://prometheus-k8s-openshift-monitoring.apps.jchaloup-20241106.group-b.devcluster.openshift.com",
13881389
RoundTripper: config.NewAuthorizationCredentialsRoundTripper("Bearer", config.NewInlineSecret(AuthToken), roundTripper),
13891390
})
13901391
if err != nil {
13911392
t.Fatalf("prom client error: %v", err)
13921393
}
13931394

1395+
n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
1396+
n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
1397+
n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)
1398+
1399+
nodes := []*v1.Node{n1, n2, n3}
1400+
1401+
p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
1402+
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
1403+
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
1404+
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
1405+
1406+
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
1407+
1408+
ctx := context.TODO()
1409+
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
1410+
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
1411+
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
1412+
if err != nil {
1413+
t.Fatalf("Build get pods assigned to node function error: %v", err)
1414+
}
1415+
1416+
sharedInformerFactory.Start(ctx.Done())
1417+
sharedInformerFactory.WaitForCacheSync(ctx.Done())
1418+
1419+
prometheusUsageClient := newPrometheusUsageSnapshot(podsAssignedToNode, promClient)
1420+
err = prometheusUsageClient.capture(nodes)
1421+
if err != nil {
1422+
t.Fatalf("unable to capture prometheus metrics: %v", err)
1423+
}
1424+
1425+
for _, node := range nodes {
1426+
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
1427+
fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil)
1428+
}
1429+
13941430
// pod:container_cpu_usage:sum
13951431
// container_memory_usage_bytes
13961432

1397-
v1api := promv1.NewAPI(client)
1398-
ctx := context.TODO()
13991433
// promQuery := "avg_over_time(kube_pod_container_resource_requests[1m])"
1400-
promQuery := "kube_pod_container_resource_requests"
1401-
results, warnings, err := v1api.Query(ctx, promQuery, time.Now())
1402-
fmt.Printf("results: %#v\n", results)
1403-
for _, sample := range results.(model.Vector) {
1404-
fmt.Printf("sample: %#v\n", sample)
1434+
nodeThresholds := NodeThresholds{
1435+
lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
1436+
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(300), resource.DecimalSI),
1437+
},
1438+
highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
1439+
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(500), resource.DecimalSI),
1440+
},
14051441
}
1406-
fmt.Printf("warnings: %v\n", warnings)
1407-
fmt.Printf("err: %v\n", err)
1442+
1443+
fmt.Printf("nodeThresholds: %#v\n", nodeThresholds)
14081444

14091445
result := model.Value(
14101446
&model.Vector{

Diff for: pkg/framework/plugins/nodeutilization/usageclients.go

+118
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,15 @@ package nodeutilization
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
23+
"net/http"
24+
"net/url"
25+
"time"
2226

27+
promapi "github.com/prometheus/client_golang/api"
28+
promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
29+
"github.com/prometheus/common/model"
2330
v1 "k8s.io/api/core/v1"
2431
"k8s.io/apimachinery/pkg/api/resource"
2532
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -200,3 +207,114 @@ func (client *actualUsageClient) capture(nodes []*v1.Node) error {
200207

201208
return nil
202209
}
210+
211+
// prometheusUsageClient computes node utilization from samples scraped
// from a Prometheus instance rather than from the Kubernetes metrics
// server. capture() populates the underscore-prefixed snapshot fields;
// the remaining accessor methods only read that snapshot.
type prometheusUsageClient struct {
	getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
	promClient            promapi.Client
	// promQuery is the PromQL query used to retrieve per-node usage;
	// one sample per node is expected, keyed by the "instance" label.
	promQuery string

	// Snapshot data captured by capture(); maps are keyed by node name.
	_nodes           []*v1.Node
	_pods            map[string][]*v1.Pod
	_nodeUtilization map[string]map[v1.ResourceName]*resource.Quantity
}
220+
221+
// Compile-time check that the newly added prometheusUsageClient satisfies
// the usageClient interface. The original line asserted actualUsageClient,
// which is an existing type already covered elsewhere — a copy-paste slip
// that left the new type unchecked.
var _ usageClient = &prometheusUsageClient{}
222+
223+
func newPrometheusUsageSnapshot(
224+
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc,
225+
promClient promapi.Client,
226+
) *prometheusUsageClient {
227+
return &prometheusUsageClient{
228+
getPodsAssignedToNode: getPodsAssignedToNode,
229+
promClient: promClient,
230+
promQuery: "instance:node_cpu:rate:sum",
231+
}
232+
}
233+
234+
func (client *prometheusUsageClient) nodeUtilization(node string) map[v1.ResourceName]*resource.Quantity {
235+
return client._nodeUtilization[node]
236+
}
237+
238+
func (client *prometheusUsageClient) nodes() []*v1.Node {
239+
return client._nodes
240+
}
241+
242+
func (client *prometheusUsageClient) pods(node string) []*v1.Pod {
243+
return client._pods[node]
244+
}
245+
246+
// podUsage is part of the usageClient interface. Per-pod usage cannot be
// derived from the node-level Prometheus query, so this implementation
// returns nil, nil. NOTE(review): callers receive a nil map with no
// error — confirm every caller tolerates that before relying on per-pod
// values with this client.
func (client *prometheusUsageClient) podUsage(pod *v1.Pod) (map[v1.ResourceName]*resource.Quantity, error) {
	return nil, nil
}
249+
250+
// fakePromClient is a test double for promapi.Client that serves a canned
// query result instead of contacting a real Prometheus server.
type fakePromClient struct {
	// result is marshalled into the "result" field of the synthetic query
	// response produced by Do; typically a []model.Sample.
	// Modern spelling: `any` is the Go 1.18+ alias for `interface{}`.
	result any
}
253+
254+
// fakePayload mirrors the top-level JSON envelope of a Prometheus HTTP
// API query response ({"status": ..., "data": ...}).
type fakePayload struct {
	Status string      `json:"status"`
	Data   queryResult `json:"data"`
}
258+
259+
type queryResult struct {
260+
Type model.ValueType `json:"resultType"`
261+
Result interface{} `json:"result"`
262+
}
263+
264+
func (client *fakePromClient) URL(ep string, args map[string]string) *url.URL {
265+
return &url.URL{}
266+
}
267+
func (client *fakePromClient) Do(ctx context.Context, request *http.Request) (*http.Response, []byte, error) {
268+
jsonData, err := json.Marshal(fakePayload{
269+
Status: "success",
270+
Data: queryResult{
271+
Type: model.ValVector,
272+
Result: client.result,
273+
},
274+
})
275+
276+
return &http.Response{StatusCode: 200}, jsonData, err
277+
}
278+
279+
func (client *prometheusUsageClient) capture(nodes []*v1.Node) error {
280+
client._nodeUtilization = make(map[string]map[v1.ResourceName]*resource.Quantity)
281+
client._pods = make(map[string][]*v1.Pod)
282+
capturedNodes := []*v1.Node{}
283+
284+
results, warnings, err := promv1.NewAPI(client.promClient).Query(context.TODO(), client.promQuery, time.Now())
285+
if err != nil {
286+
return fmt.Errorf("unable to capture prometheus metrics: %v", err)
287+
}
288+
if len(warnings) > 0 {
289+
klog.Infof("prometheus metrics warnings: %v", warnings)
290+
}
291+
292+
nodeUsages := make(map[string]map[v1.ResourceName]*resource.Quantity)
293+
for _, sample := range results.(model.Vector) {
294+
// fmt.Printf("sample: %#v\n", sample)
295+
nodeName := string(sample.Metric["instance"])
296+
nodeUsages[nodeName] = map[v1.ResourceName]*resource.Quantity{
297+
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(sample.Value*1000), resource.DecimalSI),
298+
}
299+
}
300+
301+
for _, node := range nodes {
302+
if _, exists := nodeUsages[node.Name]; !exists {
303+
return fmt.Errorf("unable to find metric entry for %v", node.Name)
304+
}
305+
pods, err := podutil.ListPodsOnANode(node.Name, client.getPodsAssignedToNode, nil)
306+
if err != nil {
307+
klog.V(2).InfoS("Node will not be processed, error accessing its pods", "node", klog.KObj(node), "err", err)
308+
continue
309+
}
310+
311+
// store the snapshot of pods from the same (or the closest) node utilization computation
312+
client._pods[node.Name] = pods
313+
client._nodeUtilization[node.Name] = nodeUsages[node.Name]
314+
capturedNodes = append(capturedNodes, node)
315+
}
316+
317+
client._nodes = capturedNodes
318+
319+
return nil
320+
}

Diff for: pkg/framework/plugins/nodeutilization/usageclients_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
3333
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
3434
"sigs.k8s.io/descheduler/test"
35+
"github.com/prometheus/common/model"
3536
)
3637

3738
var gvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodemetricses"}
@@ -133,3 +134,70 @@ func TestActualUsageClient(t *testing.T) {
133134
metricsClientset, collector, usageSnapshot, nodes, n2.Name, n2metrics,
134135
)
135136
}
137+
138+
func sample(metricName, nodeName string, value float64) model.Sample {
139+
return model.Sample{
140+
Metric: model.Metric{
141+
"__name__": model.LabelValue(metricName),
142+
"instance": model.LabelValue(nodeName),
143+
},
144+
Value: model.SampleValue(value),
145+
Timestamp: 1728991761711,
146+
}
147+
}
148+
149+
func TestPrometheusUsageClient(t *testing.T) {
150+
n1 := test.BuildTestNode("ip-10-0-17-165.ec2.internal", 2000, 3000, 10, nil)
151+
n2 := test.BuildTestNode("ip-10-0-51-101.ec2.internal", 2000, 3000, 10, nil)
152+
n3 := test.BuildTestNode("ip-10-0-94-25.ec2.internal", 2000, 3000, 10, nil)
153+
154+
nodes := []*v1.Node{n1, n2, n3}
155+
156+
p1 := test.BuildTestPod("p1", 400, 0, n1.Name, nil)
157+
p21 := test.BuildTestPod("p21", 400, 0, n2.Name, nil)
158+
p22 := test.BuildTestPod("p22", 400, 0, n2.Name, nil)
159+
p3 := test.BuildTestPod("p3", 400, 0, n3.Name, nil)
160+
161+
pClient := &fakePromClient{
162+
result: []model.Sample{
163+
sample("instance:node_cpu:rate:sum", "ip-10-0-51-101.ec2.internal", 0.20381818181818104),
164+
sample("instance:node_cpu:rate:sum", "ip-10-0-17-165.ec2.internal", 0.4245454545454522),
165+
sample("instance:node_cpu:rate:sum", "ip-10-0-94-25.ec2.internal", 0.5695757575757561),
166+
},
167+
}
168+
169+
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3, p1, p21, p22, p3)
170+
171+
ctx := context.TODO()
172+
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
173+
podInformer := sharedInformerFactory.Core().V1().Pods().Informer()
174+
podsAssignedToNode, err := podutil.BuildGetPodsAssignedToNodeFunc(podInformer)
175+
if err != nil {
176+
t.Fatalf("Build get pods assigned to node function error: %v", err)
177+
}
178+
179+
sharedInformerFactory.Start(ctx.Done())
180+
sharedInformerFactory.WaitForCacheSync(ctx.Done())
181+
182+
prometheusUsageClient := newPrometheusUsageSnapshot(podsAssignedToNode, pClient)
183+
err = prometheusUsageClient.capture(nodes)
184+
if err != nil {
185+
t.Fatalf("unable to capture prometheus metrics: %v", err)
186+
}
187+
188+
for _, node := range nodes {
189+
nodeUtil := prometheusUsageClient.nodeUtilization(node.Name)
190+
fmt.Printf("nodeUtil[%v]: %v\n", node.Name, nodeUtil)
191+
}
192+
193+
nodeThresholds := NodeThresholds{
194+
lowResourceThreshold: map[v1.ResourceName]*resource.Quantity{
195+
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(300), resource.DecimalSI),
196+
},
197+
highResourceThreshold: map[v1.ResourceName]*resource.Quantity{
198+
v1.ResourceName("MetricResource"): resource.NewQuantity(int64(500), resource.DecimalSI),
199+
},
200+
}
201+
202+
fmt.Printf("nodeThresholds: %#v\n", nodeThresholds)
203+
}

0 commit comments

Comments
 (0)