Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suspension listeners #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/EventLoop/Listener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Revolt\EventLoop;

interface Listener
{
/**
* Called when a Suspension is suspended.
*
* @param int $id The object ID of the Suspension.
*/
public function onSuspend(int $id): void;

/**
* Called when a Suspension is resumed.
*
* @param int $id The object ID of the Suspension.
*/
public function onResume(int $id): void;
}
91 changes: 79 additions & 12 deletions src/EventLoop/Suspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* **Example**
*
* ```php
* $suspension = Scheduler::createSuspension();
* $suspension = EventLoop::createSuspension();
*
* $promise->then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable));
*
Expand All @@ -19,13 +19,21 @@
*/
final class Suspension
{
/** @var string Next listener ID. */
private static string $nextId = 'a';

/** @var Listener[] */
private static array $listeners = [];

private static bool $invokingListeners = false;

private ?\Fiber $fiber;
private \Fiber $scheduler;
private Driver $driver;
private bool $pending = false;

/**
* Suspension constructor.
* Use {@see EventLoop::createSuspension()} to create Suspensions.
*
* @param Driver $driver
* @param \Fiber $scheduler
Expand Down Expand Up @@ -54,6 +62,10 @@ public function throw(\Throwable $throwable): void
throw new \Error('Must call throw() before calling resume()');
}

if (self::$invokingListeners) {
throw new \Error('Cannot call throw() within a suspension listener');
}

$this->pending = false;

if ($this->fiber) {
Expand All @@ -70,6 +82,10 @@ public function resume(mixed $value): void
throw new \Error('Must call suspend() before calling resume()');
}

if (self::$invokingListeners) {
throw new \Error('Cannot call throw() within a suspension listener');
}

$this->pending = false;

if ($this->fiber) {
Expand All @@ -90,22 +106,73 @@ public function suspend(): mixed
throw new \Error('Must not call suspend() from another fiber');
}

if (self::$invokingListeners) {
throw new \Error('Cannot call suspend() within a suspension listener');
}

$this->pending = true;

// Awaiting from within a fiber.
if ($this->fiber) {
return \Fiber::suspend();
if (!empty(self::$listeners)) {
$this->invokeListeners('onSuspend');
}

// Awaiting from {main}.
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();
try {
// Awaiting from within a fiber.
if ($this->fiber) {
return \Fiber::suspend();
}

// Awaiting from {main}.
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// Should only be true if the event loop exited without resolving the promise.
throw new \Error('Event loop suspended or exited unexpectedly');
}

return $lambda();
} finally {
if (!empty(self::$listeners)) {
$this->invokeListeners('onResume');
}
}
}

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// Should only be true if the event loop exited without resolving the promise.
throw new \Error('Scheduler suspended or exited unexpectedly');
private function invokeListeners(string $method): void
{
$id = \spl_object_id($this);
self::$invokingListeners = true;
foreach (self::$listeners as $listener) {
try {
$listener->{$method}($id);
} catch (\Throwable $exception) {
$this->driver->queue(static fn () => throw $exception);
}
}
self::$invokingListeners = false;
}

/**
* Add a listener that is invoked when any Suspension is suspended, resumed, or destroyed.
*
* @param Listener $listener
* @return string ID that can be used to remove the listener using {@see unlisten()}.
*/
public static function listen(Listener $listener): string
{
$id = self::$nextId++;
self::$listeners[$id] = $listener;
return $id;
}

return $lambda();
/**
* Remove the suspension listener.
*
* @param string $id
*/
public static function unlisten(string $id): void
{
unset(self::$listeners[$id]);
}
}
183 changes: 183 additions & 0 deletions test/SuspensionTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
<?php

namespace Revolt\EventLoop;

use PHPUnit\Framework\TestCase;
use Revolt\EventLoop;

class SuspensionTest extends TestCase
{
public function testListen(): void
{
$listener = new class () implements Listener {
public int $suspended = 0;
public int $resumed = 0;

public function onSuspend(int $id): void
{
++$this->suspended;
}

public function onResume(int $id): void
{
++$this->resumed;
}
};

$id = Suspension::listen($listener);

$suspension = EventLoop::createSuspension();
EventLoop::defer(fn () => $suspension->resume(null));

$suspension->suspend();

self::assertSame(1, $listener->suspended);
self::assertSame(1, $listener->resumed);

Suspension::listen($listener);

$suspension = EventLoop::createSuspension();
EventLoop::defer(fn () => $suspension->throw(new \Exception()));

try {
$suspension->suspend();
self::fail('Exception was expected to be thrown from suspend');
} catch (\Exception $e) {
// Expected, ignore.
}

self::assertSame(3, $listener->suspended);
self::assertSame(3, $listener->resumed);

Suspension::unlisten($id);

$suspension = EventLoop::createSuspension();
EventLoop::defer(fn () => $suspension->resume(null));

$suspension->suspend();

self::assertSame(4, $listener->suspended);
self::assertSame(4, $listener->resumed);
}

public function provideListenerMethods(): iterable
{
$reflectionClass = new \ReflectionClass(Listener::class);
$methods = $reflectionClass->getMethods();
return \array_map(static fn (\ReflectionMethod $reflectionMethod) => [$reflectionMethod->getName()], $methods);
}

/**
* @dataProvider provideListenerMethods
*/
public function testSuspendDuringListenerInvocation(string $functionName): void
{
$suspension = EventLoop::createSuspension();

$listener = new class ($functionName, $suspension) implements Listener {
public function __construct(
private string $functionName,
private Suspension $suspension,
) {
}

public function onSuspend(int $id): void
{
if ($this->functionName === __FUNCTION__) {
$this->suspension->suspend();
}
}

public function onResume(int $id): void
{
if ($this->functionName === __FUNCTION__) {
$this->suspension->suspend();
}
}
};

Suspension::listen($listener);

$suspension = EventLoop::createSuspension();
EventLoop::defer(fn () => $suspension->resume(null));

self::expectException(\Error::class);
self::expectExceptionMessage('within a suspension listener');

$suspension->suspend();
}

/**
* @dataProvider provideListenerMethods
*/
public function testResumeDuringListenerInvocation(string $functionName): void
{
$suspension = EventLoop::createSuspension();

$listener = new class ($functionName, $suspension) implements Listener {
public function __construct(
private string $functionName,
private Suspension $suspension,
) {
}

public function onSuspend(int $id): void
{
if ($this->functionName === __FUNCTION__) {
$this->suspension->resume(null);
}
}

public function onResume(int $id): void
{
if ($this->functionName === __FUNCTION__) {
$this->suspension->resume(null);
}
}
};

Suspension::listen($listener);

self::expectException(\Error::class);
self::expectExceptionMessage('within a suspension listener');

$suspension->suspend();
}

/**
* @dataProvider provideListenerMethods
*/
public function testThrowDuringListenerInvocation(string $functionName): void
{
$suspension = EventLoop::createSuspension();

$listener = new class ($functionName, $suspension) implements Listener {
public function __construct(
private string $functionName,
private Suspension $suspension,
) {
}

public function onSuspend(int $id): void
{
if ($this->functionName === __FUNCTION__) {
$this->suspension->throw(new \Exception());
}
}

public function onResume(int $id): void
{
if ($this->functionName === __FUNCTION__) {
$this->suspension->throw(new \Exception());
}
}
};

Suspension::listen($listener);

self::expectException(\Error::class);
self::expectExceptionMessage('within a suspension listener');

$suspension->suspend();
}
}