Skip to content

Commit c40354b

Browse files
committed
Collect and expose prometheus metrics #42
Collect counter metrics for total number of successful and failed deletes of pods and jobs. Signed-off-by: Sergey Nuzhdin <[email protected]>
1 parent daec120 commit c40354b

31 files changed

+2178
-23
lines changed

cmd/main.go

+40-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"log"
8+
"net/http"
89
"os"
910
"os/signal"
1011
"path/filepath"
@@ -13,6 +14,7 @@ import (
1314
"syscall"
1415
"time"
1516

17+
"github.com/VictoriaMetrics/metrics"
1618
"k8s.io/client-go/kubernetes"
1719
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // TODO: Add all auth providers
1820
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
@@ -36,6 +38,7 @@ func setupLogging() {
3638
func main() {
3739
runOutsideCluster := flag.Bool("run-outside-cluster", false, "Set this flag when running outside of the cluster.")
3840
namespace := flag.String("namespace", "", "Limit scope to a single namespaces")
41+
listenAddr := flag.String("listen-addr", "0.0.0.0:7000", "Address to expose metrics.")
3942

4043
deleteSuccessAfter := flag.Duration("delete-successful-after", 15*time.Minute, "Delete jobs and pods in successful state after X duration (golang duration format, e.g 5m), 0 - never delete")
4144
deleteFailedAfter := flag.Duration("delete-failed-after", 0, "Delete jobs and pods in failed state after X duration (golang duration format, e.g 5m), 0 - never delete")
@@ -92,7 +95,7 @@ func main() {
9295
if err != nil {
9396
log.Fatal(err.Error())
9497
}
95-
ctx, cancel := context.WithCancel(context.Background())
98+
ctx := context.Background()
9699

97100
wg.Add(1)
98101
go func() {
@@ -105,7 +108,8 @@ func main() {
105108
*legacyKeepSuccessHours,
106109
*legacyKeepFailedHours,
107110
*legacyKeepPendingHours,
108-
).Run(stopCh)
111+
stopCh,
112+
).Run()
109113
} else {
110114
controller.NewKleaner(
111115
ctx,
@@ -117,15 +121,46 @@ func main() {
117121
*deletePendingAfter,
118122
*deleteOrphanedAfter,
119123
*deleteEvictedAfter,
120-
).Run(stopCh)
124+
stopCh,
125+
).Run()
121126
}
122127
wg.Done()
123128
}()
124129
log.Printf("Controller started...")
125130

131+
server := http.Server{Addr: *listenAddr}
132+
wg.Add(1)
133+
go func() {
134+
// Expose the registered metrics at `/metrics` path.
135+
http.HandleFunc("/metrics", func(w http.ResponseWriter, req *http.Request) {
136+
metrics.WritePrometheus(w, true)
137+
})
138+
err := server.ListenAndServe()
139+
if err != nil {
140+
log.Fatalf("failed to ListenAndServe metrics server: %v\n", err)
141+
}
142+
wg.Done()
143+
}()
144+
log.Printf("Listening at %s", *listenAddr)
145+
146+
wg.Add(1)
147+
go func() {
148+
for {
149+
select {
150+
case <-stopCh:
151+
log.Println("shutting http server down")
152+
err := server.Shutdown(ctx)
153+
if err != nil {
154+
log.Printf("failed to shutdown metrics server: %v\n", err)
155+
}
156+
wg.Done()
157+
return
158+
}
159+
}
160+
}()
161+
126162
<-sigsCh // Wait for signals (this hangs until a signal arrives)
127-
log.Printf("Shutting down...")
128-
cancel()
163+
log.Printf("got termination signal...")
129164
close(stopCh) // Tell goroutines to stopCh themselves
130165
wg.Wait() // Wait for all to be stopped
131166
}

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/lwolf/kube-cleanup-operator
33
go 1.14
44

55
require (
6+
github.com/VictoriaMetrics/metrics v1.11.3
67
github.com/imdario/mergo v0.3.9 // indirect
78
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
89
golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 // indirect

go.sum

+6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
1414
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
1515
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
1616
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
17+
github.com/VictoriaMetrics/metrics v1.11.3 h1:eSfXc0CrquKa1VTNUvhP+dhNjLUZHQGTFfp19mYCQWE=
18+
github.com/VictoriaMetrics/metrics v1.11.3/go.mod h1:LU2j9qq7xqZYXz8tF3/RQnB2z2MbZms5TDiIg9/NHiQ=
1719
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
1820
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1921
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -103,6 +105,10 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
103105
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
104106
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
105107
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
108+
github.com/valyala/fastrand v1.0.0 h1:LUKT9aKer2dVQNUi3waewTbKV+7H17kvWFNKs2ObdkI=
109+
github.com/valyala/fastrand v1.0.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
110+
github.com/valyala/histogram v1.0.1 h1:FzA7n2Tz/wKRMejgu3PV1vw3htAklTjjuoI6z3d4KDg=
111+
github.com/valyala/histogram v1.0.1/go.mod h1:lQy0xA4wUz2+IUnf97SivorsJIp8FxsnRd6x25q7Mto=
106112
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
107113
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
108114
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=

pkg/controller/controller.go

+40-14
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package controller
22

33
import (
44
"context"
5+
"fmt"
56
"log"
67
"reflect"
78
"time"
89

10+
"github.com/VictoriaMetrics/metrics"
911
batchv1 "k8s.io/api/batch/v1"
1012
corev1 "k8s.io/api/core/v1"
1113
apierrs "k8s.io/apimachinery/pkg/api/errors"
@@ -23,10 +25,20 @@ func ignoreNotFound(err error) error {
2325
return err
2426
}
2527

26-
const resyncPeriod = time.Second * 30
28+
func metricName(name string, namespace string) string {
29+
return fmt.Sprintf(`%s{namespace=%q}`, name, namespace)
30+
}
31+
32+
const (
33+
resyncPeriod = time.Second * 30
34+
podDeletedMetric = "pods_deleted_total"
35+
podDeletedFailedMetric = "pods_deleted_failed_total"
36+
jobDeletedFailedMetric = "jobs_deleted_failed_total"
37+
jobDeletedMetric = "jobs_deleted_total"
38+
)
2739

28-
// Kleaner watches the kubernetes api for changes to Pods and
29-
// delete completed Pods without specific annotation
40+
// Kleaner watches the Kubernetes API for changes to Pods and Jobs and
41+
// deletes them according to the configured timeouts.
3042
type Kleaner struct {
3143
podInformer cache.SharedIndexInformer
3244
jobInformer cache.SharedIndexInformer
@@ -40,11 +52,12 @@ type Kleaner struct {
4052

4153
dryRun bool
4254
ctx context.Context
55+
stopCh <-chan struct{}
4356
}
4457

4558
// NewKleaner creates a new Kleaner
4659
func NewKleaner(ctx context.Context, kclient *kubernetes.Clientset, namespace string, dryRun bool, deleteSuccessfulAfter,
47-
deleteFailedAfter, deletePendingAfter, deleteOrphanedAfter, deleteEvictedAfter time.Duration) *Kleaner {
60+
deleteFailedAfter, deletePendingAfter, deleteOrphanedAfter, deleteEvictedAfter time.Duration, stopCh <-chan struct{}) *Kleaner {
4861
jobInformer := cache.NewSharedIndexInformer(
4962
&cache.ListWatch{
5063
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@@ -76,6 +89,7 @@ func NewKleaner(ctx context.Context, kclient *kubernetes.Clientset, namespace st
7689
dryRun: dryRun,
7790
kclient: kclient,
7891
ctx: ctx,
92+
stopCh: stopCh,
7993
deleteSuccessfulAfter: deleteSuccessfulAfter,
8094
deleteFailedAfter: deleteFailedAfter,
8195
deletePendingAfter: deletePendingAfter,
@@ -104,27 +118,33 @@ func NewKleaner(ctx context.Context, kclient *kubernetes.Clientset, namespace st
104118
}
105119

106120
func (c *Kleaner) periodicCacheCheck() {
121+
ticker := time.NewTicker(2 * resyncPeriod)
107122
for {
108-
for _, job := range c.jobInformer.GetStore().List() {
109-
c.Process(job)
110-
}
111-
for _, obj := range c.podInformer.GetStore().List() {
112-
c.Process(obj)
123+
select {
124+
case <-c.stopCh:
125+
ticker.Stop()
126+
return
127+
case <-ticker.C:
128+
for _, job := range c.jobInformer.GetStore().List() {
129+
c.Process(job)
130+
}
131+
for _, obj := range c.podInformer.GetStore().List() {
132+
c.Process(obj)
133+
}
113134
}
114-
time.Sleep(2 * resyncPeriod)
115135
}
116136
}
117137

118138
// Run starts the process for listening for pod changes and acting upon those changes.
119-
func (c *Kleaner) Run(stopCh <-chan struct{}) {
139+
func (c *Kleaner) Run() {
120140
log.Printf("Listening for changes...")
121141

122-
go c.podInformer.Run(stopCh)
123-
go c.jobInformer.Run(stopCh)
142+
go c.podInformer.Run(c.stopCh)
143+
go c.jobInformer.Run(c.stopCh)
124144

125145
go c.periodicCacheCheck()
126146

127-
<-stopCh
147+
<-c.stopCh
128148
}
129149

130150
func (c *Kleaner) Process(obj interface{}) {
@@ -198,7 +218,10 @@ func (c *Kleaner) deleteJobs(job *batchv1.Job) {
198218
jo := metav1.DeleteOptions{PropagationPolicy: &propagation}
199219
if err := c.kclient.BatchV1().Jobs(job.Namespace).Delete(c.ctx, job.Name, jo); ignoreNotFound(err) != nil {
200220
log.Printf("failed to delete job '%s:%s': %v", job.Namespace, job.Name, err)
221+
metrics.GetOrCreateCounter(metricName(jobDeletedFailedMetric, job.Namespace)).Inc()
222+
return
201223
}
224+
metrics.GetOrCreateCounter(metricName(jobDeletedMetric, job.Namespace)).Inc()
202225
}
203226

204227
func (c *Kleaner) deletePods(pod *corev1.Pod) {
@@ -209,7 +232,10 @@ func (c *Kleaner) deletePods(pod *corev1.Pod) {
209232
var po metav1.DeleteOptions
210233
if err := c.kclient.CoreV1().Pods(pod.Namespace).Delete(c.ctx, pod.Name, po); ignoreNotFound(err) != nil {
211234
log.Printf("failed to delete pod '%s:%s': %v", pod.Namespace, pod.Name, err)
235+
metrics.GetOrCreateCounter(metricName(podDeletedFailedMetric, pod.Namespace)).Inc()
236+
return
212237
}
238+
metrics.GetOrCreateCounter(metricName(podDeletedMetric, pod.Namespace)).Inc()
213239
}
214240

215241
func (c *Kleaner) maybeDeletePod(podPhase corev1.PodPhase, timeSinceFinish time.Duration) bool {

pkg/controller/controller_legacy.go

+18-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"strconv"
1010
"time"
1111

12+
"github.com/VictoriaMetrics/metrics"
1213
corev1 "k8s.io/api/core/v1"
1314
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1415
"k8s.io/apimachinery/pkg/runtime"
@@ -30,6 +31,7 @@ type PodController struct {
3031
dryRun bool
3132
isLegacySystem bool
3233
ctx context.Context
34+
stopCh <-chan struct{}
3335
}
3436

3537
// CreatedByAnnotation type used to match pods created by job
@@ -70,7 +72,7 @@ func isLegacySystem(v version.Info) bool {
7072

7173
// NewPodController creates a new PodController
7274
func NewPodController(ctx context.Context, kclient *kubernetes.Clientset, namespace string, dryRun bool, keepSuccessHours,
73-
keepFailedHours, keepPendingHours int64) *PodController {
75+
keepFailedHours, keepPendingHours int64, stopCh <-chan struct{}) *PodController {
7476

7577
serverVersion, err := kclient.ServerVersion()
7678
if err != nil {
@@ -84,6 +86,7 @@ func NewPodController(ctx context.Context, kclient *kubernetes.Clientset, namesp
8486
dryRun: dryRun,
8587
isLegacySystem: isLegacySystem(*serverVersion),
8688
ctx: ctx,
89+
stopCh: stopCh,
8790
}
8891
// Create informer for watching Namespaces
8992
podInformer := cache.NewSharedIndexInformer(
@@ -126,17 +129,22 @@ func (c *PodController) periodicCacheCheck() {
126129
}
127130

128131
// Run starts the process for listening for pod changes and acting upon those changes.
129-
func (c *PodController) Run(stopCh <-chan struct{}) {
132+
func (c *PodController) Run() {
130133
log.Printf("Listening for changes...")
131134

132-
go c.podInformer.Run(stopCh)
135+
go c.podInformer.Run(c.stopCh)
133136
go c.periodicCacheCheck()
134137

135-
<-stopCh
138+
<-c.stopCh
136139
}
137140

138141
func (c *PodController) Process(obj interface{}) {
139142
podObj := obj.(*corev1.Pod)
143+
// skip pods that are already in the deleting process
144+
if !podObj.DeletionTimestamp.IsZero() {
145+
return
146+
}
147+
140148
parentJobName := c.getParentJobName(podObj)
141149
// if we couldn't find a parent job name, ignore this pod
142150
if parentJobName == "" {
@@ -182,6 +190,9 @@ func (c *PodController) deleteObjects(podObj *corev1.Pod, parentJobName string)
182190
var jo metav1.DeleteOptions
183191
if err := c.kclient.BatchV1().Jobs(podObj.Namespace).Delete(c.ctx, parentJobName, jo); ignoreNotFound(err) != nil {
184192
log.Printf("failed to delete job %s: %v", parentJobName, err)
193+
metrics.GetOrCreateCounter(metricName(jobDeletedFailedMetric, podObj.Namespace)).Inc()
194+
} else {
195+
metrics.GetOrCreateCounter(metricName(jobDeletedMetric, podObj.Namespace)).Inc()
185196
}
186197
} else {
187198
log.Printf("dry-run: Job '%s' would have been deleted", parentJobName)
@@ -192,6 +203,9 @@ func (c *PodController) deleteObjects(podObj *corev1.Pod, parentJobName string)
192203
var po metav1.DeleteOptions
193204
if err := c.kclient.CoreV1().Pods(podObj.Namespace).Delete(c.ctx, podObj.Name, po); ignoreNotFound(err) != nil {
194205
log.Printf("failed to delete job's pod %s: %v", parentJobName, err)
206+
metrics.GetOrCreateCounter(metricName(podDeletedFailedMetric, podObj.Namespace)).Inc()
207+
} else {
208+
metrics.GetOrCreateCounter(metricName(podDeletedMetric, podObj.Namespace)).Inc()
195209
}
196210
} else {
197211
log.Printf("dry-run: Pod '%s' would have been deleted", podObj.Name)

vendor/github.com/VictoriaMetrics/metrics/LICENSE

+22
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)