Skip to content

Commit 2049ec6

Browse files
committed
* The queued_task model can now trigger a queueworker exit.
* queued_task model will trigger exit on Datasource error. closes #6
1 parent ebc3e97 commit 2049ec6

File tree

3 files changed

+34
-23
lines changed

3 files changed

+34
-23
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
1+
.svn
22
*.bak

models/queued_task.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ class QueuedTask extends AppModel {
1313

1414
public $rateHistory = array();
1515

16+
public $exit = false;
17+
1618
/**
1719
* Add a new Job to the Queue
1820
*
@@ -32,6 +34,10 @@ public function createJob($jobName, $data, $notBefore = null) {
3234
return ($this->save($this->create($data)));
3335
}
3436

37+
public function onError() {
38+
$this->exit = true;
39+
}
40+
3541
/**
3642
* Look for a new job that can be processed with the current abilities.
3743
*

vendors/shells/queue.php

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -116,35 +116,40 @@ public function add() {
116116
public function runworker() {
117117
$exit = false;
118118
$starttime = time();
119+
119120
while (!$exit) {
120121
$this->out('Looking for Job....');
121122
$data = $this->QueuedTask->requestJob($this->getTaskConf());
122-
if ($data != false) {
123-
$this->out('Running Job of type "' . $data['jobtype'] . '"');
124-
$taskname = 'queue_' . strtolower($data['jobtype']);
125-
$return = $this->{$taskname}->run(unserialize($data['data']));
126-
if ($return == true) {
127-
$this->QueuedTask->markJobDone($data['id']);
128-
$this->out('Job Finished.');
123+
if ($this->QueuedTask->exit === true) {
124+
$exit = true;
125+
} else {
126+
if ($data !== false) {
127+
$this->out('Running Job of type "' . $data['jobtype'] . '"');
128+
$taskname = 'queue_' . strtolower($data['jobtype']);
129+
$return = $this->{$taskname}->run(unserialize($data['data']));
130+
if ($return == true) {
131+
$this->QueuedTask->markJobDone($data['id']);
132+
$this->out('Job Finished.');
133+
} else {
134+
$this->QueuedTask->markJobFailed($data['id']);
135+
$this->out('Job did not finish, requeued.');
136+
}
129137
} else {
130-
$this->QueuedTask->markJobFailed($data['id']);
131-
$this->out('Job did not finish, requeued.');
138+
$this->out('nothing to do, sleeping.');
139+
sleep(Configure::read('queue.sleeptime'));
132140
}
133-
} else {
134-
$this->out('nothing to do, sleeping.');
135-
sleep(Configure::read('queue.sleeptime'));
136-
}
137141

138-
// check if we are over the maximum runtime and end processing if so.
139-
if (Configure::read('queue.workermaxruntime') != 0 && (time() - $starttime) >= Configure::read('queue.workermaxruntime')) {
140-
$exit = true;
141-
$this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('queue.workermaxruntime') . '), terminating.');
142-
}
143-
if ($exit || rand(0, 100) > (100 - Configure::read('queue.gcprop'))) {
144-
$this->out('Performing Old job cleanup.');
145-
$this->QueuedTask->cleanOldJobs();
142+
// check if we are over the maximum runtime and end processing if so.
143+
if (Configure::read('queue.workermaxruntime') != 0 && (time() - $starttime) >= Configure::read('queue.workermaxruntime')) {
144+
$exit = true;
145+
$this->out('Reached runtime of ' . (time() - $starttime) . ' Seconds (Max ' . Configure::read('queue.workermaxruntime') . '), terminating.');
146+
}
147+
if ($exit || rand(0, 100) > (100 - Configure::read('queue.gcprop'))) {
148+
$this->out('Performing Old job cleanup.');
149+
$this->QueuedTask->cleanOldJobs();
150+
}
151+
$this->hr();
146152
}
147-
$this->hr();
148153
}
149154
}
150155

0 commit comments

Comments
 (0)