Skip to content

Commit cf52579

Browse files
committed
feat(resend): resend the message if something is wrong
1 parent 8053e3c commit cf52579

File tree

2 files changed

+41
-41
lines changed

2 files changed

+41
-41
lines changed

Diff for: handlers/handler.go

+41-40
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ func (handler *SQSHandler) StartServer() error {
4848

4949
glog.Info("Starting a new server ...")
5050

51-
go handler.StartConsumingProcess(handler.QueueURL)
51+
go handler.StartConsumingProcess()
5252
go handler.StartMonitoringProcess()
53-
//go handler.RemoveCompletedJobsProcess()
53+
go handler.RemoveCompletedJobsProcess()
5454

5555
glog.Info("The server is started")
5656

@@ -59,14 +59,14 @@ func (handler *SQSHandler) StartServer() error {
5959
}
6060

6161
// StartConsumingProcess starts consumming the queue
62-
func (handler *SQSHandler) StartConsumingProcess(queueURL string) error {
62+
func (handler *SQSHandler) StartConsumingProcess() error {
6363
newClient, err := NewSQSClient()
6464
if err != nil {
6565
return err
6666
}
6767

6868
receiveParams := &sqs.ReceiveMessageInput{
69-
QueueUrl: aws.String(queueURL),
69+
QueueUrl: aws.String(handler.QueueURL),
7070
MaxNumberOfMessages: aws.Int64(1),
7171
VisibilityTimeout: aws.Int64(30),
7272
WaitTimeSeconds: aws.Int64(20),
@@ -79,49 +79,53 @@ func (handler *SQSHandler) StartConsumingProcess(queueURL string) error {
7979
}
8080

8181
for _, message := range receiveResp.Messages {
82-
// err := handler.HandleSQSMessage(queueURL, message)
83-
// if err != nil {
84-
// glog.Errorf("Can not process the message. Error %s. Message %s", err, *message.Body)
85-
// continue
86-
// }
87-
88-
glog.Info("message:", *message.Body)
89-
90-
glog.Infof("Start proceess meessage %s", message.MessageId)
91-
time.Sleep(5)
92-
93-
handler.RemoveSQSMessage(queueURL, message)
94-
95-
sentMessageInput := &sqs.SendMessageInput{
96-
MessageBody: message.Body,
97-
QueueUrl: aws.String(queueURL),
82+
err := handler.HandleSQSMessage(message)
83+
if err != nil {
84+
glog.Errorf("Can not process the message. Error %s. Message %s", err, *message.Body)
85+
continue
9886
}
99-
newClient.SendMessageRequest(sentMessageInput)
87+
88+
handler.RemoveSQSMessage(message)
10089

10190
}
10291

10392
}
10493
}
10594

10695
// RemoveSQSMessage removes SQS message
107-
func (handler *SQSHandler) RemoveSQSMessage(queueURL string, message *sqs.Message) error {
96+
func (handler *SQSHandler) RemoveSQSMessage(message *sqs.Message) error {
10897
newClient, err := NewSQSClient()
10998
if err != nil {
11099
return err
111100
}
112101
deleteParams := &sqs.DeleteMessageInput{
113-
QueueUrl: aws.String(queueURL), // Required
114-
ReceiptHandle: message.ReceiptHandle, // Required
102+
QueueUrl: aws.String(handler.QueueURL), // Required
103+
ReceiptHandle: message.ReceiptHandle, // Required
115104
}
116105
_, err = newClient.DeleteMessage(deleteParams) // No response returned when successed.
117106
if err != nil {
118107
glog.Error(err)
119108
return err
120109
}
121-
glog.Infof("[Delete message] \nMessage ID: %s has beed deleted.\n\n", *message.MessageId)
110+
glog.Infof("Message ID: %s has beed deleted.\n\n", *message.MessageId)
122111
return nil
123112
}
124113

114+
// ResendSQSMessage resends the message
115+
func (handler *SQSHandler) ResendSQSMessage(queueURL string, message *sqs.Message) error {
116+
newClient, err := NewSQSClient()
117+
if err != nil {
118+
return err
119+
}
120+
sentMessageInput := &sqs.SendMessageInput{
121+
MessageBody: message.Body,
122+
QueueUrl: aws.String(queueURL),
123+
}
124+
_, err = newClient.SendMessage(sentMessageInput)
125+
return err
126+
127+
}
128+
125129
// StartMonitoringProcess starts the process to monitor the created job
126130
func (handler *SQSHandler) StartMonitoringProcess() {
127131
for {
@@ -130,14 +134,12 @@ func (handler *SQSHandler) StartMonitoringProcess() {
130134
for _, jobInfo := range handler.MonitoredJobs {
131135
k8sJob, err := GetJobStatusByID(jobInfo.UID)
132136
if err != nil {
133-
glog.Errorf("Can not get k8s job %s. Detail %s", jobInfo.Name, err)
137+
glog.Errorf("Can not get k8s job %s. Detail %s. Resend the message to the queue", jobInfo.Name, err)
138+
handler.ResendSQSMessage(handler.QueueURL, k8sJob.SQSMessage)
134139
} else {
135140
glog.Infof("%s: %s", k8sJob.Name, k8sJob.Status)
136141
if k8sJob.Status == "Unknown" || k8sJob.Status == "Running" {
137142
nextMonitoredJobs = append(nextMonitoredJobs, jobInfo)
138-
} else if k8sJob.Status == "Completed" {
139-
deleteJobByID(k8sJob.UID, GRACE_PERIOD)
140-
handler.RemoveSQSMessage(k8sJob.QueueURL, k8sJob.SQSMessage)
141143
}
142144
}
143145

@@ -151,13 +153,13 @@ func (handler *SQSHandler) StartMonitoringProcess() {
151153
}
152154

153155
// RemoveCompletedJobsProcess starts the process to remove completed jobs
154-
// func (handler *SQSHandler) RemoveCompletedJobsProcess() {
155-
// for {
156-
// time.Sleep(300 * time.Second)
157-
// glog.Info("Start to remove completed jobs")
158-
// RemoveCompletedJobs()
159-
// }
160-
// }
156+
func (handler *SQSHandler) RemoveCompletedJobsProcess() {
157+
for {
158+
time.Sleep(300 * time.Second)
159+
glog.Info("Start to remove completed jobs")
160+
RemoveCompletedJobs()
161+
}
162+
}
161163

162164
/*
163165
getObjectFromSQSMessage returns s3 object from sqs message
@@ -243,7 +245,7 @@ to the queue and retry later (handled by `md` library). That makes sure
243245
the message is properly handle before it actually deleted
244246
245247
*/
246-
func (handler *SQSHandler) HandleSQSMessage(queueURL string, message *sqs.Message) error {
248+
func (handler *SQSHandler) HandleSQSMessage(message *sqs.Message) error {
247249

248250
jsonBody := *message.Body
249251
objectPaths := getObjectsFromSQSMessage(jsonBody)
@@ -270,7 +272,6 @@ func (handler *SQSHandler) HandleSQSMessage(queueURL string, message *sqs.Messag
270272
for GetNumberRunningJobs() > GetMaxJobConfig() {
271273
time.Sleep(5 * time.Second)
272274
}
273-
glog.Info("Processing: ", objectPath)
274275
jobInfo, err := CreateK8sJob(objectPath, jobConfig)
275276
if err != nil {
276277
glog.Infof("Error :%s", err)
@@ -283,9 +284,8 @@ func (handler *SQSHandler) HandleSQSMessage(queueURL string, message *sqs.Messag
283284
glog.Errorln(err)
284285
return err
285286
}
286-
glog.Info(string(out))
287287
jobInfo.SQSMessage = message
288-
jobInfo.QueueURL = queueURL
288+
glog.Info(string(out))
289289
handler.Mu.Lock()
290290
handler.MonitoredJobs = append(handler.MonitoredJobs, jobInfo)
291291
handler.Mu.Unlock()
@@ -345,5 +345,6 @@ func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error {
345345
"Message" : "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"s3SchemaVersion\":\"1.0\",\"bucket\":{\"name\":\"%s\"},\"object\":{\"key\":\"%s\"}}}]}"}`, retryMessage.Bucket, retryMessage.Key)
346346
sqsMessage := sqs.Message{}
347347
sqsMessage.SetBody(str)
348+
348349
return handler.HandleSQSMessage(&sqsMessage)
349350
}

Diff for: handlers/jobs.go

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ type JobInfo struct {
3030
UID string `json:"uid"`
3131
Name string `json:"name"`
3232
Status string `json:"status"`
33-
QueueURL string `json:"queueURL"`
3433
SQSMessage *sqs.Message
3534
}
3635

0 commit comments

Comments
 (0)