Skip to content

Commit 9215fc2

Browse files
committed
Fix delayed cleanup logic. Fixes #20
1 parent 9ba5494 commit 9215fc2

File tree

3 files changed

+108
-86
lines changed

3 files changed

+108
-86
lines changed

.goreleaser.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ builds:
1313
goarch:
1414
- amd64
1515
- arm64
16+
- arm7
1617
ignore:
1718
- goos: darwin
18-
goarch: arm
19+
goarch: arm7
1920
- goos: darwin
2021
goarch: arm64
2122

cmd/main.go

+16-12
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"os"
77
"os/signal"
88
"path/filepath"
9-
"strconv"
109
"sync"
1110
"syscall"
1211

@@ -40,23 +39,28 @@ func main() {
4039
clientset, err := newClientSet(*runOutsideCluster)
4140

4241
if err != nil {
43-
panic(err.Error())
42+
log.Fatal(err.Error())
4443
}
4544

46-
options := map[string]string{
47-
"namespace": *namespace,
48-
"keepSuccessHours": strconv.Itoa(*keepSuccessHours),
49-
"keepFailedHours": strconv.Itoa(*keepFailedHours),
50-
"keepPendingHours": strconv.Itoa(*keepPendingHours),
51-
"dryRun": strconv.FormatBool(*dryRun),
45+
options := map[string]float64{
46+
"keepSuccessHours": float64(*keepSuccessHours),
47+
"keepFailedHours": float64(*keepFailedHours),
48+
"keepPendingHours": float64(*keepPendingHours),
5249
}
5350
if *dryRun {
5451
log.Println("Performing dry run...")
5552
}
56-
log.Printf("Configured namespace: '%s', keepSuccessHours: %d, keepFailedHours: %d", options["namespace"], *keepSuccessHours, *keepFailedHours)
57-
log.Printf("Starting controller...")
58-
59-
go controller.NewPodController(clientset, options).Run(stop, wg)
53+
log.Printf(
54+
"Provided settings: namespace=%s, dryRun=%t, keepSuccessHours: %d, keepFailedHours: %d, keepPendingHours: %d",
55+
*namespace, *dryRun, *keepSuccessHours, *keepFailedHours, *keepPendingHours,
56+
)
57+
58+
go func() {
59+
wg.Add(1)
60+
defer wg.Done()
61+
controller.NewPodController(clientset, *namespace, *dryRun, options).Run(stop)
62+
}()
63+
log.Printf("Controller started...")
6064

6165
<-sigs // Wait for signals (this hangs until a signal arrives)
6266
log.Printf("Shutting down...")

pkg/controller/controller.go

+90-73
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"reflect"
77
"regexp"
88
"strconv"
9-
"sync"
109
"time"
1110

1211
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -18,11 +17,19 @@ import (
1817
"k8s.io/client-go/tools/cache"
1918
)
2019

20+
const resyncPeriod = time.Second * 30
21+
2122
// PodController watches the kubernetes api for changes to Pods and
2223
// delete completed Pods without specific annotation
2324
type PodController struct {
2425
podInformer cache.SharedIndexInformer
2526
kclient *kubernetes.Clientset
27+
28+
keepSuccessHours float64
29+
keepFailedHours float64
30+
keepPendingHours float64
31+
dryRun bool
32+
isLegacySystem bool
2633
}
2734

2835
// CreatedByAnnotation type used to match pods created by job
@@ -39,41 +46,63 @@ type CreatedByAnnotation struct {
3946
}
4047
}
4148

42-
// NewPodController creates a new NewPodController
43-
func NewPodController(kclient *kubernetes.Clientset, opts map[string]string) *PodController {
44-
podWatcher := &PodController{}
49+
func isLegacySystem(v version.Info) bool {
50+
oldVersion := false
51+
52+
major, _ := strconv.Atoi(v.Major)
4553

46-
keepSuccessHours, _ := strconv.Atoi(opts["keepSuccessHours"])
47-
keepFailedHours, _ := strconv.Atoi(opts["keepFailedHours"])
48-
keepPendingHours, _ := strconv.Atoi(opts["keepPendingHours"])
49-
dryRun, _ := strconv.ParseBool(opts["dryRun"])
50-
version, err := kclient.ServerVersion()
54+
var minor int
55+
re := regexp.MustCompile("[0-9]+")
56+
m := re.FindAllString(v.Minor, 1)
57+
if len(m) != 0 {
58+
minor, _ = strconv.Atoi(m[0])
59+
} else {
60+
log.Printf("failed to parse minor version %s", v.Minor)
61+
minor = 0
62+
}
5163

64+
if major < 2 && minor < 8 {
65+
oldVersion = true
66+
}
67+
68+
return oldVersion
69+
}
70+
71+
// NewPodController creates a new NewPodController
72+
func NewPodController(kclient *kubernetes.Clientset, namespace string, dryRun bool, opts map[string]float64) *PodController {
73+
serverVersion, err := kclient.ServerVersion()
5274
if err != nil {
53-
log.Fatalf("Failed to retrieve server version %v", err)
75+
log.Fatalf("Failed to retrieve server serverVersion %v", err)
5476
}
5577

78+
podWatcher := &PodController{
79+
keepSuccessHours: opts["keepSuccessHours"],
80+
keepFailedHours: opts["keepFailedHours"],
81+
keepPendingHours: opts["keepPendingHours"],
82+
dryRun: dryRun,
83+
isLegacySystem: isLegacySystem(*serverVersion),
84+
}
5685
// Create informer for watching Namespaces
5786
podInformer := cache.NewSharedIndexInformer(
5887
&cache.ListWatch{
5988
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
60-
return kclient.CoreV1().Pods(opts["namespace"]).List(options)
89+
return kclient.CoreV1().Pods(namespace).List(options)
6190
},
6291
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
63-
return kclient.CoreV1().Pods(opts["namespace"]).Watch(options)
92+
return kclient.CoreV1().Pods(namespace).Watch(options)
6493
},
6594
},
6695
&v1.Pod{},
67-
time.Second*30,
96+
resyncPeriod,
6897
cache.Indexers{},
6998
)
7099
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
71-
AddFunc: func(cur interface{}) {
72-
podWatcher.doTheMagic(cur, keepSuccessHours, keepFailedHours, keepPendingHours, dryRun, *version)
100+
AddFunc: func(obj interface{}) {
101+
podWatcher.Process(obj)
73102
},
74-
UpdateFunc: func(old, cur interface{}) {
75-
if !reflect.DeepEqual(old, cur) {
76-
podWatcher.doTheMagic(cur, keepSuccessHours, keepFailedHours, keepPendingHours, dryRun, *version)
103+
UpdateFunc: func(old, new interface{}) {
104+
if !reflect.DeepEqual(old, new) {
105+
podWatcher.Process(new)
77106
}
78107
},
79108
})
@@ -84,112 +113,100 @@ func NewPodController(kclient *kubernetes.Clientset, opts map[string]string) *Po
84113
return podWatcher
85114
}
86115

116+
func (c *PodController) periodicCacheCheck() {
117+
for {
118+
for _, obj := range c.podInformer.GetStore().List() {
119+
c.Process(obj)
120+
}
121+
time.Sleep(2 * resyncPeriod)
122+
}
123+
}
124+
87125
// Run starts the process for listening for pod changes and acting upon those changes.
88-
func (c *PodController) Run(stopCh <-chan struct{}, wg *sync.WaitGroup) {
126+
func (c *PodController) Run(stopCh <-chan struct{}) {
89127
log.Printf("Listening for changes...")
90-
// When this function completes, mark the go function as done
91-
defer wg.Done()
92-
93-
// Increment wait group as we're about to execute a go function
94-
wg.Add(1)
95128

96-
// Execute go function
97129
go c.podInformer.Run(stopCh)
130+
go c.periodicCacheCheck()
98131

99-
// Wait till we receive a stop signal
100132
<-stopCh
101133
}
102134

103-
func (c *PodController) doTheMagic(cur interface{}, keepSuccessHours int, keepFailedHours int, keepPendingHours int, dryRun bool, version version.Info) {
104-
105-
podObj := cur.(*v1.Pod)
106-
parentJobName := c.getParentJobName(podObj, version)
135+
func (c *PodController) Process(obj interface{}) {
136+
podObj := obj.(*v1.Pod)
137+
parentJobName := c.getParentJobName(podObj)
107138
// if we couldn't find a prent job name, ignore this pod
108139
if parentJobName == "" {
109-
log.Printf("Pod %s was not created by a job... ignoring", podObj.Name)
110140
return
111141
}
112142

113143
executionTimeHours := c.getExecutionTimeHours(podObj)
114-
log.Printf("Checking pod %s with %s status that was executed %f hours ago", podObj.Name, podObj.Status.Phase, executionTimeHours)
115144
switch podObj.Status.Phase {
116145
case v1.PodSucceeded:
117-
if keepSuccessHours == 0 || (keepSuccessHours > 0 && executionTimeHours > float32(keepSuccessHours)) {
118-
c.deleteObjects(podObj, parentJobName, dryRun)
146+
if c.keepSuccessHours == 0 || (c.keepSuccessHours > 0 && executionTimeHours > c.keepSuccessHours) {
147+
c.deleteObjects(podObj, parentJobName)
119148
}
120149
case v1.PodFailed:
121-
if keepFailedHours == 0 || (keepFailedHours > 0 && executionTimeHours > float32(keepFailedHours)) {
122-
c.deleteObjects(podObj, parentJobName, dryRun)
150+
if c.keepFailedHours == 0 || (c.keepFailedHours > 0 && executionTimeHours > c.keepFailedHours) {
151+
c.deleteObjects(podObj, parentJobName)
123152
}
124153
case v1.PodPending:
125-
if keepPendingHours > 0 && executionTimeHours > float32(keepPendingHours) {
126-
c.deleteObjects(podObj, parentJobName, dryRun)
154+
if c.keepPendingHours > 0 && executionTimeHours > c.keepPendingHours {
155+
c.deleteObjects(podObj, parentJobName)
127156
}
128157
default:
129158
return
130159
}
131160
}
132161

133162
// method to calculate the hours that passed since the pod's execution end time
134-
func (c *PodController) getExecutionTimeHours(podObj *v1.Pod) (executionTimeHours float32) {
135-
executionTimeHours = 0.0
136-
currentUnixTime := time.Now().Unix()
137-
podConditions := podObj.Status.Conditions
138-
var pc v1.PodCondition
139-
for _, pc = range podConditions {
163+
func (c *PodController) getExecutionTimeHours(podObj *v1.Pod) float64 {
164+
currentUnixTime := time.Now()
165+
for _, pc := range podObj.Status.Conditions {
140166
// Looking for the time when pod's condition "Ready" became "false" (equals end of execution)
141167
if pc.Type == v1.PodReady && pc.Status == v1.ConditionFalse {
142-
executionTimeUnix := pc.LastTransitionTime.Unix()
143-
executionTimeHours = (float32(currentUnixTime) - float32(executionTimeUnix)) / float32(3600)
168+
return currentUnixTime.Sub(pc.LastTransitionTime.Time).Hours()
144169
}
145170
}
146171

147-
return
172+
return 0.0
148173
}
149174

150-
func (c *PodController) deleteObjects(podObj *v1.Pod, parentJobName string, dryRun bool) {
175+
func (c *PodController) deleteObjects(podObj *v1.Pod, parentJobName string) {
151176
// Delete Pod
152-
if !dryRun {
177+
if !c.dryRun {
153178
log.Printf("Deleting pod '%s'", podObj.Name)
154179
var po metav1.DeleteOptions
155-
c.kclient.CoreV1().Pods(podObj.Namespace).Delete(podObj.Name, &po)
180+
err := c.kclient.CoreV1().Pods(podObj.Namespace).Delete(podObj.Name, &po)
181+
if err != nil {
182+
log.Printf("failed to delete job %s: %v", parentJobName, err)
183+
}
156184
} else {
157185
log.Printf("dry-run: Pod '%s' would have been deleted", podObj.Name)
158186
}
159187
// Delete Job itself
160-
if !dryRun {
188+
if !c.dryRun {
161189
log.Printf("Deleting job '%s'", parentJobName)
162190
var jo metav1.DeleteOptions
163-
c.kclient.BatchV1Client.Jobs(podObj.Namespace).Delete(parentJobName, &jo)
191+
err := c.kclient.BatchV1Client.Jobs(podObj.Namespace).Delete(parentJobName, &jo)
192+
if err != nil {
193+
log.Printf("failed to delete job %s: %v", parentJobName, err)
194+
}
164195
} else {
165196
log.Printf("dry-run: Job '%s' would have been deleted", parentJobName)
166197
}
167198
return
168199
}
169200

170-
func (c *PodController) getParentJobName(podObj *v1.Pod, version version.Info) (parentJobName string) {
171-
172-
oldVersion := false
173-
174-
major, _ := strconv.Atoi(version.Major)
175-
176-
var minor int
177-
re := regexp.MustCompile("[0-9]+")
178-
m := re.FindAllString(version.Minor, 1)
179-
if len(m) != 0 {
180-
minor, _ = strconv.Atoi(m[0])
181-
} else {
182-
log.Printf("failed to parse minor version %s", version.Minor)
183-
minor = 0
184-
}
185-
186-
if major < 2 && minor < 8 {
187-
oldVersion = true
188-
}
201+
func (c *PodController) getParentJobName(podObj *v1.Pod) (parentJobName string) {
189202

190-
if oldVersion {
203+
if c.isLegacySystem {
191204
var createdMeta CreatedByAnnotation
192-
json.Unmarshal([]byte(podObj.ObjectMeta.Annotations["kubernetes.io/created-by"]), &createdMeta)
205+
err := json.Unmarshal([]byte(podObj.ObjectMeta.Annotations["kubernetes.io/created-by"]), &createdMeta)
206+
if err != nil {
207+
log.Printf("failed to unmarshal annotations for pod %s. %v", podObj.Name, err)
208+
return
209+
}
193210
if createdMeta.Reference.Kind == "Job" {
194211
parentJobName = createdMeta.Reference.Name
195212
}

0 commit comments

Comments
 (0)