Skip to content

Commit b9e06b8

Browse files
authored
Merge pull request #10 from uc-cdis/feat/job_status
Feat/job status
2 parents f33b599 + 488247f commit b9e06b8

File tree

4 files changed

+54
-16
lines changed

4 files changed

+54
-16
lines changed

handlers/handler.go

+46-11
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@ import (
66
"errors"
77
"fmt"
88
"regexp"
9+
"sync"
910
"time"
1011

1112
"github.com/aws/aws-sdk-go/aws"
1213
"github.com/golang/glog"
1314
mq "github.com/remind101/mq-go"
1415
)
1516

17+
// MAX_RETRIES is a package-level constant with the value 3.
// NOTE(review): presumably the upper bound on retry attempts; it is not
// referenced in the visible hunks — confirm where it is consumed.
const MAX_RETRIES = 3
18+
1619
type SQSHandler struct {
17-
QueueURL string
18-
Start bool
19-
JobConfigs []JobConfig
20-
Server *mq.Server
20+
QueueURL string
21+
Start bool
22+
JobConfigs []JobConfig
23+
Server *mq.Server
24+
MonitoredJobs []*JobInfo
25+
Mu sync.Mutex
2126
}
2227

2328
type JobConfig struct {
@@ -44,21 +49,51 @@ func (handler *SQSHandler) StartServer() error {
4449
return nil
4550
}
4651

47-
fmt.Println("Start a new server")
4852
newClient, err := NewSQSClient()
4953
if err != nil {
5054
return err
5155
}
5256

57+
glog.Info("Starting a new server...")
5358
handler.Server = mq.NewServer(handler.QueueURL, mq.HandlerFunc(func(m *mq.Message) error {
5459
return handler.HandleSQSMessage(m)
5560
}), mq.WithClient(newClient))
5661
handler.Server.Start()
62+
glog.Info("The server is started")
63+
64+
go handler.StartMonitoringProcess()
5765

5866
return nil
5967

6068
}
6169

70+
// StartMonitoringProcess starts the process to monitor the created job
71+
func (handler *SQSHandler) StartMonitoringProcess() {
72+
for {
73+
var nextMonitoredJobs []*JobInfo
74+
75+
for _, jobInfo := range handler.MonitoredJobs {
76+
k8sJob, err := GetJobStatusByID(jobInfo.UID)
77+
if err != nil {
78+
glog.Errorf("Can not get k8s job %s. Detail %s", jobInfo.Name, err)
79+
} else {
80+
glog.Infof("%s: %s", k8sJob.Name, k8sJob.Status)
81+
if k8sJob.Status == "Unknown" || k8sJob.Status == "Running" {
82+
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
83+
}
84+
}
85+
86+
}
87+
handler.Mu.Lock()
88+
handler.MonitoredJobs = nextMonitoredJobs
89+
handler.Mu.Unlock()
90+
91+
RemoveCompletedJobs()
92+
93+
time.Sleep(30 * time.Second)
94+
}
95+
}
96+
6297
// ShutdownServer shutdowns a server
6398
func (handler *SQSHandler) ShutdownServer() error {
6499
fmt.Println("Shutdown the server")
@@ -162,9 +197,6 @@ func (handler *SQSHandler) HandleSQSMessage(m *mq.Message) error {
162197
jobNameList = append(jobNameList, jobConfig.Name)
163198
}
164199

165-
// remove completed jobs
166-
RemoveCompletedJobs(jobNameList)
167-
168200
jobMap := make(map[string]JobConfig)
169201
for _, objectPath := range objectPaths {
170202
for _, jobConfig := range handler.JobConfigs {
@@ -178,21 +210,24 @@ func (handler *SQSHandler) HandleSQSMessage(m *mq.Message) error {
178210
glog.Infof("Start to run %d jobs", len(jobMap))
179211

180212
for objectPath, jobConfig := range jobMap {
181-
for GetNumberRunningJobs(jobNameList) > GetMaxJobConfig() {
213+
for GetNumberRunningJobs() > GetMaxJobConfig() {
182214
time.Sleep(5 * time.Second)
183215
}
184216
glog.Info("Processing: ", objectPath)
185-
result, err := CreateK8sJob(objectPath, jobConfig)
217+
jobInfo, err := CreateK8sJob(objectPath, jobConfig)
186218
if err != nil {
187219
glog.Errorln(err)
188220
return err
189221
}
190-
out, err := json.Marshal(result)
222+
out, err := json.Marshal(jobInfo)
191223
if err != nil {
192224
glog.Errorln(err)
193225
return err
194226
}
195227
glog.Info(string(out))
228+
handler.Mu.Lock()
229+
handler.MonitoredJobs = append(handler.MonitoredJobs, jobInfo)
230+
handler.Mu.Unlock()
196231
}
197232

198233
return nil

handlers/jobs.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ func getJobByID(jc batchtypev1.JobInterface, jobid string) (*batchv1.Job, error)
5959
return nil, fmt.Errorf("job with jobid %s not found", jobid)
6060
}
6161

62-
func getJobStatusByID(jobid string) (*JobInfo, error) {
62+
//GetJobStatusByID returns job status given job id
63+
func GetJobStatusByID(jobid string) (*JobInfo, error) {
6364
job, err := getJobByID(getJobClient(), jobid)
6465
if err != nil {
6566
return nil, err
@@ -136,7 +137,7 @@ func jobStatusToString(status *batchv1.JobStatus) string {
136137
}
137138

138139
// RemoveCompletedJobs removes all completed k8s jobs dispatched by the service
139-
func RemoveCompletedJobs(prefixList []string) {
140+
func RemoveCompletedJobs() {
140141
jobs := listJobs(getJobClient())
141142
for i := 0; i < len(jobs.JobInfo); i++ {
142143
job := jobs.JobInfo[i]
@@ -147,7 +148,7 @@ func RemoveCompletedJobs(prefixList []string) {
147148
}
148149

149150
// GetNumberRunningJobs returns number of k8s running jobs dispatched by the service
150-
func GetNumberRunningJobs(prefixList []string) int {
151+
func GetNumberRunningJobs() int {
151152
jobs := listJobs(getJobClient())
152153
nRunningJobs := 0
153154
for i := 0; i < len(jobs.JobInfo); i++ {

handlers/jobs_api.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func status(w http.ResponseWriter, r *http.Request) {
1919
}
2020
UID := r.URL.Query().Get("UID")
2121
if UID != "" {
22-
result, errUID := getJobStatusByID(UID)
22+
result, errUID := GetJobStatusByID(UID)
2323
if errUID != nil {
2424
http.Error(w, errUID.Error(), 500)
2525
return

main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ func main() {
6868

6969
// start an SQSHandler instance
7070
SQSHandler := handlers.NewSQSHandler(sqsURL)
71-
SQSHandler.StartServer()
71+
if err := SQSHandler.StartServer(); err != nil {
72+
glog.Errorf("Can not start the server. Detail %s", err)
73+
}
7274
defer SQSHandler.Server.Shutdown(context.Background())
7375

7476
SQSHandler.JobConfigs = jobConfigs

0 commit comments

Comments
 (0)