diff --git a/README.md b/README.md index 838b63c..d5f8bb4 100644 --- a/README.md +++ b/README.md @@ -204,6 +204,29 @@ $promise->then(function (int $bytes) { }); ``` +Promises returned by `async()` can be cancelled, and when done any currently and future awaited promise inside that and +any nested fibers with their awaited promises will also be cancelled. As such the following example will only output +`ab` as the [`sleep()`](https://reactphp.org/promise-timer/#sleep) between `a` and `b` is cancelled throwing a timeout +exception that bubbles up through the fibers ultimately to the end user through the [`await()`](#await) on the last line +of the example. + +```php +$promise = async(static function (): int { + echo 'a'; + await(async(static function(): void { + echo 'b'; + await(sleep(2)); + echo 'c'; + })()); + echo 'd'; + + return time(); +})(); + +$promise->cancel(); +await($promise); +``` + ### await() The `await(PromiseInterface $promise): mixed` function can be used to diff --git a/composer.json b/composer.json index d749726..5d93ed4 100644 --- a/composer.json +++ b/composer.json @@ -31,7 +31,8 @@ "react/promise": "^2.8 || ^1.2.1" }, "require-dev": { - "phpunit/phpunit": "^9.3" + "phpunit/phpunit": "^9.3", + "react/promise-timer": "^1.8" }, "autoload": { "psr-4": { diff --git a/src/FiberMap.php b/src/FiberMap.php new file mode 100644 index 0000000..7ee07d1 --- /dev/null +++ b/src/FiberMap.php @@ -0,0 +1,55 @@ +cancel(); + * await($promise); + * ``` + * * @param callable(mixed ...$args):mixed $function * @return callable(): PromiseInterface * @since 4.0.0 @@ -155,17 +180,37 @@ */ function async(callable $function): callable { - return static fn (mixed ...$args): PromiseInterface => new Promise(function (callable $resolve, callable $reject) use ($function, $args): void { - $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args): void { - try { - $resolve($function(...$args)); - } catch (\Throwable $exception) { - $reject($exception); + return static function (mixed ...$args) use ($function): PromiseInterface { + $fiber = null; + $promise = new Promise(function (callable $resolve, callable $reject) use ($function, $args, &$fiber): void { + $fiber = new \Fiber(function () use ($resolve, $reject, $function, $args, &$fiber): void { + try { + $resolve($function(...$args)); + } catch (\Throwable $exception) { + $reject($exception); + } finally { + FiberMap::unregister($fiber); + } + }); + + FiberMap::register($fiber); + + $fiber->start(); + }, function () use (&$fiber): void { + FiberMap::cancel($fiber); + $promise = FiberMap::getPromise($fiber); + if ($promise instanceof CancellablePromiseInterface) { + $promise->cancel(); } }); - $fiber->start(); - }); + $lowLevelFiber = \Fiber::getCurrent(); + if ($lowLevelFiber !== null) { + FiberMap::setPromise($lowLevelFiber, $promise); + } + + return $promise; + }; } @@ -230,9 +275,18 @@ function await(PromiseInterface $promise): mixed $rejected = false; $resolvedValue = null; $rejectedThrowable = null; + $lowLevelFiber = \Fiber::getCurrent(); + + if ($lowLevelFiber !== null && FiberMap::isCancelled($lowLevelFiber) && $promise instanceof CancellablePromiseInterface) { + $promise->cancel(); + } $promise->then( - function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void { + function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber, $lowLevelFiber, $promise): void { + if ($lowLevelFiber !== null) { + FiberMap::unsetPromise($lowLevelFiber, $promise); + } + if ($fiber === null) { $resolved = true; $resolvedValue = $value; @@ -241,7 +295,11 @@ function (mixed $value) use (&$resolved, &$resolvedValue, &$fiber): void { $fiber->resume($value); }, - function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void { + function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber, $lowLevelFiber, $promise): void { + if ($lowLevelFiber !== null) { + FiberMap::unsetPromise($lowLevelFiber, $promise); + } + if (!$throwable instanceof \Throwable) { $throwable = new \UnexpectedValueException( 'Promise rejected with unexpected value of type ' . (is_object($throwable) ? get_class($throwable) : gettype($throwable)) @@ -285,6 +343,10 @@ function (mixed $throwable) use (&$rejected, &$rejectedThrowable, &$fiber): void throw $rejectedThrowable; } + if ($lowLevelFiber !== null) { + FiberMap::setPromise($lowLevelFiber, $promise); + } + $fiber = FiberFactory::create(); return $fiber->suspend(); diff --git a/tests/AsyncTest.php b/tests/AsyncTest.php index a4287fd..0d85302 100644 --- a/tests/AsyncTest.php +++ b/tests/AsyncTest.php @@ -11,6 +11,7 @@ use function React\Promise\all; use function React\Promise\reject; use function React\Promise\resolve; +use function React\Promise\Timer\sleep; class AsyncTest extends TestCase { @@ -185,4 +186,108 @@ public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsA $this->assertGreaterThan(0.1, $time); $this->assertLessThan(0.12, $time); } + + public function testCancel() + { + self::expectOutputString('a'); + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + await(sleep(2)); + echo 'b'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testCancelTryCatch() + { + self::expectOutputString('ab'); + + $promise = async(static function (): int { + echo 'a'; + try { + await(sleep(2)); + } catch (\Throwable) { + // No-Op + } + echo 'b'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testNestedCancel() + { + self::expectOutputString('abc'); + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + await(async(static function(): void { + echo 'b'; + await(async(static function(): void { + echo 'c'; + await(sleep(2)); + echo 'd'; + })()); + echo 'e'; + })()); + echo 'f'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testCancelFiberThatCatchesExceptions() + { + self::expectOutputString('ab'); + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Timer cancelled'); + + $promise = async(static function (): int { + echo 'a'; + try { + await(sleep(2)); + } catch (\Throwable) { + // No-Op + } + echo 'b'; + await(sleep(0.1)); + echo 'c'; + + return time(); + })(); + + $promise->cancel(); + await($promise); + } + + public function testNotAwaitedPromiseWillNotBeCanceled() + { + self::expectOutputString('acb'); + + async(static function (): int { + echo 'a'; + sleep(0.001)->then(static function (): void { + echo 'b'; + }); + echo 'c'; + + return time(); + })()->cancel(); + Loop::run(); + } } diff --git a/tests/AwaitTest.php b/tests/AwaitTest.php index 2dd8159..f2faceb 100644 --- a/tests/AwaitTest.php +++ b/tests/AwaitTest.php @@ -332,6 +332,42 @@ public function testNestedAwaits(callable $await) }))); } + /** + * @dataProvider provideAwaiters + */ + public function testResolvedPromisesShouldBeDetached(callable $await) + { + $await(async(function () use ($await): int { + $fiber = \Fiber::getCurrent(); + $await(React\Promise\Timer\sleep(0.01)); + $this->assertNull(React\Async\FiberMap::getPromise($fiber)); + + return time(); + })()); + } + + /** + * @dataProvider provideAwaiters + */ + public function testRejectedPromisesShouldBeDetached(callable $await) + { + $this->expectException(\Exception::class); + $this->expectExceptionMessage('Boom!'); + + $await(async(function () use ($await): int { + $fiber = \Fiber::getCurrent(); + try { + $await(React\Promise\reject(new \Exception('Boom!'))); + } catch (\Throwable $throwable) { + throw $throwable; + } finally { + $this->assertNull(React\Async\FiberMap::getPromise($fiber)); + } + + return time(); + })()); + } + public function provideAwaiters(): iterable { yield 'await' => [static fn (React\Promise\PromiseInterface $promise): mixed => React\Async\await($promise)];