Skip to content

[Prometheus] Add kuberay_cluster_provisioned_duration_seconds metric #3212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions ray-operator/controllers/ray/metrics/ray_cluster_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,33 @@ import (

// RayClusterMetricCollector groups the Prometheus metrics recorded for RayClusters.
type RayClusterMetricCollector struct {
// Metrics
// rayClusterProvisionedDuration is the gauge vector written by
// ObserveRayClusterProvisionedDuration, one series per (name, namespace) pair.
rayClusterProvisionedDuration *prometheus.GaugeVec
}

// NewRayClusterMetricCollector returns a RayClusterMetricCollector whose
// kuberay_cluster_provisioned_duration_seconds gauge vector is keyed by the
// "name" and "namespace" labels.
func NewRayClusterMetricCollector() *RayClusterMetricCollector {
// NOTE(review): the empty-literal assignment directly below appears to be the
// removed (pre-change) line retained by the diff view; the populated literal
// after it looks like its replacement — confirm against the merged file.
collector := &RayClusterMetricCollector{}
collector := &RayClusterMetricCollector{
rayClusterProvisionedDuration: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "kuberay_cluster_provisioned_duration_seconds",
Help: "The time, in seconds, when a RayCluster's `RayClusterProvisioned` status transitions from false (or unset) to true",
},
// Label order here fixes the argument order expected by WithLabelValues.
[]string{"name", "namespace"},
),
}
return collector
}

// Describe implements the prometheus.Collector interface Describe method by
// delegating to the rayClusterProvisionedDuration gauge vector.
// NOTE(review): the first signature below (blank `_` parameter) appears to be
// the removed pre-change line retained by the diff view; the second signature,
// which names the channel `ch`, looks like its replacement — confirm against
// the merged file.
func (c *RayClusterMetricCollector) Describe(_ chan<- *prometheus.Desc) {
func (c *RayClusterMetricCollector) Describe(ch chan<- *prometheus.Desc) {
c.rayClusterProvisionedDuration.Describe(ch)
}

// Collect implements the prometheus.Collector interface Collect method by
// delegating to the rayClusterProvisionedDuration gauge vector.
// NOTE(review): the first signature below (blank `_` parameter) appears to be
// the removed pre-change line retained by the diff view; the second signature,
// which names the channel `ch`, looks like its replacement — confirm against
// the merged file.
func (c *RayClusterMetricCollector) Collect(_ chan<- prometheus.Metric) {
func (c *RayClusterMetricCollector) Collect(ch chan<- prometheus.Metric) {
c.rayClusterProvisionedDuration.Collect(ch)
}

// ObserveRayClusterProvisionedDuration stores duration as the current value of
// the provisioned-duration gauge for the RayCluster identified by name and
// namespace. Any value previously set for the same label pair is overwritten.
func (c *RayClusterMetricCollector) ObserveRayClusterProvisionedDuration(name, namespace string, duration float64) {
	// Label values are passed in the order the gauge vector declares them.
	gauge := c.rayClusterProvisionedDuration.WithLabelValues(name, namespace)
	gauge.Set(duration)
}
15 changes: 15 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1608,9 +1608,24 @@ func (r *RayClusterReconciler) updateRayClusterStatus(ctx context.Context, origi
if err != nil {
logger.Info("Error updating status", "name", originalRayClusterInstance.Name, "error", err, "RayCluster", newInstance)
}
collectRayClusterMetrics(r.options.RayClusterMetricCollector, newInstance.Name, newInstance.Namespace, newInstance.Status, originalRayClusterInstance.Status, originalRayClusterInstance.CreationTimestamp.Time)

return inconsistent, err
}

// collectRayClusterMetrics records RayCluster metrics derived from a status change.
//
// It is a no-op when collector is nil. Otherwise, when the RayClusterProvisioned
// condition is true in newClusterStatus but not true in oldClusterStatus, it
// reports the time elapsed since creationTimestamp, in seconds, as the
// `kuberay_cluster_provisioned_duration_seconds` metric for name/namespace.
func collectRayClusterMetrics(collector *metrics.RayClusterMetricCollector, name, namespace string, newClusterStatus, oldClusterStatus rayv1.RayClusterStatus, creationTimestamp time.Time) {
	if collector == nil {
		return
	}

	provisionedType := string(rayv1.RayClusterProvisioned)

	// Only the transition into the provisioned state is of interest.
	if !meta.IsStatusConditionTrue(newClusterStatus.Conditions, provisionedType) {
		return
	}
	if meta.IsStatusConditionTrue(oldClusterStatus.Conditions, provisionedType) {
		// Already provisioned before this update; nothing new to record.
		return
	}

	elapsedSeconds := time.Since(creationTimestamp).Seconds()
	collector.ObserveRayClusterProvisionedDuration(name, namespace, elapsedSeconds)
}

// sumGPUs sums the GPUs in the given resource list.
func sumGPUs(resources map[corev1.ResourceName]resource.Quantity) resource.Quantity {
totalGPUs := resource.Quantity{}
Expand Down
78 changes: 78 additions & 0 deletions ray-operator/controllers/ray/raycluster_controller_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

. "github.com/onsi/ginkgo/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
batchv1 "k8s.io/api/batch/v1"
Expand All @@ -51,6 +52,7 @@ import (
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/expectations"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/client/clientset/versioned/scheme"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
Expand Down Expand Up @@ -3568,3 +3570,79 @@ func Test_ReconcileManagedBy(t *testing.T) {
})
}
}

func TestCollectRayClusterMetrics(t *testing.T) {
tests := []struct {
creationTime time.Time
name string
clusterName string
clusterNamespace string
newClusterStatus rayv1.RayClusterStatus
oldClusterStatus rayv1.RayClusterStatus
expectMetric bool
}{
{
name: "Transition from not provisioned to provisioned (should emit metric)",
clusterName: "test",
clusterNamespace: "default",
creationTime: time.Now(),
expectMetric: true,
newClusterStatus: rayv1.RayClusterStatus{
Conditions: []metav1.Condition{
{Type: string(rayv1.RayClusterProvisioned), Status: metav1.ConditionTrue},
},
},
oldClusterStatus: rayv1.RayClusterStatus{
Conditions: []metav1.Condition{
{Type: string(rayv1.RayClusterProvisioned), Status: metav1.ConditionFalse},
},
},
},
{
name: "No transition, both provisioned (should not emit metric)",
clusterName: "test",
clusterNamespace: "default",
creationTime: time.Now(),
expectMetric: false,
newClusterStatus: rayv1.RayClusterStatus{
Conditions: []metav1.Condition{
{Type: string(rayv1.RayClusterProvisioned), Status: metav1.ConditionTrue},
},
},
oldClusterStatus: rayv1.RayClusterStatus{
Conditions: []metav1.Condition{
{Type: string(rayv1.RayClusterProvisioned), Status: metav1.ConditionTrue},
},
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
registry := prometheus.NewRegistry()
collector := metrics.NewRayClusterMetricCollector()
require.NoError(t, registry.Register(collector))

collectRayClusterMetrics(collector, tc.clusterName, tc.clusterNamespace, tc.newClusterStatus, tc.oldClusterStatus, tc.creationTime)

metricFamilies, err := registry.Gather()
require.NoError(t, err)
found := false
for _, mf := range metricFamilies {
if mf.GetName() == "kuberay_cluster_provisioned_duration_seconds" {
found = true
if tc.expectMetric {
require.Len(t, mf.Metric, 1)
assert.Equal(t, tc.clusterName, mf.Metric[0].GetLabel()[0].GetValue())
assert.Equal(t, tc.clusterNamespace, mf.Metric[0].GetLabel()[1].GetValue())
} else {
assert.Empty(t, mf.Metric, "should not emit metric")
}
}
}
if tc.expectMetric && !found {
t.Errorf("expected metric 'kuberay_cluster_provisioned_duration_seconds' not found")
}
})
}
}
Loading