@@ -14,19 +14,27 @@ class QueuedTask extends AppModel {
14
14
public $ rateHistory = array ();
15
15
16
16
public $ exit = false ;
17
+
18
+ public $ _findMethods = array (
19
+ 'progress ' => true
20
+ );
17
21
18
22
/**
19
23
* Add a new Job to the Queue
20
24
*
21
25
* @param string $jobName QueueTask name
22
26
* @param array $data any array
27
+ * @param string $group Used to group similar QueuedTasks
28
+ * @param string $reference any array
23
29
* @return bool success
24
30
*/
25
- public function createJob ($ jobName , $ data , $ notBefore = null ) {
31
+ public function createJob ($ jobName , $ data , $ notBefore = null , $ group = null , $ reference = null ) {
26
32
27
33
$ data = array (
28
34
'jobtype ' => $ jobName ,
29
- 'data ' => serialize ($ data )
35
+ 'data ' => serialize ($ data ),
36
+ 'group ' => $ group ,
37
+ 'reference ' => $ reference
30
38
);
31
39
if ($ notBefore != null ) {
32
40
$ data ['notbefore ' ] = date ('Y-m-d H:i:s ' , strtotime ($ notBefore ));
@@ -39,12 +47,14 @@ public function onError() {
39
47
}
40
48
41
49
/**
42
- * Look for a new job that can be processed with the current abilities.
50
+ * Look for a new job that can be processed with the current abilities and
51
+ * from the specified group (or any if null).
43
52
*
44
53
* @param array $capabilities Available QueueWorkerTasks.
54
+ * @param string $group Request a job from this group, (from any group if null)
45
55
* @return Array Taskdata.
46
56
*/
47
- public function requestJob ($ capabilities ) {
57
+ public function requestJob ($ capabilities, $ group = null ) {
48
58
$ idlist = array ();
49
59
$ wasFetched = array ();
50
60
@@ -64,6 +74,11 @@ public function requestJob($capabilities) {
64
74
),
65
75
'limit ' => 3
66
76
);
77
+
78
+ if (!is_null ($ group )) {
79
+ $ findConf ['conditions ' ]['group ' ] = $ group ;
80
+ }
81
+
67
82
// generate the task specific conditions.
68
83
foreach ($ capabilities as $ task ) {
69
84
$ tmp = array (
@@ -141,8 +156,10 @@ public function markJobDone($id) {
141
156
* Mark a job as Failed, Incrementing the failed-counter and Requeueing it.
142
157
*
143
158
* @param integer $id
159
+ * @param string $failureMessage Optional message to append to the
160
+ * failure_message field
144
161
*/
145
- public function markJobFailed ($ id ) {
162
+ public function markJobFailed ($ id, $ failureMessage = null ) {
146
163
$ findConf = array (
147
164
'conditions ' => array (
148
165
'id ' => $ id
@@ -151,6 +168,7 @@ public function markJobFailed($id) {
151
168
$ data = $ this ->find ('first ' , $ findConf );
152
169
if (is_array ($ data )) {
153
170
$ data [$ this ->name ]['failed ' ]++;
171
+ $ data [$ this ->name ]['failure_message ' ] .= $ failureMessage . "\n" ;
154
172
return (is_array ($ this ->save ($ data )));
155
173
}
156
174
return false ;
@@ -222,5 +240,51 @@ public function cleanOldJobs() {
222
240
223
241
}
224
242
243
+ protected function _findProgress ($ state , $ query = array (), $ results = array ()) {
244
+
245
+ if ($ state == 'before ' ) {
246
+
247
+ $ query ['fields ' ] = array (
248
+ 'QueuedTask.reference ' ,
249
+ '(CASE WHEN QueuedTask.notbefore > NOW() THEN \'NOT_READY \' WHEN QueuedTask.fetched IS NULL THEN \'NOT_STARTED \' WHEN QueuedTask.fetched IS NOT NULL AND QueuedTask.completed IS NULL AND QueuedTask.failed = 0 THEN \'IN_PROGRESS \' WHEN QueuedTask.fetched IS NOT NULL AND QueuedTask.completed IS NULL AND QueuedTask.failed > 0 THEN \'FAILED \' WHEN QueuedTask.fetched IS NOT NULL AND QueuedTask.completed IS NOT NULL THEN \'COMPLETED \' ELSE \'UNKNOWN \' END) AS status ' ,
250
+ 'QueuedTask.failure_message '
251
+ );
252
+
253
+ if (isset ($ query ['conditions ' ]['exclude ' ])) {
254
+ $ exclude = $ query ['conditions ' ]['exclude ' ];
255
+ unset($ query ['conditions ' ]['exclude ' ]);
256
+ $ exclude = trim ($ exclude , ', ' );
257
+ $ exclude = explode (', ' , $ exclude );
258
+ $ query ['conditions ' ][] = array (
259
+ 'NOT ' => array (
260
+ 'reference ' => $ exclude
261
+ )
262
+ );
263
+ }
264
+ if (isset ($ query ['conditions ' ]['group ' ])) {
265
+ $ query ['conditions ' ][]['QueuedTask.group ' ] = $ query ['conditions ' ]['group ' ];
266
+ unset($ query ['conditions ' ]['group ' ]);
267
+ }
268
+
269
+ return $ query ;
270
+
271
+ } else {
272
+
273
+ foreach ($ results as $ k => $ result ) {
274
+ $ results [$ k ] = array (
275
+ 'reference ' => $ result [$ this ->alias ]['reference ' ],
276
+ 'status ' => $ result [0 ]['status ' ]
277
+ );
278
+ if (!empty ($ result [$ this ->alias ]['failure_message ' ])) {
279
+ $ results [$ k ]['failure_message ' ] = $ result [$ this ->alias ]['failure_message ' ];
280
+ }
281
+ }
282
+
283
+ return $ results ;
284
+
285
+ }
286
+
287
+ }
288
+
225
289
}
226
290
?>
0 commit comments