Skip to content

Commit 9ba06d6

Browse files
authored
Merge pull request #15 from uc-cdis/feat/dispatch_job_api
Feat/dispatch job api
2 parents 0729bb4 + b9125b0 commit 9ba06d6

File tree

2 files changed

+52
-5
lines changed

2 files changed

+52
-5
lines changed

handlers/handler.go

+27-5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ type JobConfig struct {
3434
RequestMem string `request_mem`
3535
}
3636

37+
type RetryMessage struct {
38+
Bucket string `bucket`
39+
Key string `key`
40+
}
41+
3742
// NewSQSHandler creates new SQSHandler instance
3843
func NewSQSHandler(queueURL string) *SQSHandler {
3944
sqsHandler := new(SQSHandler)
@@ -56,7 +61,7 @@ func (handler *SQSHandler) StartServer() error {
5661

5762
glog.Info("Starting a new server...")
5863
handler.Server = mq.NewServer(handler.QueueURL, mq.HandlerFunc(func(m *mq.Message) error {
59-
return handler.HandleSQSMessage(m)
64+
return handler.HandleSQSMessage(aws.StringValue(m.SQSMessage.Body))
6065
}), mq.WithClient(newClient))
6166
handler.Server.Start()
6267
glog.Info("The server is started")
@@ -129,10 +134,10 @@ The format of a SQS message body:
129134
}
130135
*/
131136

132-
func getObjectsFromSQSMessage(m *mq.Message) []string {
137+
func getObjectsFromSQSMessage(msgBody string) []string {
133138
objectPaths := make([]string, 0)
134139
mapping := make(map[string][]interface{})
135-
msgBody := aws.StringValue(m.SQSMessage.Body)
140+
//msgBody := aws.StringValue(m.SQSMessage.Body)
136141

137142
msgBodyInf, err := GetValueFromJSON([]byte(msgBody), []string{"Message"})
138143
if err != nil {
@@ -189,8 +194,9 @@ to the queue and retry later (handled by `md` library). That makes sure
189194
the message is properly handle before it actually deleted
190195
191196
*/
192-
func (handler *SQSHandler) HandleSQSMessage(m *mq.Message) error {
193-
objectPaths := getObjectsFromSQSMessage(m)
197+
func (handler *SQSHandler) HandleSQSMessage(jsonBody string) error {
198+
199+
objectPaths := getObjectsFromSQSMessage(jsonBody)
194200

195201
jobNameList := make([]string, 0)
196202
for _, jobConfig := range handler.JobConfigs {
@@ -268,3 +274,19 @@ func (handler *SQSHandler) handleListJobConfigs() (string, error) {
268274
}
269275
return "[" + str + "]", nil
270276
}
277+
278+
/*
279+
RetryCreateIndexingJob creates manually job
280+
*/
281+
func (handler *SQSHandler) RetryCreateIndexingJob(jsonBytes []byte) error {
282+
283+
retryMessage := RetryMessage{}
284+
if err := json.Unmarshal(jsonBytes, &retryMessage); err != nil {
285+
return err
286+
}
287+
288+
str := fmt.Sprintf(`{
289+
"Type" : "Notification",
290+
"Message" : "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"s3SchemaVersion\":\"1.0\",\"bucket\":{\"name\":\"%s\"},\"object\":{\"key\":\"%s\"}}}]}"}`, retryMessage.Bucket, retryMessage.Key)
291+
return handler.HandleSQSMessage(str)
292+
}

handlers/handler_api.go

+25
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ func (handler *SQSHandler) RegisterSQSHandler() {
1111
http.HandleFunc("/server", handler.ServiceHandler)
1212
http.HandleFunc("/sqs", handler.SQSHandler)
1313
http.HandleFunc("/jobConfig", handler.HandleJobConfig)
14+
http.HandleFunc("/dispatchJob", handler.HandleDispatchJob)
1415
}
1516

1617
// ServiceHandler handles stop/start/status the SQS querrying service
@@ -138,3 +139,27 @@ func (handler *SQSHandler) listJobConfigs(w http.ResponseWriter, r *http.Request
138139
}
139140
fmt.Fprintf(w, str)
140141
}
142+
143+
// HandleDispatchJob dispatch an job
144+
func (handler *SQSHandler) HandleDispatchJob(w http.ResponseWriter, r *http.Request) {
145+
if r.Method == "POST" {
146+
handler.dispatchJob(w, r)
147+
}
148+
}
149+
150+
// addJob adds an job config
151+
func (handler *SQSHandler) dispatchJob(w http.ResponseWriter, r *http.Request) {
152+
// Try to read the request body.
153+
body, err := ioutil.ReadAll(r.Body)
154+
if err != nil {
155+
msg := fmt.Sprintf("failed to read request body; encountered error: %s", err)
156+
http.Error(w, msg, http.StatusBadRequest)
157+
return
158+
}
159+
if err = handler.RetryCreateIndexingJob(body); err != nil {
160+
msg := fmt.Sprintf("failed to dispatch an job; encountered error: %s", err)
161+
http.Error(w, msg, http.StatusBadRequest)
162+
return
163+
}
164+
fmt.Fprintf(w, "Successfully dispatch a new job!")
165+
}

0 commit comments

Comments
 (0)