Skip to content

Commit

Permalink
Merge pull request #45 from clue-labs/iterable-v3
Browse files Browse the repository at this point in the history
Support iterable type for `parallel()` + `series()` + `waterfall()`
  • Loading branch information
WyriHaximus authored Jun 13, 2022
2 parents 843ebc2 + 9660313 commit 87eabc0
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 27 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ $promise->then(function (int $bytes) {

### parallel()

The `parallel(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
The `parallel(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
like this:

```php
Expand Down Expand Up @@ -250,7 +250,7 @@ React\Async\parallel([

### series()

The `series(array<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
The `series(iterable<callable():PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<array<mixed>,Exception>` function can be used
like this:

```php
Expand Down Expand Up @@ -292,7 +292,7 @@ React\Async\series([

### waterfall()

The `waterfall(array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
The `waterfall(iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks): PromiseInterface<mixed,Exception>` function can be used
like this:

```php
Expand Down
71 changes: 47 additions & 24 deletions src/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -281,10 +281,10 @@ function coroutine(callable $function, ...$args): PromiseInterface
}

/**
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<array<mixed>,Exception>
*/
function parallel(array $tasks): PromiseInterface
function parallel(iterable $tasks): PromiseInterface
{
$pending = [];
$deferred = new Deferred(function () use (&$pending) {
Expand All @@ -296,15 +296,10 @@ function parallel(array $tasks): PromiseInterface
$pending = [];
});
$results = [];
$errored = false;
$continue = true;

$numTasks = count($tasks);
if (0 === $numTasks) {
$deferred->resolve($results);
}

$taskErrback = function ($error) use (&$pending, $deferred, &$errored) {
$errored = true;
$taskErrback = function ($error) use (&$pending, $deferred, &$continue) {
$continue = false;
$deferred->reject($error);

foreach ($pending as $promise) {
Expand All @@ -316,33 +311,39 @@ function parallel(array $tasks): PromiseInterface
};

foreach ($tasks as $i => $task) {
$taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) {
$taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) {
$results[$i] = $result;
unset($pending[$i]);

if (count($results) === $numTasks) {
if (!$pending && !$continue) {
$deferred->resolve($results);
}
};

$promise = call_user_func($task);
$promise = \call_user_func($task);
assert($promise instanceof PromiseInterface);
$pending[$i] = $promise;

$promise->then($taskCallback, $taskErrback);

if ($errored) {
if (!$continue) {
break;
}
}

$continue = false;
if (!$pending) {
$deferred->resolve($results);
}

return $deferred->promise();
}

/**
* @param array<callable():PromiseInterface<mixed,Exception>> $tasks
* @param iterable<callable():PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<array<mixed>,Exception>
*/
function series(array $tasks): PromiseInterface
function series(iterable $tasks): PromiseInterface
{
$pending = null;
$deferred = new Deferred(function () use (&$pending) {
Expand All @@ -353,20 +354,31 @@ function series(array $tasks): PromiseInterface
});
$results = [];

if ($tasks instanceof \IteratorAggregate) {
$tasks = $tasks->getIterator();
assert($tasks instanceof \Iterator);
}

/** @var callable():void $next */
$taskCallback = function ($result) use (&$results, &$next) {
$results[] = $result;
$next();
};

$next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) {
if (0 === count($tasks)) {
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
$deferred->resolve($results);
return;
}

$task = array_shift($tasks);
$promise = call_user_func($task);
if ($tasks instanceof \Iterator) {
$task = $tasks->current();
$tasks->next();
} else {
$task = \array_shift($tasks);
}

$promise = \call_user_func($task);
assert($promise instanceof PromiseInterface);
$pending = $promise;

Expand All @@ -379,10 +391,10 @@ function series(array $tasks): PromiseInterface
}

/**
* @param array<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
* @param iterable<callable(mixed=):PromiseInterface<mixed,Exception>> $tasks
* @return PromiseInterface<mixed,Exception>
*/
function waterfall(array $tasks): PromiseInterface
function waterfall(iterable $tasks): PromiseInterface
{
$pending = null;
$deferred = new Deferred(function () use (&$pending) {
Expand All @@ -392,15 +404,26 @@ function waterfall(array $tasks): PromiseInterface
$pending = null;
});

if ($tasks instanceof \IteratorAggregate) {
$tasks = $tasks->getIterator();
assert($tasks instanceof \Iterator);
}

/** @var callable $next */
$next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) {
if (0 === count($tasks)) {
if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) {
$deferred->resolve($value);
return;
}

$task = array_shift($tasks);
$promise = call_user_func_array($task, func_get_args());
if ($tasks instanceof \Iterator) {
$task = $tasks->current();
$tasks->next();
} else {
$task = \array_shift($tasks);
}

$promise = \call_user_func_array($task, func_get_args());
assert($promise instanceof PromiseInterface);
$pending = $promise;

Expand Down
65 changes: 65 additions & 0 deletions tests/ParallelTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React;
use React\EventLoop\Loop;
use React\Promise\Promise;
use function React\Promise\reject;

class ParallelTest extends TestCase
{
Expand All @@ -17,6 +18,19 @@ public function testParallelWithoutTasks()
$promise->then($this->expectCallableOnceWith(array()));
}

public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
{
$tasks = (function () {
if (false) {
yield;
}
})();

$promise = React\Async\parallel($tasks);

$promise->then($this->expectCallableOnceWith([]));
}

public function testParallelWithTasks()
{
$tasks = array(
Expand Down Expand Up @@ -49,6 +63,38 @@ function () {
$timer->assertInRange(0.1, 0.2);
}

public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
{
$tasks = (function () {
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.1, function () use ($resolve) {
$resolve('foo');
});
});
};
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.11, function () use ($resolve) {
$resolve('bar');
});
});
};
})();

$promise = React\Async\parallel($tasks);

$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));

$timer = new Timer($this);
$timer->start();

Loop::run();

$timer->stop();
$timer->assertInRange(0.1, 0.2);
}

public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;
Expand Down Expand Up @@ -81,6 +127,25 @@ function () use (&$called) {
$this->assertSame(2, $called);
}

public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;

$tasks = (function () use (&$called) {
while (true) {
yield function () use (&$called) {
return reject(new \RuntimeException('Rejected ' . ++$called));
};
}
})();

$promise = React\Async\parallel($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));

$this->assertSame(1, $called);
}

public function testParallelWithErrorWillCancelPendingPromises()
{
$cancelled = 0;
Expand Down
87 changes: 87 additions & 0 deletions tests/SeriesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use React;
use React\EventLoop\Loop;
use React\Promise\Promise;
use function React\Promise\reject;

class SeriesTest extends TestCase
{
Expand All @@ -17,6 +18,19 @@ public function testSeriesWithoutTasks()
$promise->then($this->expectCallableOnceWith(array()));
}

public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray()
{
$tasks = (function () {
if (false) {
yield;
}
})();

$promise = React\Async\series($tasks);

$promise->then($this->expectCallableOnceWith([]));
}

public function testSeriesWithTasks()
{
$tasks = array(
Expand Down Expand Up @@ -49,6 +63,38 @@ function () {
$timer->assertInRange(0.10, 0.20);
}

public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues()
{
$tasks = (function () {
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.051, function () use ($resolve) {
$resolve('foo');
});
});
};
yield function () {
return new Promise(function ($resolve) {
Loop::addTimer(0.051, function () use ($resolve) {
$resolve('bar');
});
});
};
})();

$promise = React\Async\series($tasks);

$promise->then($this->expectCallableOnceWith(array('foo', 'bar')));

$timer = new Timer($this);
$timer->start();

Loop::run();

$timer->stop();
$timer->assertInRange(0.10, 0.20);
}

public function testSeriesWithError()
{
$called = 0;
Expand Down Expand Up @@ -80,6 +126,47 @@ function () use (&$called) {
$this->assertSame(1, $called);
}

public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$called = 0;

$tasks = (function () use (&$called) {
while (true) {
yield function () use (&$called) {
return reject(new \RuntimeException('Rejected ' . ++$called));
};
}
})();

$promise = React\Async\series($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));

$this->assertSame(1, $called);
}

public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks()
{
$tasks = new class() implements \IteratorAggregate {
public $called = 0;

public function getIterator(): \Iterator
{
while (true) {
yield function () {
return reject(new \RuntimeException('Rejected ' . ++$this->called));
};
}
}
};

$promise = React\Async\series($tasks);

$promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1')));

$this->assertSame(1, $tasks->called);
}

public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise()
{
$cancelled = 0;
Expand Down
Loading

0 comments on commit 87eabc0

Please sign in to comment.