-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathJobContext.php
197 lines (173 loc) · 6.71 KB
/
JobContext.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
<?php
namespace mle86\WQ\Job;
use mle86\WQ\WorkProcessor;
use mle86\WQ\WorkServerAdapter\WorkServerAdapter;
/**
* Contains metadata about the job currently being processed
* and various setters to attach one-off event handlers to the job.
*
* Instances of this class are available in job callbacks
* run by {@see WorkProcessor::processNextJob()}
* (their expected signature is `function(Job, JobContext): ?int|void`).
*/
final class JobContext
{
private $queueEntry;
private $workProcessor;
private $failureCallback;
private $temporaryFailureCallback;
private $successCallback;
/**
* @internal Only the {@see WorkProcessor} class should create instances.
*/
public function __construct(QueueEntry $queueEntry, WorkProcessor $workProcessor)
{
$this->queueEntry = $queueEntry;
$this->workProcessor = $workProcessor;
}
// Simple getters: ////////////////////////////////////////////////////////
/**
* @return QueueEntry The queue entry DTO containing the current job instance.
*/
public function getQueueEntry(): QueueEntry
{
return $this->queueEntry;
}
/**
* @return Job The job currently being executed.
*/
public function getJob(): Job
{
return $this->queueEntry->getJob();
}
/**
* @return string The name of the work queue from which the current job was received.
*/
public function getSourceQueue(): string
{
return $this->queueEntry->getWorkQueue();
}
/**
* @return WorkProcessor The WorkProcessor that created this instance.
*/
public function getWorkProcessor(): WorkProcessor
{
return $this->workProcessor;
}
/**
* @return WorkServerAdapter The work server from which the current job was received.
*/
public function getWorkServer(): WorkServerAdapter
{
return $this->workProcessor->getWorkServerAdapter();
}
// Callback accessors: ////////////////////////////////////////////////////
/**
* Sets up a callback that will be called once
* if and when the current job is being re-queued
* because it failed and should be re-tried.
*
* This happens if {@see WorkProcessor::WP_ENABLE_RETRY} is set,
* if {@see Job::jobCanRetry()} is true,
* and if the job handler returned {@see JobResult::FAILED} or threw a {@see \RuntimeException}.
* (Any such exception will be passed to the callback
* in its third argument.)
*
* (This callback will be run by the {@see WorkProcessor}
* after it calls its internal {@see WorkProcessor::onJobRequeue()} hook,
* immediately before calling {@see WorkServerAdapter::requeueEntry()}.)
*
* @param callable|null $callback Expected signature:
* function({@see Job}, {@see JobContext}, ?Throwable): void.
* @return $this
*/
public function onTemporaryFailure(?callable $callback): self
{
$this->temporaryFailureCallback = $callback;
return $this;
}
/**
* Sets up a callback that will be called once
* if and when the current job is being buried/deleted
* because it failed and should not (or cannot) be re-tried later.
*
* This happens if {@see WorkProcessor::WP_ENABLE_RETRY} is not set
* or if {@see Job::jobCanRetry()} returns false
* and if the job handler returned {@see JobResult::ABORT}
* or threw a non-{@see \RuntimeException Runtime} exception.
* (Any such exception will be passed to the callback
* in its third argument.)
*
* (This callback will be run by the {@see WorkProcessor}
* after it calls its internal {@see WorkProcessor::onFailedJob()} hook,
* immediately before calling {@see WorkServerAdapter::buryEntry()}/{@see WorkServerAdapter::deleteEntry() deleteEntry()}.)
*
* @param callable|null $callback Expected signature:
* function({@see Job}, {@see JobContext}, ?Throwable): void.
* @return $this
*/
public function onFailure(?callable $callback): self
{
$this->failureCallback = $callback;
return $this;
}
/**
* Sets up a callback that will be called once
* if and when the current job is being deleted/movied
* because it succeeded!
*
* This happens if the job handler returns {@see JobResult::SUCCESS}/null/void.
*
* (This callback will be run by the {@see WorkProcessor}
* after it calls its internal {@see WorkProcessor::onSuccessfulJob()} hook,
* immediately before calling {@see WorkServerAdapter::deleteEntry()}/{@see WorkServerAdapter::requeueEntry() requeueEntry()}.)
*
* @param callable|null $callback Expected signature:
* function({@see Job}, {@see JobContext}): void.
* @return $this
*/
public function onSuccess(?callable $callback): self
{
$this->successCallback = $callback;
return $this;
}
// Event entry points: ////////////////////////////////////////////////////
/**
* Runs the {@see onTemporaryFailure()} callback.
* @internal Only {@see WorkProcessor::processNextJob()} should call this!
* @param Job $currentJob
* @param JobContext $currentContext
* @param \Throwable|null $cause What caused the temporary failure. Will be null if the failure was caused by {@see JobResult::FAILED}/{@see JobResult::ABORT ABORT}.
*/
public function handleTemporaryFailure(Job $currentJob, JobContext $currentContext, ?\Throwable $cause): void
{
if ($this->temporaryFailureCallback) {
($this->temporaryFailureCallback)($currentJob, $currentContext, $cause);
}
}
/**
* Runs the {@see onFailure()} callback.
* @internal Only {@see WorkProcessor::processNextJob()} should call this!
* @param Job $currentJob
* @param JobContext $currentContext
* @param \Throwable|null $cause What caused the failure. Will be null if the failure was caused by {@see JobResult::FAILED}/{@see JobResult::ABORT ABORT}.
*/
public function handleFailure(Job $currentJob, JobContext $currentContext, ?\Throwable $cause): void
{
if ($this->failureCallback) {
($this->failureCallback)($currentJob, $currentContext, $cause);
}
}
/**
* Runs the {@see onSuccess()} callback.
* @internal Only {@see WorkProcessor::processNextJob()} should call this!
* @param Job $currentJob
* @param JobContext $currentContext
*/
public function handleSuccess(Job $currentJob, JobContext $currentContext): void
{
if ($this->successCallback) {
($this->successCallback)($currentJob, $currentContext);
}
}
}