Skip to content

Commit 9d736ee

Browse files
authored
Merge pull request #25 from uc-cdis/feat/job_status
Feat/job status
2 parents f1f17d6 + cb125dc commit 9d736ee

File tree

5 files changed

+217
-11
lines changed

5 files changed

+217
-11
lines changed

README.md

+5
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ JobConfig is an json-base string to register an job. An example might be somethi
3131
}
3232
}
3333
```
34+
## API Documentation
35+
36+
[OpenAPI documentation available here.](http://petstore.swagger.io/?url=https://raw.githubusercontent.com/uc-cdis/ssjdispatcher/master/openapis/openapi.yaml)
37+
38+
The YAML file for the OpenAPI documentation is in the `openapis` folder at the repository root; see the README in that folder for more details.
3439

3540
## Setup
3641

handlers/handler.go

+22-9
Original file line numberDiff line numberDiff line change
@@ -132,17 +132,21 @@ func (handler *SQSHandler) StartMonitoringProcess() {
132132
var nextMonitoredJobs []*JobInfo
133133

134134
for _, jobInfo := range handler.MonitoredJobs {
135-
k8sJob, err := GetJobStatusByID(jobInfo.UID)
136-
if err != nil {
137-
glog.Errorf("Can not get k8s job %s. Detail %s. Resend the message to the queue", jobInfo.Name, err)
138-
handler.ResendSQSMessage(handler.QueueURL, jobInfo.SQSMessage)
135+
if jobInfo.Status == "Completed" {
136+
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
139137
} else {
140-
glog.Infof("%s: %s", k8sJob.Name, k8sJob.Status)
141-
if k8sJob.Status == "Unknown" || k8sJob.Status == "Running" {
142-
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
138+
k8sJob, err := GetJobStatusByID(jobInfo.UID)
139+
if err != nil {
140+
glog.Errorf("Can not get k8s job %s. Detail %s. Resend the message to the queue", jobInfo.Name, err)
141+
handler.ResendSQSMessage(handler.QueueURL, jobInfo.SQSMessage)
142+
} else {
143+
glog.Infof("%s: %s", k8sJob.Name, k8sJob.Status)
144+
if k8sJob.Status == "Unknown" || k8sJob.Status == "Running" || k8sJob.Status == "Completed" {
145+
jobInfo.Status = k8sJob.Status
146+
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
147+
}
143148
}
144149
}
145-
146150
}
147151
handler.Mu.Lock()
148152
handler.MonitoredJobs = nextMonitoredJobs
@@ -157,7 +161,7 @@ func (handler *SQSHandler) RemoveCompletedJobsProcess() {
157161
for {
158162
time.Sleep(300 * time.Second)
159163
glog.Info("Start to remove completed jobs")
160-
RemoveCompletedJobs()
164+
RemoveCompletedJobs(handler.MonitoredJobs)
161165
}
162166
}
163167

@@ -348,3 +352,12 @@ func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error {
348352

349353
return handler.HandleSQSMessage(&sqsMessage)
350354
}
355+
356+
func (handler *SQSHandler) getJobStatusByCheckingMonitoredJobs(url string) string {
357+
for _, jobInfo := range handler.MonitoredJobs {
358+
if jobInfo.URL == url {
359+
return jobInfo.Status
360+
}
361+
}
362+
return ""
363+
}

handlers/handler_api.go

+25
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
func (handler *SQSHandler) RegisterSQSHandler() {
	// Endpoint table: each pattern is bound to the handler method that
	// serves it. Entries are registered in the order listed.
	routes := []struct {
		pattern string
		serve   func(http.ResponseWriter, *http.Request)
	}{
		{"/jobConfig", handler.HandleJobConfig},
		{"/dispatchJob", handler.HandleDispatchJob},
		{"/indexingJobStatus", handler.GetIndexingJobStatus},
	}

	// http.HandleFunc registers on the package-level default serve mux.
	for _, route := range routes {
		http.HandleFunc(route.pattern, route.serve)
	}
}
1415

1516
// HandleJobConfig handles job config endpoints
@@ -97,3 +98,27 @@ func (handler *SQSHandler) dispatchJob(w http.ResponseWriter, r *http.Request) {
9798
}
9899
fmt.Fprintf(w, "Successfully dispatch a new job!")
99100
}
101+
102+
// GetIndexingJobStatus get indexing job status
103+
func (handler *SQSHandler) GetIndexingJobStatus(w http.ResponseWriter, r *http.Request) {
104+
if r.Method == "GET" {
105+
handler.getIndexingJobStatus(w, r)
106+
}
107+
}
108+
109+
// getIndexingJobStatus get indexing job status
110+
func (handler *SQSHandler) getIndexingJobStatus(w http.ResponseWriter, r *http.Request) {
111+
if r.Method != "GET" {
112+
http.Error(w, "Not supported request method.", 405)
113+
return
114+
}
115+
// get object url
116+
url := r.URL.Query().Get("url")
117+
if url != "" {
118+
status := handler.getJobStatusByCheckingMonitoredJobs(url)
119+
fmt.Fprintf(w, status)
120+
} else {
121+
http.Error(w, "Missing url argument", 300)
122+
return
123+
}
124+
}

handlers/jobs.go

+13-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type JobInfo struct {
3030
UID string `json:"uid"`
3131
Name string `json:"name"`
3232
Status string `json:"status"`
33+
URL string `json:"url"`
3334
SQSMessage *sqs.Message
3435
}
3536

@@ -139,12 +140,21 @@ func jobStatusToString(status *batchv1.JobStatus) string {
139140
}
140141

141142
// RemoveCompletedJobs removes all completed k8s jobs dispatched by the service
142-
func RemoveCompletedJobs() {
143+
func RemoveCompletedJobs(monitoredJobs []*JobInfo) {
143144
jobs := listJobs(getJobClient())
144145
for i := 0; i < len(jobs.JobInfo); i++ {
145146
job := jobs.JobInfo[i]
146147
if job.Status == "Completed" {
147-
deleteJobByID(job.UID, GRACE_PERIOD)
148+
isMonitoredJob := false
149+
for _, jobInfo := range monitoredJobs {
150+
if job.UID == jobInfo.UID {
151+
isMonitoredJob = true
152+
break
153+
}
154+
}
155+
if isMonitoredJob == true {
156+
deleteJobByID(job.UID, GRACE_PERIOD)
157+
}
148158
}
149159
}
150160
}
@@ -287,6 +297,7 @@ func CreateK8sJob(inputURL string, jobConfig JobConfig) (*JobInfo, error) {
287297
ji := JobInfo{}
288298
ji.Name = newJob.Name
289299
ji.UID = string(newJob.GetUID())
300+
ji.URL = inputURL
290301
ji.Status = jobStatusToString(&newJob.Status)
291302
return &ji, nil
292303
}

openapis/openapi.yaml

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
openapi: 3.0.1
2+
info:
3+
title: Ssjdispatcher API
4+
description: job dispatcher
5+
license:
6+
name: Apache 2.0
7+
url: http://www.apache.org/licenses/LICENSE-2.0.html
8+
version: 1.0.0
9+
contact:
10+
name: Giang Bui
11+
12+
url: https://ctds.uchicago.edu
13+
servers:
14+
- url: https://qa-dcp.planx-pla.net/sower/
15+
tags:
16+
- name: ssjdispatcher
17+
description: Operations on job dispatch
18+
security:
19+
- bearerAuth: []
20+
paths:
21+
/dispatchJob:
22+
post:
23+
tags:
24+
- ssjdispatcher
25+
summary: DispatchAJob
26+
operationId: dispatch
27+
parameters:
28+
- in: "query"
29+
name: "body"
30+
description: "Input JSON"
31+
required: true
32+
schema:
33+
type: "object"
34+
properties:
35+
bucket:
36+
type: "string"
37+
key:
38+
type: "string"
39+
responses:
40+
200:
41+
description: job successfully dispatched
42+
content:
43+
application/json:
44+
schema:
45+
$ref: "#/components/schemas/JobInfo"
46+
/indexingJobStatus:
  # The handler serves GET only and reads `url` from the query string.
  get:
    tags:
      - ssjdispatcher
    summary: get indexing job status
    operationId: indexingJobStatus
    parameters:
      - name: "url"
        in: query
        description: "S3 object url"
        required: true
        schema:
          type: "string"
    responses:
      200:
        description: job status
61+
/status:
62+
get:
63+
tags:
64+
- ssjdispatcher
65+
summary: Get the status of a job
66+
operationId: status
67+
parameters:
68+
- in: "query"
69+
name: "uid"
70+
description: "Job UID"
71+
required: true
72+
schema:
73+
type: "integer"
74+
responses:
75+
200:
76+
description: job status
77+
content:
78+
application/json:
79+
schema:
80+
$ref: "#/components/schemas/JobInfo"
81+
/list:
82+
get:
83+
tags:
84+
- ssjdispatcher
85+
summary: List all running jobs
86+
operationId: list
87+
responses:
88+
200:
89+
description: successful operation
90+
content:
91+
application/json:
92+
schema:
93+
type: array
94+
items:
95+
$ref: "#/components/schemas/JobInfo"
96+
/output:
97+
get:
98+
tags:
99+
- ssjdispatcher
100+
summary: Get the output of the job run
101+
operationId: output
102+
responses:
103+
200:
104+
description: result of the job execution
105+
content:
106+
application/json:
107+
schema:
108+
$ref: "#/components/schemas/Output"
109+
/_status:
110+
get:
111+
tags:
112+
- ssjdispatcher
113+
summary: Get the health status of ssjdispatcher
114+
operationId: _status
115+
responses:
116+
200:
117+
description: successful operation
118+
content:
119+
application/text:
120+
schema:
121+
$ref: "#/components/schemas/Status"
122+
components:
123+
securitySchemes:
124+
bearerAuth: # arbitrary name for the security scheme
125+
type: http
126+
scheme: bearer
127+
bearerFormat: JWT
128+
schemas:
129+
JobInfo:
130+
type: object
131+
properties:
132+
uid:
133+
type: string
134+
description: Unique identifier for the Job
135+
name:
136+
type: string
137+
description: Name of the Job
138+
status:
139+
type: string
140+
description: Running status for the Job
141+
Output:
142+
type: object
143+
properties:
144+
output:
145+
type: string
146+
description: Output from the running Job
147+
Status:
148+
type: string
149+
enum: [Healthy]
150+
description: >
151+
Value:
152+
* `Healthy` - The ssjdispatcher service is running

0 commit comments

Comments
 (0)