Skip to content

Commit 5c1893e

Browse files
authored
Merge pull request #34 from uc-cdis/chore/monitored_jobs
Chore/monitored jobs
2 parents d711728 + 528f35f commit 5c1893e

File tree

4 files changed

+28
-9
lines changed

4 files changed

+28
-9
lines changed

Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM golang:1.14 as build-deps
1+
FROM quay.io/cdis/golang:1.15 as build-deps
22

33
# Build static binary
44
RUN mkdir -p /go/src/github.com/uc-cdis/ssjdispatcher

handlers/configuration.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func GetMaxJobConfig() int {
2929
func GetCleanupTime() int {
3030
cleanupTime, err := strconv.Atoi(os.Getenv("CLEANUP_TIME"))
3131
if err != nil {
32-
cleanupTime = 60
32+
cleanupTime = 120
3333
}
3434
return cleanupTime
3535
}

handlers/handler.go

+22-6
Original file line numberDiff line numberDiff line change
@@ -132,25 +132,26 @@ func (handler *SQSHandler) ResendSQSMessage(queueURL string, message *sqs.Messag
132132
func (handler *SQSHandler) StartMonitoringProcess() {
133133
for {
134134
var nextMonitoredJobs []*JobInfo
135-
135+
handler.Mu.Lock()
136136
for _, jobInfo := range handler.MonitoredJobs {
137137
if jobInfo.Status == "Completed" {
138138
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
139139
} else {
140140
k8sJob, err := GetJobStatusByID(jobInfo.UID)
141141
if err != nil {
142-
glog.Errorf("Can not get k8s job %s. Detail %s. Resend the message to the queue", jobInfo.Name, err)
143-
handler.ResendSQSMessage(handler.QueueURL, jobInfo.SQSMessage)
142+
jobInfo.Status = "Unknown"
144143
} else {
145144
glog.Infof("%s: %s", k8sJob.Name, k8sJob.Status)
146145
if k8sJob.Status == "Unknown" || k8sJob.Status == "Running" || k8sJob.Status == "Completed" {
147146
jobInfo.Status = k8sJob.Status
148-
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
147+
} else if k8sJob.Status == "Failed" {
148+
glog.Errorf("The k8s job %s failed. Detail %s. Resend the message to the queue", jobInfo.Name, err)
149+
handler.ResendSQSMessage(handler.QueueURL, jobInfo.SQSMessage)
149150
}
150151
}
152+
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
151153
}
152154
}
153-
handler.Mu.Lock()
154155
handler.MonitoredJobs = nextMonitoredJobs
155156
handler.Mu.Unlock()
156157

@@ -163,7 +164,22 @@ func (handler *SQSHandler) RemoveCompletedJobsProcess() {
163164
for {
164165
time.Sleep(time.Duration(GetCleanupTime()) * time.Second)
165166
glog.Info("Start to remove completed jobs")
166-
RemoveCompletedJobs(handler.MonitoredJobs)
167+
handler.Mu.Lock()
168+
var tmp []*JobInfo
169+
deletedJobs := RemoveCompletedJobs(handler.MonitoredJobs)
170+
for _, jobInfo := range handler.MonitoredJobs {
171+
var isDeleted = false
172+
for _, jobid := range deletedJobs {
173+
if jobInfo.UID == jobid {
174+
isDeleted = true
175+
}
176+
}
177+
if isDeleted == false {
178+
tmp = append(tmp, jobInfo)
179+
}
180+
}
181+
handler.MonitoredJobs = tmp
182+
handler.Mu.Unlock()
167183
}
168184
}
169185

handlers/jobs.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,9 @@ func jobStatusToString(status *batchv1.JobStatus) string {
140140
}
141141

142142
// RemoveCompletedJobs removes all completed k8s jobs dispatched by the service
143-
func RemoveCompletedJobs(monitoredJobs []*JobInfo) {
143+
func RemoveCompletedJobs(monitoredJobs []*JobInfo) []string {
144144
jobs := listJobs(getJobClient())
145+
var deletedJobs []string
145146
for i := 0; i < len(jobs.JobInfo); i++ {
146147
job := jobs.JobInfo[i]
147148
if job.Status == "Completed" {
@@ -154,9 +155,11 @@ func RemoveCompletedJobs(monitoredJobs []*JobInfo) {
154155
}
155156
if isMonitoredJob == true {
156157
deleteJobByID(job.UID, GRACE_PERIOD)
158+
deletedJobs = append(deletedJobs, job.UID)
157159
}
158160
}
159161
}
162+
return deletedJobs
160163
}
161164

162165
// GetNumberRunningJobs returns number of k8s running jobs dispatched by the service

0 commit comments

Comments
 (0)