diff --git a/src/Worker/DelegatingWorkerPool.php b/src/Worker/DelegatingWorkerPool.php new file mode 100644 index 0000000..abcedfd --- /dev/null +++ b/src/Worker/DelegatingWorkerPool.php @@ -0,0 +1,128 @@ + */ + private array $workerStorage = []; + + private int $pendingWorkerCount = 0; + + /** @var \SplQueue> */ + private readonly \SplQueue $waiting; + + /** + * @param int $limit Maximum number of workers to use from the delegate pool. + */ + public function __construct(private readonly WorkerPool $pool, private readonly int $limit) + { + $this->waiting = new \SplQueue(); + } + + public function isRunning(): bool + { + return $this->pool->isRunning(); + } + + public function isIdle(): bool + { + return $this->pool->isIdle(); + } + + public function submit(Task $task, ?Cancellation $cancellation = null): Execution + { + $worker = $this->selectWorker(); + + $execution = $worker->submit($task, $cancellation); + + $execution->getFuture()->finally(fn () => $this->push($worker))->ignore(); + + return $execution; + } + + private function selectWorker(): Worker + { + do { + if (\count($this->workerStorage) + $this->pendingWorkerCount < $this->limit) { + $this->pendingWorkerCount++; + + try { + $worker = $this->pool->getWorker(); + } finally { + $this->pendingWorkerCount--; + } + } else { + /** @var DeferredFuture $waiting */ + $waiting = new DeferredFuture(); + $this->waiting->push($waiting); + + $worker = $waiting->getFuture()->await(); + if (!$worker?->isRunning()) { + continue; + } + } + + $this->workerStorage[\spl_object_id($worker)] = $worker; + + return $worker; + } while (true); + } + + private function push(Worker $worker): void + { + unset($this->workerStorage[\spl_object_id($worker)]); + + if (!$this->waiting->isEmpty()) { + $deferredFuture = $this->waiting->dequeue(); + $deferredFuture->complete($worker->isRunning() ? $worker : null); + } + } + + public function shutdown(): void + { + if (!$this->waiting->isEmpty()) { + $exception = new WorkerException('The pool was shutdown before a worker could be obtained'); + $this->clearWaiting($exception); + } + + $this->pool->shutdown(); + } + + public function kill(): void + { + if (!$this->waiting->isEmpty()) { + $exception = new WorkerException('The pool was killed before a worker could be obtained'); + $this->clearWaiting($exception); + } + + $this->pool->kill(); + } + + private function clearWaiting(\Throwable $exception): void + { + while (!$this->waiting->isEmpty()) { + $deferredFuture = $this->waiting->dequeue(); + $deferredFuture->error($exception); + } + } + + public function getWorker(): Worker + { + return new PooledWorker($this->selectWorker(), $this->push(...)); + } + + public function getWorkerCount(): int + { + return \min($this->limit, $this->pool->getWorkerCount()); + } + + public function getIdleWorkerCount(): int + { + return \min($this->limit, $this->pool->getIdleWorkerCount()); + } +} diff --git a/test/Worker/AbstractPoolTest.php b/test/Worker/AbstractPoolTest.php index 76eb05a..7cf19f3 100644 --- a/test/Worker/AbstractPoolTest.php +++ b/test/Worker/AbstractPoolTest.php @@ -5,8 +5,6 @@ use Amp\Future; use Amp\Parallel\Context\StatusError; use Amp\Parallel\Test\Worker\Fixtures\TestTask; -use Amp\Parallel\Worker\ContextWorkerFactory; -use Amp\Parallel\Worker\ContextWorkerPool; use Amp\Parallel\Worker\Execution; use Amp\Parallel\Worker\Task; use Amp\Parallel\Worker\Worker; @@ -180,15 +178,8 @@ protected function createWorker(?string $autoloadPath = null): Worker return $this->createPool(autoloadPath: $autoloadPath); } - protected function createPool( + abstract protected function createPool( int $max = WorkerPool::DEFAULT_WORKER_LIMIT, - ?string $autoloadPath = null - ): WorkerPool { - $factory = new ContextWorkerFactory( - bootstrapPath: $autoloadPath, - contextFactory: $this->createContextFactory(), - ); - - return new ContextWorkerPool($max, $factory); - } + ?string $autoloadPath = null, + ): WorkerPool; } diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index d58cd87..2fc46da 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -6,10 +6,8 @@ use Amp\CancelledException; use Amp\DeferredCancellation; use Amp\Future; -use Amp\Parallel\Context\ContextFactory; use Amp\Parallel\Context\StatusError; use Amp\Parallel\Test\Worker\Fixtures\CommunicatingTask; -use Amp\Parallel\Worker\ContextWorkerFactory; use Amp\Parallel\Worker\Task; use Amp\Parallel\Worker\TaskCancelledException; use Amp\Parallel\Worker\TaskFailureError; @@ -382,15 +380,5 @@ public function testCommunicatingJob(): void self::assertSame('out', $execution->await($cancellation)); } - protected function createWorker(?string $autoloadPath = null): Worker - { - $factory = new ContextWorkerFactory( - bootstrapPath: $autoloadPath, - contextFactory: $this->createContextFactory(), - ); - - return $factory->create(); - } - - abstract protected function createContextFactory(): ContextFactory; + abstract protected function createWorker(?string $autoloadPath = null): Worker; } diff --git a/test/Worker/DelegatingWorkerPoolTest.php b/test/Worker/DelegatingWorkerPoolTest.php new file mode 100644 index 0000000..ddf9906 --- /dev/null +++ b/test/Worker/DelegatingWorkerPoolTest.php @@ -0,0 +1,24 @@ +start($script, cancellation: $cancellation); - } - }; + protected function createPool( + int $max = WorkerPool::DEFAULT_WORKER_LIMIT, + ?string $autoloadPath = null, + ): WorkerPool { + $factory = new ContextWorkerFactory( + bootstrapPath: $autoloadPath, + contextFactory: new ProcessContextFactory(), + ); + + return new ContextWorkerPool($max, $factory); } } diff --git a/test/Worker/ProcessWorkerTest.php b/test/Worker/ProcessWorkerTest.php index 02b88d2..ea570b7 100644 --- a/test/Worker/ProcessWorkerTest.php +++ b/test/Worker/ProcessWorkerTest.php @@ -2,20 +2,19 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Cancellation; -use Amp\Parallel\Context\Context; -use Amp\Parallel\Context\ContextFactory; use Amp\Parallel\Context\ProcessContextFactory; +use Amp\Parallel\Worker\ContextWorkerFactory; +use Amp\Parallel\Worker\Worker; class ProcessWorkerTest extends AbstractWorkerTest { - public function createContextFactory(): ContextFactory + protected function createWorker(?string $autoloadPath = null): Worker { - return new class implements ContextFactory { - public function start(array|string $script, ?Cancellation $cancellation = null): Context - { - return (new ProcessContextFactory())->start($script, cancellation: $cancellation); - } - }; + $factory = new ContextWorkerFactory( + bootstrapPath: $autoloadPath, + contextFactory: new ProcessContextFactory(), + ); + + return $factory->create(); } } diff --git a/test/Worker/ThreadPoolTest.php b/test/Worker/ThreadPoolTest.php index 02cb39f..c41f981 100644 --- a/test/Worker/ThreadPoolTest.php +++ b/test/Worker/ThreadPoolTest.php @@ -2,25 +2,27 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Cancellation; -use Amp\Parallel\Context\Context; -use Amp\Parallel\Context\ContextFactory; use Amp\Parallel\Context\ThreadContext; use Amp\Parallel\Context\ThreadContextFactory; +use Amp\Parallel\Worker\ContextWorkerFactory; +use Amp\Parallel\Worker\ContextWorkerPool; +use Amp\Parallel\Worker\WorkerPool; class ThreadPoolTest extends AbstractPoolTest { - public function createContextFactory(): ContextFactory - { + protected function createPool( + int $max = WorkerPool::DEFAULT_WORKER_LIMIT, + ?string $autoloadPath = null, + ): WorkerPool { if (!ThreadContext::isSupported()) { $this->markTestSkipped('ext-parallel required'); } - return new class implements ContextFactory { - public function start(array|string $script, ?Cancellation $cancellation = null): Context - { - return (new ThreadContextFactory())->start($script, cancellation: $cancellation); - } - }; + $factory = new ContextWorkerFactory( + bootstrapPath: $autoloadPath, + contextFactory: new ThreadContextFactory(), + ); + + return new ContextWorkerPool($max, $factory); } } diff --git a/test/Worker/ThreadWorkerTest.php b/test/Worker/ThreadWorkerTest.php index 0b79beb..ea0b546 100644 --- a/test/Worker/ThreadWorkerTest.php +++ b/test/Worker/ThreadWorkerTest.php @@ -2,25 +2,24 @@ namespace Amp\Parallel\Test\Worker; -use Amp\Cancellation; -use Amp\Parallel\Context\Context; -use Amp\Parallel\Context\ContextFactory; use Amp\Parallel\Context\ThreadContext; use Amp\Parallel\Context\ThreadContextFactory; +use Amp\Parallel\Worker\ContextWorkerFactory; +use Amp\Parallel\Worker\Worker; class ThreadWorkerTest extends AbstractWorkerTest { - public function createContextFactory(): ContextFactory + protected function createWorker(?string $autoloadPath = null): Worker { if (!ThreadContext::isSupported()) { $this->markTestSkipped('ext-parallel required'); } - return new class implements ContextFactory { - public function start(array|string $script, ?Cancellation $cancellation = null): Context - { - return (new ThreadContextFactory())->start($script, cancellation: $cancellation); - } - }; + $factory = new ContextWorkerFactory( + bootstrapPath: $autoloadPath, + contextFactory: new ThreadContextFactory(), + ); + + return $factory->create(); } }