Skip to content

Commit 79b7608

Browse files
authored
Merge pull request #20 from uc-cdis/feat/remove_mq
Feat/remove mq
2 parents f6546d0 + 7e74a98 commit 79b7608

File tree

5 files changed

+88
-182
lines changed

5 files changed

+88
-182
lines changed

handlers/handler.go

+82-32
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package handlers
22

33
import (
4-
"context"
54
"encoding/json"
65
"errors"
76
"fmt"
@@ -10,17 +9,14 @@ import (
109
"time"
1110

1211
"github.com/aws/aws-sdk-go/aws"
12+
"github.com/aws/aws-sdk-go/service/sqs"
1313
"github.com/golang/glog"
14-
mq "github.com/remind101/mq-go"
1514
)
1615

17-
const MAX_RETRIES = 3
18-
1916
type SQSHandler struct {
2017
QueueURL string
2118
Start bool
2219
JobConfigs []JobConfig
23-
Server *mq.Server
2420
MonitoredJobs []*JobInfo
2521
Mu sync.Mutex
2622
}
@@ -49,27 +45,84 @@ func NewSQSHandler(queueURL string) *SQSHandler {
4945

5046
// StartServer starts a server
5147
func (handler *SQSHandler) StartServer() error {
52-
// return nil if the server already start
53-
if handler.Server != nil {
54-
return nil
55-
}
5648

49+
glog.Info("Starting a new server ...")
50+
51+
go handler.StartConsumingProcess()
52+
go handler.StartMonitoringProcess()
53+
go handler.RemoveCompletedJobsProcess()
54+
55+
glog.Info("The server is started")
56+
57+
return nil
58+
59+
}
60+
61+
// StartConsumingProcess starts consumming the queue
62+
func (handler *SQSHandler) StartConsumingProcess() error {
5763
newClient, err := NewSQSClient()
5864
if err != nil {
5965
return err
6066
}
6167

62-
glog.Info("Starting a new server...")
63-
handler.Server = mq.NewServer(handler.QueueURL, mq.HandlerFunc(func(m *mq.Message) error {
64-
return handler.HandleSQSMessage(aws.StringValue(m.SQSMessage.Body))
65-
}), mq.WithClient(newClient))
66-
handler.Server.Start()
67-
glog.Info("The server is started")
68+
receiveParams := &sqs.ReceiveMessageInput{
69+
QueueUrl: aws.String(handler.QueueURL),
70+
MaxNumberOfMessages: aws.Int64(1),
71+
VisibilityTimeout: aws.Int64(30),
72+
WaitTimeSeconds: aws.Int64(20),
73+
}
74+
for {
75+
time.Sleep(1 * time.Second)
76+
receiveResp, err := newClient.ReceiveMessage(receiveParams)
77+
if err != nil {
78+
glog.Error(err)
79+
}
6880

69-
go handler.StartMonitoringProcess()
70-
go handler.RemoveCompletedJobsProcess()
81+
for _, message := range receiveResp.Messages {
82+
err := handler.HandleSQSMessage(message)
83+
if err != nil {
84+
glog.Errorf("Can not process the message. Error %s. Message %s", err, *message.Body)
85+
continue
86+
}
87+
88+
handler.RemoveSQSMessage(message)
89+
90+
}
7191

92+
}
93+
}
94+
95+
// RemoveSQSMessage removes SQS message
96+
func (handler *SQSHandler) RemoveSQSMessage(message *sqs.Message) error {
97+
newClient, err := NewSQSClient()
98+
if err != nil {
99+
return err
100+
}
101+
deleteParams := &sqs.DeleteMessageInput{
102+
QueueUrl: aws.String(handler.QueueURL), // Required
103+
ReceiptHandle: message.ReceiptHandle, // Required
104+
}
105+
_, err = newClient.DeleteMessage(deleteParams) // No response returned when successed.
106+
if err != nil {
107+
glog.Error(err)
108+
return err
109+
}
110+
glog.Infof("Message ID: %s has beed deleted.\n\n", *message.MessageId)
72111
return nil
112+
}
113+
114+
// ResendSQSMessage resends the message
115+
func (handler *SQSHandler) ResendSQSMessage(queueURL string, message *sqs.Message) error {
116+
newClient, err := NewSQSClient()
117+
if err != nil {
118+
return err
119+
}
120+
sentMessageInput := &sqs.SendMessageInput{
121+
MessageBody: message.Body,
122+
QueueUrl: aws.String(queueURL),
123+
}
124+
_, err = newClient.SendMessage(sentMessageInput)
125+
return err
73126

74127
}
75128

@@ -81,7 +134,8 @@ func (handler *SQSHandler) StartMonitoringProcess() {
81134
for _, jobInfo := range handler.MonitoredJobs {
82135
k8sJob, err := GetJobStatusByID(jobInfo.UID)
83136
if err != nil {
84-
glog.Errorf("Can not get k8s job %s. Detail %s", jobInfo.Name, err)
137+
glog.Errorf("Can not get k8s job %s. Detail %s. Resend the message to the queue", jobInfo.Name, err)
138+
handler.ResendSQSMessage(handler.QueueURL, jobInfo.SQSMessage)
85139
} else {
86140
glog.Infof("%s: %s", k8sJob.Name, k8sJob.Status)
87141
if k8sJob.Status == "Unknown" || k8sJob.Status == "Running" {
@@ -107,17 +161,6 @@ func (handler *SQSHandler) RemoveCompletedJobsProcess() {
107161
}
108162
}
109163

110-
// ShutdownServer shutdowns a server
111-
func (handler *SQSHandler) ShutdownServer() error {
112-
fmt.Println("Shutdown the server")
113-
if handler.Server == nil {
114-
return nil
115-
}
116-
err := handler.Server.Shutdown(context.Background())
117-
handler.Server = nil
118-
return err
119-
}
120-
121164
/*
122165
getObjectFromSQSMessage returns s3 object from sqs message
123166
@@ -202,8 +245,9 @@ to the queue and retry later (handled by `md` library). That makes sure
202245
the message is properly handle before it actually deleted
203246
204247
*/
205-
func (handler *SQSHandler) HandleSQSMessage(jsonBody string) error {
248+
func (handler *SQSHandler) HandleSQSMessage(message *sqs.Message) error {
206249

250+
jsonBody := *message.Body
207251
objectPaths := getObjectsFromSQSMessage(jsonBody)
208252

209253
jobNameList := make([]string, 0)
@@ -220,21 +264,24 @@ func (handler *SQSHandler) HandleSQSMessage(jsonBody string) error {
220264
}
221265
}
222266
}
267+
glog.Info("message:", jsonBody)
223268

224269
glog.Infof("Start to run %d jobs", len(jobMap))
225270

226271
for objectPath, jobConfig := range jobMap {
227272
for GetNumberRunningJobs() > GetMaxJobConfig() {
228273
time.Sleep(5 * time.Second)
229274
}
230-
glog.Info("Processing: ", objectPath)
231275
jobInfo, err := CreateK8sJob(objectPath, jobConfig)
232276
if err != nil {
277+
glog.Infof("Error :%s", err)
233278
glog.Errorln(err)
234279
return err
235280
}
281+
jobInfo.SQSMessage = message
236282
out, err := json.Marshal(jobInfo)
237283
if err != nil {
284+
glog.Infof("Error :%s", err)
238285
glog.Errorln(err)
239286
return err
240287
}
@@ -296,5 +343,8 @@ func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error {
296343
str := fmt.Sprintf(`{
297344
"Type" : "Notification",
298345
"Message" : "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"s3SchemaVersion\":\"1.0\",\"bucket\":{\"name\":\"%s\"},\"object\":{\"key\":\"%s\"}}}]}"}`, retryMessage.Bucket, retryMessage.Key)
299-
return handler.HandleSQSMessage(str)
346+
sqsMessage := sqs.Message{}
347+
sqsMessage.SetBody(str)
348+
349+
return handler.HandleSQSMessage(&sqsMessage)
300350
}

handlers/handler_api.go

-66
Original file line numberDiff line numberDiff line change
@@ -8,76 +8,10 @@ import (
88

99
// RegisterSQSHandler registers endpoints
1010
func (handler *SQSHandler) RegisterSQSHandler() {
11-
http.HandleFunc("/server", handler.ServiceHandler)
12-
http.HandleFunc("/sqs", handler.SQSHandler)
1311
http.HandleFunc("/jobConfig", handler.HandleJobConfig)
1412
http.HandleFunc("/dispatchJob", handler.HandleDispatchJob)
1513
}
1614

17-
// ServiceHandler handles stop/start/status the SQS querrying service
18-
// To start the server
19-
// curl -X POST http://localhost/server?start=true
20-
// To stop the server
21-
// curl -X POST http://localhost/server?start=false
22-
// To check the status
23-
// curl http://localhost/server
24-
func (handler *SQSHandler) ServiceHandler(w http.ResponseWriter, r *http.Request) {
25-
if r.Method == "PUT" {
26-
val := r.URL.Query().Get("start")
27-
if val == "true" {
28-
if err := handler.StartServer(); err != nil {
29-
fmt.Fprintf(w, fmt.Sprintf("Can not start a server. Detail %s", err))
30-
} else {
31-
fmt.Fprintf(w, "Successfully start a server")
32-
}
33-
} else if val == "false" {
34-
if err := handler.ShutdownServer(); err != nil {
35-
fmt.Fprintf(w, fmt.Sprintf("Can not shutdown the server. Detail %s", err))
36-
} else {
37-
fmt.Fprintf(w, "Successfully shutdown the server")
38-
}
39-
}
40-
41-
} else if r.Method == "GET" {
42-
if handler.Server != nil {
43-
fmt.Fprintf(w, "SQS server is running")
44-
} else {
45-
fmt.Fprintf(w, "SQS server is not running")
46-
}
47-
} else {
48-
http.Error(w, "Not supported request method.", 405)
49-
}
50-
51-
}
52-
53-
// SQSHandler handles update/get SQS URL string
54-
// to update SQS url
55-
// curl -X PUT http://localhost/sqs?url=http://sqs.aws.example
56-
// to get current sqs url
57-
// curl http://localhost/sqs
58-
func (handler *SQSHandler) SQSHandler(w http.ResponseWriter, r *http.Request) {
59-
if r.Method == "PUT" {
60-
handler.QueueURL = r.URL.Query().Get("url")
61-
62-
if err := handler.ShutdownServer(); err != nil {
63-
fmt.Fprintf(w, fmt.Sprintf("Can not shutdown the server. Detail %s", err))
64-
} else {
65-
fmt.Fprintf(w, "Successfully shutdown the server")
66-
}
67-
68-
if err := handler.StartServer(); err != nil {
69-
fmt.Fprintf(w, fmt.Sprintf("Can not start a server. Detail %s", err))
70-
} else {
71-
fmt.Fprintf(w, "Successfully start a server")
72-
}
73-
74-
} else if r.Method == "GET" {
75-
fmt.Fprintf(w, handler.QueueURL)
76-
} else {
77-
http.Error(w, "Not supported request method.", 405)
78-
}
79-
}
80-
8115
// HandleJobConfig handles job config endpoints
8216
// to add a jobConfig
8317
// curl -X POST http://localhost/jobConfig -d `{"name": "usersync", "pattern": "s3://bucket/usersync.yaml", "image": "quay.io/cdis/fence:master", "imageConfig":{}}`

handlers/handler_test.go

-79
This file was deleted.

handlers/jobs.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77

8+
"github.com/aws/aws-sdk-go/service/sqs"
89
"github.com/golang/glog"
910

1011
batchv1 "k8s.io/api/batch/v1"
@@ -26,9 +27,10 @@ type JobsArray struct {
2627
}
2728

2829
type JobInfo struct {
29-
UID string `json:"uid"`
30-
Name string `json:"name"`
31-
Status string `json:"status"`
30+
UID string `json:"uid"`
31+
Name string `json:"name"`
32+
Status string `json:"status"`
33+
SQSMessage *sqs.Message
3234
}
3335

3436
func getJobClient() batchtypev1.JobInterface {

main.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ curl -X POST http://localhost:8000/job -d '{"Name":"TEST3", "Pattern":"s3://test
1414
package main
1515

1616
import (
17-
"context"
1817
"encoding/json"
1918
"flag"
2019
"fmt"
@@ -68,10 +67,10 @@ func main() {
6867

6968
// start an SQSHandler instance
7069
SQSHandler := handlers.NewSQSHandler(sqsURL)
70+
7171
if err := SQSHandler.StartServer(); err != nil {
7272
glog.Errorf("Can not start the server. Detail %s", err)
7373
}
74-
defer SQSHandler.Server.Shutdown(context.Background())
7574

7675
SQSHandler.JobConfigs = jobConfigs
7776

0 commit comments

Comments
 (0)