Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve async() by making its promises cancellable #20

Merged
merged 1 commit into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,29 @@ $promise->then(function (int $bytes) {
});
```

Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and
any nested fibers with their awaited promises will also be cancelled. As such the following example will only output
`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout
exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line
of the example.

```php
$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(sleep(2));
echo 'c';
})());
echo 'd';

return time();
})();

$promise->cancel();
await($promise);
```

### await()

The `await(PromiseInterface $promise): mixed` function can be used to
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"react/promise": "^2.8 || ^1.2.1"
},
"require-dev": {
"phpunit/phpunit": "^9.3"
"phpunit/phpunit": "^9.3",
"react/promise-timer": "^1.8"
},
"autoload": {
"psr-4": {
Expand Down
55 changes: 55 additions & 0 deletions src/FiberMap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

namespace React\Async;

use React\Promise\PromiseInterface;

/**
* @internal
*/
final class FiberMap
{
private static array $status = [];
private static array $map = [];

public static function register(\Fiber $fiber): void
{
self::$status[\spl_object_id($fiber)] = false;
self::$map[\spl_object_id($fiber)] = [];
}

public static function cancel(\Fiber $fiber): void
{
self::$status[\spl_object_id($fiber)] = true;
}

public static function isCancelled(\Fiber $fiber): bool
{
return self::$status[\spl_object_id($fiber)];
}

public static function setPromise(\Fiber $fiber, PromiseInterface $promise): void
{
self::$map[\spl_object_id($fiber)] = $promise;
}

public static function unsetPromise(\Fiber $fiber, PromiseInterface $promise): void
{
unset(self::$map[\spl_object_id($fiber)]);
}

public static function has(\Fiber $fiber): bool
{
return array_key_exists(\spl_object_id($fiber), self::$map);
}

public static function getPromise(\Fiber $fiber): ?PromiseInterface
{
return self::$map[\spl_object_id($fiber)] ?? null;
}

public static function unregister(\Fiber $fiber): void
{
unset(self::$status[\spl_object_id($fiber)], self::$map[\spl_object_id($fiber)]);
}
}
82 changes: 72 additions & 10 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,69 @@
* });
* ```
*
* Promises returned by `async()` can be cancelled, and when done any currently
* and future awaited promise inside that and any nested fibers with their
* awaited promises will also be cancelled. As such the following example will
* only output `ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep)
* between `a` and `b` is cancelled throwing a timeout exception that bubbles up
* through the fibers ultimately to the end user through the [`await()`](#await)
* on the last line of the example.
*
* ```php
* $promise = async(static function (): int {
* echo 'a';
* await(async(static function(): void {
* echo 'b';
* await(sleep(2));
* echo 'c';
* })());
* echo 'd';
*
* return time();
* })();
*
* $promise->cancel();
* await($promise);
* ```
*
* @param callable(mixed ...$args):mixed $function
* @return callable(): PromiseInterface<mixed>
* @since 4.0.0
* @see coroutine()
*/
function async(callable $function): callable
{
return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
return static function (mixed ...$args) use ($function): PromiseInterface {
$fiber = null;
$promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void {
try {
$resolve($function(...$args));
} catch (\Throwable $exception) {
$reject($exception);
} finally {
FiberMap::unregister($fiber);
}
});

FiberMap::register($fiber);

$fiber->start();
}, function () use (&$fiber): void {
FiberMap::cancel($fiber);
$promise = FiberMap::getPromise($fiber);
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
});

$fiber->start();
});
$lowLevelFiber = \Fiber::getCurrent();
if ($lowLevelFiber !== null) {
FiberMap::setPromise($lowLevelFiber, $promise);
}

return $promise;
};
}


Expand Down Expand Up @@ -230,9 +275,18 @@ function await(PromiseInterface $promise): mixed
$rejected = false;
$resolvedValue = null;
$rejectedThrowable = null;
$lowLevelFiber = \Fiber::getCurrent();

if ($lowLevelFiber !== null && FiberMap::isCancelled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}

$promise->then(
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {
function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber, $lowLevelFiber, $promise): void {
if ($lowLevelFiber !== null) {
FiberMap::unsetPromise($lowLevelFiber, $promise);
}

if ($fiber === null) {
$resolved = true;
$resolvedValue = $value;
Expand All @@ -241,7 +295,11 @@ function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void {

$fiber->resume($value);
},
function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void {
function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber, $lowLevelFiber, $promise): void {
if ($lowLevelFiber !== null) {
FiberMap::unsetPromise($lowLevelFiber, $promise);
}

if (!$throwable instanceof \Throwable) {
$throwable = new \UnexpectedValueException(
'Promise rejected with unexpected value of type ' . (is_object($throwable) ? get_class($throwable) : gettype($throwable))
Expand Down Expand Up @@ -285,6 +343,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void
throw $rejectedThrowable;
}

if ($lowLevelFiber !== null) {
FiberMap::setPromise($lowLevelFiber, $promise);
}

$fiber = FiberFactory::create();

return $fiber->suspend();
Expand Down
105 changes: 105 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use function React\Promise\all;
use function React\Promise\reject;
use function React\Promise\resolve;
use function React\Promise\Timer\sleep;

class AsyncTest extends TestCase
{
Expand Down Expand Up @@ -185,4 +186,108 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA
$this->assertGreaterThan(0.1, $time);
$this->assertLessThan(0.12, $time);
}

public function testCancel()
{
self::expectOutputString('a');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(sleep(2));
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelTryCatch()
{
self::expectOutputString('ab');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNestedCancel()
{
self::expectOutputString('abc');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
await(async(static function(): void {
echo 'b';
await(async(static function(): void {
echo 'c';
await(sleep(2));
echo 'd';
})());
echo 'e';
})());
echo 'f';

return time();
})();

$promise->cancel();
await($promise);
}

public function testCancelFiberThatCatchesExceptions()
{
self::expectOutputString('ab');
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Timer cancelled');

$promise = async(static function (): int {
echo 'a';
try {
await(sleep(2));
} catch (\Throwable) {
// No-Op
}
echo 'b';
await(sleep(0.1));
echo 'c';

return time();
})();

$promise->cancel();
await($promise);
}

public function testNotAwaitedPromiseWillNotBeCanceled()
{
self::expectOutputString('acb');

async(static function (): int {
echo 'a';
sleep(0.001)->then(static function (): void {
echo 'b';
});
echo 'c';

return time();
})()->cancel();
Loop::run();
}
}
36 changes: 36 additions & 0 deletions tests/AwaitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,42 @@ public function testNestedAwaits(callable $await)
})));
}

/**
* @dataProvider provideAwaiters
*/
public function testResolvedPromisesShouldBeDetached(callable $await)
{
$await(async(function () use ($await): int {
$fiber = \Fiber::getCurrent();
$await(React\Promise\Timer\sleep(0.01));
$this->assertNull(React\Async\FiberMap::getPromise($fiber));

return time();
})());
}

/**
* @dataProvider provideAwaiters
*/
public function testRejectedPromisesShouldBeDetached(callable $await)
{
$this->expectException(\Exception::class);
$this->expectExceptionMessage('Boom!');

$await(async(function () use ($await): int {
$fiber = \Fiber::getCurrent();
try {
$await(React\Promise\reject(new \Exception('Boom!')));
} catch (\Throwable $throwable) {
throw $throwable;
} finally {
$this->assertNull(React\Async\FiberMap::getPromise($fiber));
}

return time();
})());
}

public function provideAwaiters(): iterable
{
yield 'await' => [static fn (React\Promise\PromiseInterface $promise): mixed => React\Async\await($promise)];
Expand Down