diff --git a/src/EventLoop/Listener.php b/src/EventLoop/Listener.php new file mode 100644 index 0000000..1b3ab57 --- /dev/null +++ b/src/EventLoop/Listener.php @@ -0,0 +1,20 @@ +then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable)); * @@ -19,13 +19,21 @@ */ final class Suspension { + /** @var string Next listener ID. */ + private static string $nextId = 'a'; + + /** @var Listener[] */ + private static array $listeners = []; + + private static bool $invokingListeners = false; + private ?\Fiber $fiber; private \Fiber $scheduler; private Driver $driver; private bool $pending = false; /** - * Suspension constructor. + * Use {@see EventLoop::createSuspension()} to create Suspensions. * * @param Driver $driver * @param \Fiber $scheduler @@ -54,6 +62,10 @@ public function throw(\Throwable $throwable): void throw new \Error('Must call throw() before calling resume()'); } + if (self::$invokingListeners) { + throw new \Error('Cannot call throw() within a suspension listener'); + } + $this->pending = false; if ($this->fiber) { @@ -70,6 +82,10 @@ public function resume(mixed $value): void throw new \Error('Must call suspend() before calling resume()'); } + if (self::$invokingListeners) { + throw new \Error('Cannot call throw() within a suspension listener'); + } + $this->pending = false; if ($this->fiber) { @@ -90,22 +106,73 @@ public function suspend(): mixed throw new \Error('Must not call suspend() from another fiber'); } + if (self::$invokingListeners) { + throw new \Error('Cannot call suspend() within a suspension listener'); + } + $this->pending = true; - // Awaiting from within a fiber. - if ($this->fiber) { - return \Fiber::suspend(); + if (!empty(self::$listeners)) { + $this->invokeListeners('onSuspend'); } - // Awaiting from {main}. - $lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start(); + try { + // Awaiting from within a fiber. + if ($this->fiber) { + return \Fiber::suspend(); + } + + // Awaiting from {main}. + $lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start(); + + /** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */ + if ($this->pending) { + // Should only be true if the event loop exited without resolving the promise. + throw new \Error('Event loop suspended or exited unexpectedly'); + } + + return $lambda(); + } finally { + if (!empty(self::$listeners)) { + $this->invokeListeners('onResume'); + } + } + } - /** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */ - if ($this->pending) { - // Should only be true if the event loop exited without resolving the promise. - throw new \Error('Scheduler suspended or exited unexpectedly'); + private function invokeListeners(string $method): void + { + $id = \spl_object_id($this); + self::$invokingListeners = true; + foreach (self::$listeners as $listener) { + try { + $listener->{$method}($id); + } catch (\Throwable $exception) { + $this->driver->queue(static fn () => throw $exception); + } } + self::$invokingListeners = false; + } + + /** + * Add a listener that is invoked when any Suspension is suspended, resumed, or destroyed. + * + * @param Listener $listener + * @return string ID that can be used to remove the listener using {@see unlisten()}. + */ + public static function listen(Listener $listener): string + { + $id = self::$nextId++; + self::$listeners[$id] = $listener; + return $id; + } - return $lambda(); + /** + * Remove the suspension listener. + * + * @param string $id + */ + public static function unlisten(string $id): void + { + unset(self::$listeners[$id]); } } diff --git a/test/SuspensionTest.php b/test/SuspensionTest.php new file mode 100644 index 0000000..eb67c2f --- /dev/null +++ b/test/SuspensionTest.php @@ -0,0 +1,183 @@ +suspended; + } + + public function onResume(int $id): void + { + ++$this->resumed; + } + }; + + $id = Suspension::listen($listener); + + $suspension = EventLoop::createSuspension(); + EventLoop::defer(fn () => $suspension->resume(null)); + + $suspension->suspend(); + + self::assertSame(1, $listener->suspended); + self::assertSame(1, $listener->resumed); + + Suspension::listen($listener); + + $suspension = EventLoop::createSuspension(); + EventLoop::defer(fn () => $suspension->throw(new \Exception())); + + try { + $suspension->suspend(); + self::fail('Exception was expected to be thrown from suspend'); + } catch (\Exception $e) { + // Expected, ignore. + } + + self::assertSame(3, $listener->suspended); + self::assertSame(3, $listener->resumed); + + Suspension::unlisten($id); + + $suspension = EventLoop::createSuspension(); + EventLoop::defer(fn () => $suspension->resume(null)); + + $suspension->suspend(); + + self::assertSame(4, $listener->suspended); + self::assertSame(4, $listener->resumed); + } + + public function provideListenerMethods(): iterable + { + $reflectionClass = new \ReflectionClass(Listener::class); + $methods = $reflectionClass->getMethods(); + return \array_map(static fn (\ReflectionMethod $reflectionMethod) => [$reflectionMethod->getName()], $methods); + } + + /** + * @dataProvider provideListenerMethods + */ + public function testSuspendDuringListenerInvocation(string $functionName): void + { + $suspension = EventLoop::createSuspension(); + + $listener = new class ($functionName, $suspension) implements Listener { + public function __construct( + private string $functionName, + private Suspension $suspension, + ) { + } + + public function onSuspend(int $id): void + { + if ($this->functionName === __FUNCTION__) { + $this->suspension->suspend(); + } + } + + public function onResume(int $id): void + { + if ($this->functionName === __FUNCTION__) { + $this->suspension->suspend(); + } + } + }; + + Suspension::listen($listener); + + $suspension = EventLoop::createSuspension(); + EventLoop::defer(fn () => $suspension->resume(null)); + + self::expectException(\Error::class); + self::expectExceptionMessage('within a suspension listener'); + + $suspension->suspend(); + } + + /** + * @dataProvider provideListenerMethods + */ + public function testResumeDuringListenerInvocation(string $functionName): void + { + $suspension = EventLoop::createSuspension(); + + $listener = new class ($functionName, $suspension) implements Listener { + public function __construct( + private string $functionName, + private Suspension $suspension, + ) { + } + + public function onSuspend(int $id): void + { + if ($this->functionName === __FUNCTION__) { + $this->suspension->resume(null); + } + } + + public function onResume(int $id): void + { + if ($this->functionName === __FUNCTION__) { + $this->suspension->resume(null); + } + } + }; + + Suspension::listen($listener); + + self::expectException(\Error::class); + self::expectExceptionMessage('within a suspension listener'); + + $suspension->suspend(); + } + + /** + * @dataProvider provideListenerMethods + */ + public function testThrowDuringListenerInvocation(string $functionName): void + { + $suspension = EventLoop::createSuspension(); + + $listener = new class ($functionName, $suspension) implements Listener { + public function __construct( + private string $functionName, + private Suspension $suspension, + ) { + } + + public function onSuspend(int $id): void + { + if ($this->functionName === __FUNCTION__) { + $this->suspension->throw(new \Exception()); + } + } + + public function onResume(int $id): void + { + if ($this->functionName === __FUNCTION__) { + $this->suspension->throw(new \Exception()); + } + } + }; + + Suspension::listen($listener); + + self::expectException(\Error::class); + self::expectExceptionMessage('within a suspension listener'); + + $suspension->suspend(); + } +}