Skip to content

Commit 0f06d20

Browse files
authored
Merge pull request #25 from M6Web/feature/throw-from-unwatched-promise
Throwing exceptions from background asynchronous functions
2 parents ce918e2 + 2cbff81 commit 0f06d20

File tree

15 files changed

+302
-122
lines changed

15 files changed

+302
-122
lines changed

src/Adapter/Amp/EventLoop.php

Lines changed: 39 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public function wait(Promise $promise)
2222
switch ($error->getMessage()) {
2323
case 'Loop stopped without resolving the promise':
2424
throw new \Error('Impossible to resolve the promise, no more task to execute.', 0, $error);
25+
case 'Loop exceptionally stopped without resolving the promise':
26+
throw $error->getPrevious() ?? $error;
2527
default:
2628
throw $error;
2729
}
@@ -33,31 +35,48 @@ public function wait(Promise $promise)
3335
*/
3436
public function async(\Generator $generator): Promise
3537
{
36-
$wrapper = function (\Generator $generator): \Generator {
37-
while ($generator->valid()) {
38-
$blockingPromise = Internal\PromiseWrapper::fromGenerator($generator)->getAmpPromise();
39-
40-
// Forwards promise value/exception to underlying generator
41-
$blockingPromiseValue = null;
42-
$blockingPromiseException = null;
43-
try {
44-
$blockingPromiseValue = yield $blockingPromise;
45-
} catch (\Throwable $throwable) {
46-
$blockingPromiseException = $throwable;
47-
}
48-
if ($blockingPromiseException) {
49-
$generator->throw($blockingPromiseException);
50-
} else {
51-
$generator->send($blockingPromiseValue);
38+
$wrapper = function (\Generator $generator, callable $fnSuccess, callable $fnFailure): \Generator {
39+
try {
40+
while ($generator->valid()) {
41+
$blockingPromise = Internal\PromiseWrapper::fromGenerator($generator)->getAmpPromise();
42+
43+
// Forwards promise value/exception to underlying generator
44+
$blockingPromiseValue = null;
45+
$blockingPromiseException = null;
46+
try {
47+
$blockingPromiseValue = yield $blockingPromise;
48+
} catch (\Throwable $throwable) {
49+
$blockingPromiseException = $throwable;
50+
}
51+
if ($blockingPromiseException) {
52+
$generator->throw($blockingPromiseException);
53+
} else {
54+
$generator->send($blockingPromiseValue);
55+
}
5256
}
57+
} catch (\Throwable $throwable) {
58+
$fnFailure($throwable);
5359
}
5460

55-
return $generator->getReturn();
61+
$fnSuccess($generator->getReturn());
5662
};
5763

58-
return new Internal\PromiseWrapper(
59-
new \Amp\Coroutine($wrapper($generator))
60-
);
64+
$deferred = new Internal\Deferred();
65+
new \Amp\Coroutine($wrapper(
66+
$generator,
67+
[$deferred, 'resolve'],
68+
function (\Throwable $throwable) use ($deferred) {
69+
if ($deferred->getPromiseWrapper()->hasBeenYielded()) {
70+
$deferred->reject($throwable);
71+
} else {
72+
\Amp\Loop::defer(function () use ($throwable) {
73+
throw $throwable;
74+
});
75+
}
76+
}
77+
));
78+
79+
return $deferred->getPromise();
6180
}
6281

6382
/**

src/Adapter/Amp/Internal/Deferred.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ public function getPromise(): Promise
3434
return $this->promise;
3535
}
3636

37+
public function getPromiseWrapper(): PromiseWrapper
38+
{
39+
return $this->promise;
40+
}
41+
3742
/**
3843
* {@inheritdoc}
3944
*/

src/Adapter/Amp/Internal/PromiseWrapper.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ class PromiseWrapper implements Promise
1515
*/
1616
private $ampPromise;
1717

18+
private $hasBeenYielded = false;
19+
1820
public function __construct(\Amp\Promise $ampPromise)
1921
{
2022
$this->ampPromise = $ampPromise;
@@ -38,8 +40,15 @@ public static function fromGenerator(\Generator $generator): self
3840
if (!$promise instanceof self) {
3941
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
4042
}
43+
$promise = self::downcast($promise);
44+
$promise->hasBeenYielded = true;
4145

42-
return self::downcast($promise);
46+
return $promise;
47+
}
48+
49+
public function hasBeenYielded(): bool
50+
{
51+
return $this->hasBeenYielded;
4352
}
4453

4554
/**

src/Adapter/ReactPhp/EventLoop.php

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -58,37 +58,49 @@ function ($result) use (&$value, &$isRejected, &$promiseSettled) {
5858
*/
5959
public function async(\Generator $generator): Promise
6060
{
61-
$fnWrapGenerator = function (\Generator $generator, Deferred $deferred) use (&$fnWrapGenerator) {
61+
$fnWrapGenerator = function (\Generator $generator, callable $fnSuccess, callable $fnFailure) use (&$fnWrapGenerator) {
6262
try {
6363
if (!$generator->valid()) {
64-
return $deferred->resolve($generator->getReturn());
64+
return $fnSuccess($generator->getReturn());
6565
}
6666
Internal\PromiseWrapper::fromGenerator($generator)
6767
->getReactPromise()->then(
68-
function ($result) use ($generator, $deferred, $fnWrapGenerator) {
68+
function ($result) use ($generator, $fnSuccess, $fnFailure, $fnWrapGenerator) {
6969
try {
7070
$generator->send($result);
71-
$fnWrapGenerator($generator, $deferred);
71+
$fnWrapGenerator($generator, $fnSuccess, $fnFailure);
7272
} catch (\Throwable $throwable) {
73-
$deferred->reject($throwable);
73+
$fnFailure($throwable);
7474
}
7575
},
76-
function ($reason) use ($generator, $deferred, $fnWrapGenerator) {
76+
function ($reason) use ($generator, $fnSuccess, $fnFailure, $fnWrapGenerator) {
7777
try {
7878
$generator->throw($reason);
79-
$fnWrapGenerator($generator, $deferred);
79+
$fnWrapGenerator($generator, $fnSuccess, $fnFailure);
8080
} catch (\Throwable $throwable) {
81-
$deferred->reject($throwable);
81+
$fnFailure($throwable);
8282
}
8383
}
8484
);
8585
} catch (\Throwable $throwable) {
86-
$deferred->reject($throwable);
86+
$fnFailure($throwable);
8787
}
8888
};
8989

90-
$deferred = $this->deferred();
91-
$fnWrapGenerator($generator, $deferred);
90+
$deferred = new Internal\Deferred();
91+
$fnWrapGenerator(
92+
$generator,
93+
[$deferred, 'resolve'],
94+
function (\Throwable $throwable) use ($deferred) {
95+
if ($deferred->getPromiseWrapper()->hasBeenYielded()) {
96+
$deferred->reject($throwable);
97+
} else {
98+
$this->reactEventLoop->futureTick(function () use ($throwable) {
99+
throw $throwable;
100+
});
101+
}
102+
}
103+
);
92104

93105
return $deferred->getPromise();
94106
}

src/Adapter/ReactPhp/Internal/Deferred.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,11 @@ public function getPromise(): Promise
3434
return $this->promise;
3535
}
3636

37+
public function getPromiseWrapper(): PromiseWrapper
38+
{
39+
return $this->promise;
40+
}
41+
3742
/**
3843
* {@inheritdoc}
3944
*/

src/Adapter/ReactPhp/Internal/PromiseWrapper.php

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ class PromiseWrapper implements Promise
1515
*/
1616
private $reactPromise;
1717

18+
private $hasBeenYielded = false;
19+
1820
public function __construct(\React\Promise\PromiseInterface $reactPromise)
1921
{
2022
$this->reactPromise = $reactPromise;
@@ -39,7 +41,15 @@ public static function fromGenerator(\Generator $generator): self
3941
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
4042
}
4143

42-
return self::downcast($promise);
44+
$promise = self::downcast($promise);
45+
$promise->hasBeenYielded = true;
46+
47+
return $promise;
48+
}
49+
50+
public function hasBeenYielded(): bool
51+
{
52+
return $this->hasBeenYielded;
4353
}
4454

4555
/**

src/Adapter/Tornado/EventLoop.php

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ class EventLoop implements \M6Web\Tornado\EventLoop
1111
* @var Internal\StreamEventLoop
1212
*/
1313
private $streamLoop;
14+
15+
/**
16+
* @var Internal\Task[]
17+
*/
1418
private $tasks = [];
1519

1620
public function __construct()
@@ -41,40 +45,55 @@ function (\Throwable $throwable) use (&$finalAction, &$promiseIsPending) {
4145
return count($this->tasks) !== 0;
4246
};
4347

48+
$fnThrowIfNotNull = function (?\Throwable $throwable) {
49+
if ($throwable !== null) {
50+
throw $throwable;
51+
}
52+
};
53+
54+
$globalException = null;
55+
// Returns a callback to propagate a value to a generator via $function
56+
$fnSafeGeneratorCallback = function (Internal\Task $task, string $function) use (&$globalException) {
57+
return function ($value) use ($task, $function, &$globalException) {
58+
try {
59+
$task->getGenerator()->$function($value);
60+
$this->tasks[] = $task;
61+
} catch (\Throwable $exception) {
62+
if ($task->getPromise()->hasBeenYielded()) {
63+
$task->getPromise()->reject($exception);
64+
} else {
65+
$globalException = $exception;
66+
}
67+
}
68+
};
69+
};
70+
4471
do {
4572
// Copy tasks list to safely allow tasks addition by tasks themselves
4673
$allTasks = $this->tasks;
4774
$this->tasks = [];
4875
foreach ($allTasks as $task) {
4976
try {
50-
if (!$task->generator->valid()) {
51-
$task->promise->resolve($task->generator->getReturn());
77+
if (!$task->getGenerator()->valid()) {
78+
$task->getPromise()->resolve($task->getGenerator()->getReturn());
5279
// This task is finished
5380
continue;
5481
}
5582

56-
$blockingPromise = Internal\PendingPromise::fromGenerator($task->generator);
83+
$blockingPromise = Internal\PendingPromise::fromGenerator($task->getGenerator());
5784
$blockingPromise->addCallbacks(
58-
function ($value) use ($task) {
59-
try {
60-
$task->generator->send($value);
61-
$this->tasks[] = $task;
62-
} catch (\Throwable $exception) {
63-
$task->promise->reject($exception);
64-
}
65-
},
66-
function (\Throwable $throwable) use ($task) {
67-
try {
68-
$task->generator->throw($throwable);
69-
$this->tasks[] = $task;
70-
} catch (\Throwable $exception) {
71-
$task->promise->reject($exception);
72-
}
73-
}
85+
$fnSafeGeneratorCallback($task, 'send'),
86+
$fnSafeGeneratorCallback($task, 'throw')
7487
);
7588
} catch (\Throwable $exception) {
76-
$task->promise->reject($exception);
89+
if ($task->getPromise()->hasBeenYielded()) {
90+
$task->getPromise()->reject($exception);
91+
} else {
92+
throw $exception;
93+
}
7794
}
95+
96+
$fnThrowIfNotNull($globalException);
7897
}
7998
} while ($promiseIsPending && $somethingToDo());
8099

@@ -86,15 +105,9 @@ function (\Throwable $throwable) use ($task) {
86105
*/
87106
public function async(\Generator $generator): Promise
88107
{
89-
$task = new class() {
90-
public $generator;
91-
public $promise;
92-
};
93-
$task->generator = $generator;
94-
$task->promise = new Internal\PendingPromise();
95-
$this->tasks[] = $task;
108+
$this->tasks[] = ($task = new Internal\Task($generator));
96109

97-
return $task->promise;
110+
return $task->getPromise();
98111
}
99112

100113
/**

src/Adapter/Tornado/Internal/PendingPromise.php

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ class PendingPromise implements Promise
1414
private $throwable;
1515
private $callbacks = [];
1616
private $isSettled = false;
17+
private $hasBeenYielded = false;
1718

1819
public static function downcast(Promise $promise): self
1920
{
@@ -29,7 +30,15 @@ public static function fromGenerator(\Generator $generator): self
2930
throw new \Error('Asynchronous function is yielding a ['.gettype($promise).'] instead of a Promise.');
3031
}
3132

32-
return self::downcast($promise);
33+
$promise = self::downcast($promise);
34+
$promise->hasBeenYielded = true;
35+
36+
return $promise;
37+
}
38+
39+
public function hasBeenYielded(): bool
40+
{
41+
return $this->hasBeenYielded;
3342
}
3443

3544
public function resolve($value): self

src/Adapter/Tornado/Internal/StreamEventLoop.php

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,23 +40,27 @@ private function internalLoop(EventLoop $eventLoop): \Generator
4040

4141
$read = $this->readStreams;
4242
$write = $this->writeStreams;
43-
stream_select($read, $write, $except, 0);
43+
$nbStreams = @\stream_select($read, $write, $except, 0);
4444

45-
foreach ($read as $stream) {
46-
$streamId = (int) $stream;
47-
$pendingPromise = $this->pendingPromises[$streamId];
48-
unset($this->readStreams[$streamId]);
49-
unset($this->pendingPromises[$streamId]);
50-
$pendingPromise->resolve($stream);
51-
}
45+
if ($nbStreams !== false) {
46+
foreach ($read as $stream) {
47+
$streamId = (int) $stream;
48+
$pendingPromise = $this->pendingPromises[$streamId];
49+
unset($this->readStreams[$streamId]);
50+
unset($this->pendingPromises[$streamId]);
51+
$pendingPromise->resolve($stream);
52+
}
5253

53-
foreach ($write as $stream) {
54-
$streamId = (int) $stream;
55-
$pendingPromise = $this->pendingPromises[$streamId];
56-
unset($this->writeStreams[$streamId]);
57-
unset($this->pendingPromises[$streamId]);
58-
$pendingPromise->resolve($stream);
54+
foreach ($write as $stream) {
55+
$streamId = (int) $stream;
56+
$pendingPromise = $this->pendingPromises[$streamId];
57+
unset($this->writeStreams[$streamId]);
58+
unset($this->pendingPromises[$streamId]);
59+
$pendingPromise->resolve($stream);
60+
}
5961
}
62+
63+
yield $eventLoop->idle();
6064
}
6165
}
6266

0 commit comments

Comments
 (0)