From a15a9e7d02eb233079d1ea8be8def11b6af1e895 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sun, 5 Jun 2022 19:39:31 +0200 Subject: [PATCH 1/2] Support `iterable` type for `parallel()` + `series()` + `waterfall()` --- README.md | 6 ++--- src/functions.php | 24 ++++++++++++++----- tests/ParallelTest.php | 45 +++++++++++++++++++++++++++++++++++ tests/SeriesTest.php | 45 +++++++++++++++++++++++++++++++++++ tests/WaterfallTest.php | 52 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 163 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index f172404..b0887a5 100644 --- a/README.md +++ b/README.md @@ -208,7 +208,7 @@ $promise->then(function (int $bytes) { ### parallel() -The `parallel(array> $tasks): PromiseInterface,Exception>` function can be used +The `parallel(iterable> $tasks): PromiseInterface,Exception>` function can be used like this: ```php @@ -250,7 +250,7 @@ React\Async\parallel([ ### series() -The `series(array> $tasks): PromiseInterface,Exception>` function can be used +The `series(iterable> $tasks): PromiseInterface,Exception>` function can be used like this: ```php @@ -292,7 +292,7 @@ React\Async\series([ ### waterfall() -The `waterfall(array> $tasks): PromiseInterface` function can be used +The `waterfall(iterable> $tasks): PromiseInterface` function can be used like this: ```php diff --git a/src/functions.php b/src/functions.php index a3e0390..2eaad39 100644 --- a/src/functions.php +++ b/src/functions.php @@ -282,10 +282,10 @@ function coroutine(callable $function, ...$args): PromiseInterface } /** - * @param array> $tasks + * @param iterable> $tasks * @return PromiseInterface,Exception> */ -function parallel(array $tasks): PromiseInterface +function parallel(iterable $tasks): PromiseInterface { $pending = []; $deferred = new Deferred(function () use (&$pending) { @@ -299,6 +299,10 @@ function parallel(array $tasks): PromiseInterface $results = []; $errored = false; + if (!\is_array($tasks)) { + $tasks = \iterator_to_array($tasks); + } + $numTasks = count($tasks); if (0 === $numTasks) { $deferred->resolve($results); @@ -340,10 +344,10 @@ function parallel(array $tasks): PromiseInterface } /** - * @param array> $tasks + * @param iterable> $tasks * @return PromiseInterface,Exception> */ -function series(array $tasks): PromiseInterface +function series(iterable $tasks): PromiseInterface { $pending = null; $deferred = new Deferred(function () use (&$pending) { @@ -354,6 +358,10 @@ function series(array $tasks): PromiseInterface }); $results = []; + if (!\is_array($tasks)) { + $tasks = \iterator_to_array($tasks); + } + /** @var callable():void $next */ $taskCallback = function ($result) use (&$results, &$next) { $results[] = $result; @@ -380,10 +388,10 @@ function series(array $tasks): PromiseInterface } /** - * @param array> $tasks + * @param iterable> $tasks * @return PromiseInterface */ -function waterfall(array $tasks): PromiseInterface +function waterfall(iterable $tasks): PromiseInterface { $pending = null; $deferred = new Deferred(function () use (&$pending) { @@ -393,6 +401,10 @@ function waterfall(array $tasks): PromiseInterface $pending = null; }); + if (!\is_array($tasks)) { + $tasks = \iterator_to_array($tasks); + } + /** @var callable $next */ $next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) { if (0 === count($tasks)) { diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index b77a3ca..284ccc0 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -17,6 +17,19 @@ public function testParallelWithoutTasks() $promise->then($this->expectCallableOnceWith(array())); } + public function testParallelWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray() + { + $tasks = (function () { + if (false) { + yield; + } + })(); + + $promise = React\Async\parallel($tasks); + + $promise->then($this->expectCallableOnceWith([])); + } + public function testParallelWithTasks() { $tasks = array( @@ -49,6 +62,38 @@ function () { $timer->assertInRange(0.1, 0.2); } + public function testParallelWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues() + { + $tasks = (function () { + yield function () { + return new Promise(function ($resolve) { + Loop::addTimer(0.1, function () use ($resolve) { + $resolve('foo'); + }); + }); + }; + yield function () { + return new Promise(function ($resolve) { + Loop::addTimer(0.11, function () use ($resolve) { + $resolve('bar'); + }); + }); + }; + })(); + + $promise = React\Async\parallel($tasks); + + $promise->then($this->expectCallableOnceWith(array('foo', 'bar'))); + + $timer = new Timer($this); + $timer->start(); + + Loop::run(); + + $timer->stop(); + $timer->assertInRange(0.1, 0.2); + } + public function testParallelWithErrorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() { $called = 0; diff --git a/tests/SeriesTest.php b/tests/SeriesTest.php index 7cedf91..38937eb 100644 --- a/tests/SeriesTest.php +++ b/tests/SeriesTest.php @@ -17,6 +17,19 @@ public function testSeriesWithoutTasks() $promise->then($this->expectCallableOnceWith(array())); } + public function testSeriesWithoutTasksFromEmptyGeneratorResolvesWithEmptyArray() + { + $tasks = (function () { + if (false) { + yield; + } + })(); + + $promise = React\Async\series($tasks); + + $promise->then($this->expectCallableOnceWith([])); + } + public function testSeriesWithTasks() { $tasks = array( @@ -49,6 +62,38 @@ function () { $timer->assertInRange(0.10, 0.20); } + public function testSeriesWithTasksFromGeneratorResolvesWithArrayOfFulfillmentValues() + { + $tasks = (function () { + yield function () { + return new Promise(function ($resolve) { + Loop::addTimer(0.051, function () use ($resolve) { + $resolve('foo'); + }); + }); + }; + yield function () { + return new Promise(function ($resolve) { + Loop::addTimer(0.051, function () use ($resolve) { + $resolve('bar'); + }); + }); + }; + })(); + + $promise = React\Async\series($tasks); + + $promise->then($this->expectCallableOnceWith(array('foo', 'bar'))); + + $timer = new Timer($this); + $timer->start(); + + Loop::run(); + + $timer->stop(); + $timer->assertInRange(0.10, 0.20); + } + public function testSeriesWithError() { $called = 0; diff --git a/tests/WaterfallTest.php b/tests/WaterfallTest.php index b0c5c3c..70f1ee6 100644 --- a/tests/WaterfallTest.php +++ b/tests/WaterfallTest.php @@ -17,6 +17,19 @@ public function testWaterfallWithoutTasks() $promise->then($this->expectCallableOnceWith(null)); } + public function testWaterfallWithoutTasksFromEmptyGeneratorResolvesWithNull() + { + $tasks = (function () { + if (false) { + yield; + } + })(); + + $promise = React\Async\waterfall($tasks); + + $promise->then($this->expectCallableOnceWith(null)); + } + public function testWaterfallWithTasks() { $tasks = array( @@ -56,6 +69,45 @@ function ($bar) { $timer->assertInRange(0.15, 0.30); } + public function testWaterfallWithTasksFromGeneratorResolvesWithFinalFulfillmentValue() + { + $tasks = (function () { + yield function ($foo = 'foo') { + return new Promise(function ($resolve) use ($foo) { + Loop::addTimer(0.05, function () use ($resolve, $foo) { + $resolve($foo); + }); + }); + }; + yield function ($foo) { + return new Promise(function ($resolve) use ($foo) { + Loop::addTimer(0.05, function () use ($resolve, $foo) { + $resolve($foo . 'bar'); + }); + }); + }; + yield function ($bar) { + return new Promise(function ($resolve) use ($bar) { + Loop::addTimer(0.05, function () use ($resolve, $bar) { + $resolve($bar . 'baz'); + }); + }); + }; + })(); + + $promise = React\Async\waterfall($tasks); + + $promise->then($this->expectCallableOnceWith('foobarbaz')); + + $timer = new Timer($this); + $timer->start(); + + Loop::run(); + + $timer->stop(); + $timer->assertInRange(0.15, 0.30); + } + public function testWaterfallWithError() { $called = 0; From 9660313007018542e10c4f19c8d8c8a99f26d2f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sat, 11 Jun 2022 14:22:27 +0200 Subject: [PATCH 2/2] Take advantage of iterators instead of converting to array first --- src/functions.php | 63 ++++++++++++++++++++++++----------------- tests/ParallelTest.php | 20 +++++++++++++ tests/SeriesTest.php | 42 +++++++++++++++++++++++++++ tests/WaterfallTest.php | 42 +++++++++++++++++++++++++++ 4 files changed, 141 insertions(+), 26 deletions(-) diff --git a/src/functions.php b/src/functions.php index 2eaad39..02fbd88 100644 --- a/src/functions.php +++ b/src/functions.php @@ -297,19 +297,10 @@ function parallel(iterable $tasks): PromiseInterface $pending = []; }); $results = []; - $errored = false; + $continue = true; - if (!\is_array($tasks)) { - $tasks = \iterator_to_array($tasks); - } - - $numTasks = count($tasks); - if (0 === $numTasks) { - $deferred->resolve($results); - } - - $taskErrback = function ($error) use (&$pending, $deferred, &$errored) { - $errored = true; + $taskErrback = function ($error) use (&$pending, $deferred, &$continue) { + $continue = false; $deferred->reject($error); foreach ($pending as $promise) { @@ -321,25 +312,31 @@ function parallel(iterable $tasks): PromiseInterface }; foreach ($tasks as $i => $task) { - $taskCallback = function ($result) use (&$results, &$pending, $numTasks, $i, $deferred) { + $taskCallback = function ($result) use (&$results, &$pending, &$continue, $i, $deferred) { $results[$i] = $result; + unset($pending[$i]); - if (count($results) === $numTasks) { + if (!$pending && !$continue) { $deferred->resolve($results); } }; - $promise = call_user_func($task); + $promise = \call_user_func($task); assert($promise instanceof PromiseInterface); $pending[$i] = $promise; $promise->then($taskCallback, $taskErrback); - if ($errored) { + if (!$continue) { break; } } + $continue = false; + if (!$pending) { + $deferred->resolve($results); + } + return $deferred->promise(); } @@ -358,8 +355,9 @@ function series(iterable $tasks): PromiseInterface }); $results = []; - if (!\is_array($tasks)) { - $tasks = \iterator_to_array($tasks); + if ($tasks instanceof \IteratorAggregate) { + $tasks = $tasks->getIterator(); + assert($tasks instanceof \Iterator); } /** @var callable():void $next */ @@ -369,13 +367,19 @@ function series(iterable $tasks): PromiseInterface }; $next = function () use (&$tasks, $taskCallback, $deferred, &$results, &$pending) { - if (0 === count($tasks)) { + if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) { $deferred->resolve($results); return; } - $task = array_shift($tasks); - $promise = call_user_func($task); + if ($tasks instanceof \Iterator) { + $task = $tasks->current(); + $tasks->next(); + } else { + $task = \array_shift($tasks); + } + + $promise = \call_user_func($task); assert($promise instanceof PromiseInterface); $pending = $promise; @@ -401,19 +405,26 @@ function waterfall(iterable $tasks): PromiseInterface $pending = null; }); - if (!\is_array($tasks)) { - $tasks = \iterator_to_array($tasks); + if ($tasks instanceof \IteratorAggregate) { + $tasks = $tasks->getIterator(); + assert($tasks instanceof \Iterator); } /** @var callable $next */ $next = function ($value = null) use (&$tasks, &$next, $deferred, &$pending) { - if (0 === count($tasks)) { + if ($tasks instanceof \Iterator ? !$tasks->valid() : !$tasks) { $deferred->resolve($value); return; } - $task = array_shift($tasks); - $promise = call_user_func_array($task, func_get_args()); + if ($tasks instanceof \Iterator) { + $task = $tasks->current(); + $tasks->next(); + } else { + $task = \array_shift($tasks); + } + + $promise = \call_user_func_array($task, func_get_args()); assert($promise instanceof PromiseInterface); $pending = $promise; diff --git a/tests/ParallelTest.php b/tests/ParallelTest.php index 284ccc0..1a5759b 100644 --- a/tests/ParallelTest.php +++ b/tests/ParallelTest.php @@ -5,6 +5,7 @@ use React; use React\EventLoop\Loop; use React\Promise\Promise; +use function React\Promise\reject; class ParallelTest extends TestCase { @@ -126,6 +127,25 @@ function () use (&$called) { $this->assertSame(2, $called); } + public function testParallelWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() + { + $called = 0; + + $tasks = (function () use (&$called) { + while (true) { + yield function () use (&$called) { + return reject(new \RuntimeException('Rejected ' . ++$called)); + }; + } + })(); + + $promise = React\Async\parallel($tasks); + + $promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1'))); + + $this->assertSame(1, $called); + } + public function testParallelWithErrorWillCancelPendingPromises() { $cancelled = 0; diff --git a/tests/SeriesTest.php b/tests/SeriesTest.php index 38937eb..2583639 100644 --- a/tests/SeriesTest.php +++ b/tests/SeriesTest.php @@ -5,6 +5,7 @@ use React; use React\EventLoop\Loop; use React\Promise\Promise; +use function React\Promise\reject; class SeriesTest extends TestCase { @@ -125,6 +126,47 @@ function () use (&$called) { $this->assertSame(1, $called); } + public function testSeriesWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() + { + $called = 0; + + $tasks = (function () use (&$called) { + while (true) { + yield function () use (&$called) { + return reject(new \RuntimeException('Rejected ' . ++$called)); + }; + } + })(); + + $promise = React\Async\series($tasks); + + $promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1'))); + + $this->assertSame(1, $called); + } + + public function testSeriesWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() + { + $tasks = new class() implements \IteratorAggregate { + public $called = 0; + + public function getIterator(): \Iterator + { + while (true) { + yield function () { + return reject(new \RuntimeException('Rejected ' . ++$this->called)); + }; + } + } + }; + + $promise = React\Async\series($tasks); + + $promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1'))); + + $this->assertSame(1, $tasks->called); + } + public function testSeriesWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise() { $cancelled = 0; diff --git a/tests/WaterfallTest.php b/tests/WaterfallTest.php index 70f1ee6..ace1877 100644 --- a/tests/WaterfallTest.php +++ b/tests/WaterfallTest.php @@ -5,6 +5,7 @@ use React; use React\EventLoop\Loop; use React\Promise\Promise; +use function React\Promise\reject; class WaterfallTest extends TestCase { @@ -139,6 +140,47 @@ function () use (&$called) { $this->assertSame(1, $called); } + public function testWaterfallWithErrorFromInfiniteGeneratorReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() + { + $called = 0; + + $tasks = (function () use (&$called) { + while (true) { + yield function () use (&$called) { + return reject(new \RuntimeException('Rejected ' . ++$called)); + }; + } + })(); + + $promise = React\Async\waterfall($tasks); + + $promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1'))); + + $this->assertSame(1, $called); + } + + public function testWaterfallWithErrorFromInfiniteIteratorAggregateReturnsPromiseRejectedWithExceptionFromTaskAndStopsCallingAdditionalTasks() + { + $tasks = new class() implements \IteratorAggregate { + public $called = 0; + + public function getIterator(): \Iterator + { + while (true) { + yield function () { + return reject(new \RuntimeException('Rejected ' . ++$this->called)); + }; + } + } + }; + + $promise = React\Async\waterfall($tasks); + + $promise->then(null, $this->expectCallableOnceWith(new \RuntimeException('Rejected 1'))); + + $this->assertSame(1, $tasks->called); + } + public function testWaterfallWillCancelFirstPendingPromiseWhenCallingCancelOnResultingPromise() { $cancelled = 0;