Skip to content

Commit

Permalink
Add Fiber-based await() function
Browse files Browse the repository at this point in the history
  • Loading branch information
clue authored and WyriHaximus committed Nov 22, 2021
1 parent 249f9f6 commit 6e08372
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 60 deletions.
10 changes: 2 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,8 @@ $result = React\Async\await($promise);
```

This function will only return after the given `$promise` has settled, i.e.
either fulfilled or rejected.

While the promise is pending, this function will assume control over the event
loop. Internally, it will `run()` the [default loop](https://github.com/reactphp/event-loop#loop)
until the promise settles and then calls `stop()` to terminate execution of the
loop. This means this function is more suited for short-lived promise executions
when using promise-based APIs is not feasible. For long-running applications,
using promise-based APIs by leveraging chained `then()` calls is usually preferable.
either fulfilled or rejected. While the promise is pending, this function will
suspend the fiber it's called from until the promise is settled.

Once the promise is fulfilled, this function will return whatever the promise
resolved to.
Expand Down
3 changes: 3 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
"phpunit/phpunit": "^9.3"
},
"autoload": {
"psr-4": {
"React\\Async\\": "src/"
},
"files": [
"src/functions_include.php"
]
Expand Down
64 changes: 64 additions & 0 deletions src/SimpleFiber.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

namespace React\Async;

use React\EventLoop\Loop;

/**
* @internal
*/
final class SimpleFiber
{
private static ?\Fiber $scheduler = null;
private ?\Fiber $fiber = null;

public function __construct()
{
$this->fiber = \Fiber::getCurrent();
}

public function resume(mixed $value): void
{
if ($this->fiber === null) {
Loop::futureTick(static fn() => \Fiber::suspend(static fn() => $value));
return;
}

Loop::futureTick(fn() => $this->fiber->resume($value));
}

public function throw(mixed $throwable): void
{
if (!$throwable instanceof \Throwable) {
$throwable = new \UnexpectedValueException(
'Promise rejected with unexpected value of type ' . (is_object($throwable) ? get_class($throwable) : gettype($throwable))
);
}

if ($this->fiber === null) {
Loop::futureTick(static fn() => \Fiber::suspend(static fn() => throw $throwable));
return;
}

Loop::futureTick(fn() => $this->fiber->throw($throwable));
}

public function suspend(): mixed
{
if ($this->fiber === null) {
if (self::$scheduler === null || self::$scheduler->isTerminated()) {
self::$scheduler = new \Fiber(static fn() => Loop::run());
// Run event loop to completion on shutdown.
\register_shutdown_function(static function (): void {
if (self::$scheduler->isSuspended()) {
self::$scheduler->resume();
}
});
}

return (self::$scheduler->isStarted() ? self::$scheduler->resume() : self::$scheduler->start())();
}

return \Fiber::suspend();
}
}
66 changes: 32 additions & 34 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,36 @@
use React\EventLoop\Loop;
use React\Promise\CancellablePromiseInterface;
use React\Promise\Deferred;
use React\Promise\Promise;
use React\Promise\PromiseInterface;
use function React\Promise\reject;
use function React\Promise\resolve;

/**
* Execute an async Fiber-based function to "await" promises.
*
* @param callable(mixed ...$args):mixed $function
* @param mixed ...$args Optional list of additional arguments that will be passed to the given `$function` as is
* @return PromiseInterface<mixed>
* @since 4.0.0
* @see coroutine()
*/
function async(callable $coroutine, mixed ...$args): PromiseInterface
{
return new Promise(function (callable $resolve, callable $reject) use ($coroutine, $args): void {
$fiber = new \Fiber(function () use ($resolve, $reject, $coroutine, $args): void {
try {
$resolve($coroutine(...$args));
} catch (\Throwable $exception) {
$reject($exception);
}
});

Loop::futureTick(static fn() => $fiber->start());
});
}


/**
* Block waiting for the given `$promise` to be fulfilled.
*
Expand Down Expand Up @@ -52,48 +78,20 @@
*/
function await(PromiseInterface $promise): mixed
{
$wait = true;
$resolved = null;
$exception = null;
$rejected = false;
$fiber = new SimpleFiber();

$promise->then(
function ($c) use (&$resolved, &$wait) {
$resolved = $c;
$wait = false;
Loop::stop();
function (mixed $value) use (&$resolved, $fiber): void {
$fiber->resume($value);
},
function ($error) use (&$exception, &$rejected, &$wait) {
$exception = $error;
$rejected = true;
$wait = false;
Loop::stop();
function (mixed $throwable) use (&$resolved, $fiber): void {
$fiber->throw($throwable);
}
);

// Explicitly overwrite argument with null value. This ensure that this
// argument does not show up in the stack trace in PHP 7+ only.
$promise = null;

while ($wait) {
Loop::run();
}

if ($rejected) {
// promise is rejected with an unexpected value (Promise API v1 or v2 only)
if (!$exception instanceof \Throwable) {
$exception = new \UnexpectedValueException(
'Promise rejected with unexpected value of type ' . (is_object($exception) ? get_class($exception) : gettype($exception))
);
}

throw $exception;
}

return $resolved;
return $fiber->suspend();
}


/**
* Execute a Generator-based coroutine to "await" promises.
*
Expand Down
87 changes: 87 additions & 0 deletions tests/AsyncTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

namespace React\Tests\Async;

use React;
use React\EventLoop\Loop;
use React\Promise\Promise;
use function React\Async\async;
use function React\Async\await;
use function React\Promise\all;

class AsyncTest extends TestCase
{
public function testAsyncReturnsPendingPromise()
{
$promise = async(function () {
return 42;
});

$promise->then($this->expectCallableNever(), $this->expectCallableNever());
}

public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturns()
{
$promise = async(function () {
return 42;
});

$value = await($promise);

$this->assertEquals(42, $value);
}

public function testAsyncReturnsPromiseThatRejectsWithExceptionWhenCallbackThrows()
{
$promise = async(function () {
throw new \RuntimeException('Foo', 42);
});

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('Foo');
$this->expectExceptionCode(42);
await($promise);
}

public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsAfterAwaitingPromise()
{
$promise = async(function () {
$promise = new Promise(function ($resolve) {
Loop::addTimer(0.001, fn () => $resolve(42));
});

return await($promise);
});

$value = await($promise);

$this->assertEquals(42, $value);
}

public function testAsyncReturnsPromiseThatFulfillsWithValueWhenCallbackReturnsAfterAwaitingTwoConcurrentPromises()
{
$promise1 = async(function () {
$promise = new Promise(function ($resolve) {
Loop::addTimer(0.11, fn () => $resolve(21));
});

return await($promise);
});

$promise2 = async(function () {
$promise = new Promise(function ($resolve) {
Loop::addTimer(0.11, fn () => $resolve(42));
});

return await($promise);
});

$time = microtime(true);
$values = await(all([$promise1, $promise2]));
$time = microtime(true) - $time;

$this->assertEquals([21, 42], $values);
$this->assertGreaterThan(0.1, $time);
$this->assertLessThan(0.12, $time);
}
}
Loading

0 comments on commit 6e08372

Please sign in to comment.