From 6ce121f756a58d9f2dc8b53b7a7219b9f05f924b Mon Sep 17 00:00:00 2001 From: Matt Bonneau Date: Mon, 30 Oct 2017 23:37:21 -0400 Subject: [PATCH] Add keepalive and some minor fixes with disposables (#14) * Add keepalive and some minor fixes with disposables * Shorter callables * Correct ping to actually send the frame instead of nothing * Remove old method that should have been removed a long time ago * Mask ping if client --- composer.json | 8 ++- src/Client.php | 7 +- src/MessageSubject.php | 76 ++++++++++++++------ src/Server.php | 7 +- test/ClientTest.php | 35 ---------- test/MessageSubjectTest.php | 135 +++++++++++++++++++++++++++++++++++- test/ServerTest.php | 59 ++++++++++++++++ test/TestCase.php | 32 +++++++++ test/ab/clientRunner.php | 12 ++-- test/ab/fuzzingclient.json | 2 +- test/ab/fuzzingserver.json | 2 +- 11 files changed, 305 insertions(+), 70 deletions(-) create mode 100644 test/ServerTest.php create mode 100644 test/TestCase.php diff --git a/composer.json b/composer.json index 059c38d..528c655 100644 --- a/composer.json +++ b/composer.json @@ -29,8 +29,12 @@ }, "autoload-dev": { "psr-4": { - "Rx\\Websocket\\Test\\": "tests/" - } + "Rx\\Websocket\\Test\\": "test/", + "Rx\\": "vendor/reactivex/rxphp/test/Rx" + }, + "files": [ + "vendor/reactivex/rxphp/test/helper-functions.php" + ] }, "require": { "react/http": "^0.7.3", diff --git a/src/Client.php b/src/Client.php index 161b564..af9bd61 100644 --- a/src/Client.php +++ b/src/Client.php @@ -25,8 +25,9 @@ class Client extends Observable private $subProtocols; private $loop; private $connector; + private $keepAlive; - public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, ConnectorInterface $connector = null) + public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, ConnectorInterface $connector = null, int $keepAlive = 60000) { $parsedUrl = parse_url($url); if (!isset($parsedUrl['scheme']) || !in_array($parsedUrl['scheme'], ['wss', 'ws'])) { @@ -46,6 +47,7 @@ public function __construct(string $url, bool $useMessageObject = false, array $ $this->subProtocols = $subProtocols; $this->loop = $loop ?: \EventLoop\getLoop(); $this->connector = $connector; + $this->keepAlive = $keepAlive; } public function _subscribe(ObserverInterface $clientObserver): DisposableInterface @@ -137,7 +139,8 @@ function () use ($request) { $this->useMessageObject, $subprotoHeader, $nRequest, - $psr7Response + $psr7Response, + $this->keepAlive )); }); diff --git a/src/MessageSubject.php b/src/MessageSubject.php index 2964b38..21e2f4b 100644 --- a/src/MessageSubject.php +++ b/src/MessageSubject.php @@ -10,6 +10,10 @@ use Ratchet\RFC6455\Messaging\Message; use Ratchet\RFC6455\Messaging\MessageBuffer; use Ratchet\RFC6455\Messaging\MessageInterface; +use Rx\Disposable\CallbackDisposable; +use Rx\Disposable\CompositeDisposable; +use Rx\DisposableInterface; +use Rx\Exception\TimeoutException; use Rx\Observable; use Rx\ObserverInterface; use Rx\Subject\Subject; @@ -32,11 +36,12 @@ public function __construct( bool $useMessageObject = false, $subProtocol = "", RequestInterface $request, - ResponseInterface $response + ResponseInterface $response, + int $keepAlive = 60000 ) { $this->request = $request; $this->response = $response; - $this->rawDataIn = $rawDataIn; + $this->rawDataIn = $rawDataIn->share(); $this->rawDataOut = $rawDataOut; $this->mask = $mask; $this->subProtocol = $subProtocol; @@ -46,7 +51,7 @@ public function __construct( function (MessageInterface $msg) use ($useMessageObject) { parent::onNext($useMessageObject ? $msg : $msg->getPayload()); }, - function (FrameInterface $frame) use ($rawDataOut) { + function (FrameInterface $frame) { switch ($frame->getOpcode()) { case Frame::OP_PING: $this->sendFrame(new Frame($frame->getPayload(), true, Frame::OP_PONG)); @@ -63,31 +68,63 @@ function (FrameInterface $frame) use ($rawDataOut) { parent::onError($exception); } - // complete output stream - $rawDataOut->onCompleted(); + $this->rawDataOut->onCompleted(); - // signal subscribers that we are done here - //parent::onCompleted(); + parent::onCompleted(); + + $this->rawDataDisp->dispose(); return; } }, !$this->mask ); - $this->rawDataDisp = $this->rawDataIn->subscribe( - function ($data) use ($messageBuffer) { - $messageBuffer->onData($data); - }, - function (\Exception $exception) { - parent::onError($exception); - }, - function () { - parent::onCompleted(); - }); + // keepAlive + $keepAliveObs = Observable::empty(); + if ($keepAlive > 0) { + $keepAliveObs = $this->rawDataIn + ->startWith(0) + ->throttle($keepAlive / 2) + ->map(function () use ($keepAlive, $rawDataOut) { + return Observable::timer($keepAlive) + ->do(function () use ($rawDataOut) { + $frame = new Frame('', true, Frame::OP_PING); + if ($this->mask) { + $frame->maskPayload(); + } + $rawDataOut->onNext($frame->getContents()); + }) + ->delay($keepAlive) + ->do(function () use ($rawDataOut) { + $rawDataOut->onError(new TimeoutException()); + }); + }) + ->switch() + ->flatMapTo(Observable::never()); + } + + $this->rawDataDisp = $this->rawDataIn + ->merge($keepAliveObs) + ->subscribe( + [$messageBuffer, 'onData'], + [$this, 'parent::onError'], + [$this, 'parent::onCompleted'] + ); $this->subProtocol = $subProtocol; } + protected function _subscribe(ObserverInterface $observer): DisposableInterface + { + $disposable = new CompositeDisposable([ + parent::_subscribe($observer), + $this->rawDataDisp, + new CallbackDisposable([$this->rawDataOut, 'onCompleted']) + ]); + + return $disposable; + } + private function createCloseFrame(int $closeCode = Frame::CLOSE_NORMAL): Frame { $frame = new Frame(pack('n', $closeCode), true, Frame::OP_CLOSE); @@ -112,11 +149,6 @@ public function sendFrame(Frame $frame) $this->rawDataOut->onNext($frame->getContents()); } - public function getControlFrames(): Observable - { - return $this->controlFrames; - } - // The ObserverInterface is commandeered by this class. We will use the parent:: stuff ourselves for notifying // subscribers public function onNext($value) diff --git a/src/Server.php b/src/Server.php index 5b3c0f6..5afb40c 100644 --- a/src/Server.php +++ b/src/Server.php @@ -27,13 +27,15 @@ class Server extends Observable private $useMessageObject; private $subProtocols; private $loop; + private $keepAlive; - public function __construct(string $bindAddressOrPort, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null) + public function __construct(string $bindAddressOrPort, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, int $keepAlive = 60000) { $this->bindAddress = $bindAddressOrPort; $this->useMessageObject = $useMessageObject; $this->subProtocols = $subProtocols; $this->loop = $loop ?: \EventLoop\getLoop(); + $this->keepAlive = $keepAlive; } public function _subscribe(ObserverInterface $observer): DisposableInterface @@ -124,7 +126,8 @@ function () use ($responseStream) { $this->useMessageObject, $subProtocol, $psrRequest, - $negotiatorResponse + $negotiatorResponse, + $this->keepAlive ); $observer->onNext($messageSubject); diff --git a/test/ClientTest.php b/test/ClientTest.php index 336a09c..fe13114 100644 --- a/test/ClientTest.php +++ b/test/ClientTest.php @@ -3,9 +3,6 @@ namespace Rx\Websocket\Test; use React\EventLoop\Factory; -use Rx\Websocket\Client; -use Rx\Websocket\MessageSubject; -use Rx\Websocket\Server; class ClientTest extends \PHPUnit_Framework_TestCase { @@ -29,36 +26,4 @@ function ($err) use (&$errored) { $this->assertTrue($errored); } - - public function testRequestEndOnDispose() - { - $this->markTestSkipped(); - $loop = Factory::create(); - - $server = new Server('tcp://127.0.0.1:1234', false, [], $loop); - $serverDisp = $server->subscribe(function (MessageSubject $ms) { - $ms->map('strrev')->subscribe($ms); - }); - - $value = null; - - $client = new Client('ws://127.0.0.1:1234/', false, [], $loop); - $client - ->subscribe(function (MessageSubject $ms) use ($serverDisp) { - $ms->onNext('Hello'); - $ms - ->finally(function () use ($serverDisp) { - $serverDisp->dispose(); - }) - ->take(1) - ->subscribe(function ($x) use (&$value) { - $this->assertNull($value); - $value = $x; - }); - }); - - $loop->run(); - - $this->assertEquals('olleH', $value); - } } diff --git a/test/MessageSubjectTest.php b/test/MessageSubjectTest.php index 97ee5fe..1076c7d 100644 --- a/test/MessageSubjectTest.php +++ b/test/MessageSubjectTest.php @@ -5,12 +5,14 @@ use GuzzleHttp\Psr7\Request; use GuzzleHttp\Psr7\Response; use Ratchet\RFC6455\Messaging\Frame; +use Rx\Exception\TimeoutException; use Rx\Observer\CallbackObserver; use Rx\Subject\Subject; +use Rx\Testing\MockObserver; use Rx\Websocket\MessageSubject; use Rx\Websocket\WebsocketErrorException; -class MessageSubjectTest extends \PHPUnit_Framework_TestCase +class MessageSubjectTest extends TestCase { public function testCloseCodeSentToOnError() { @@ -50,4 +52,135 @@ function () use (&$closeCode) { $this->assertEquals(4000, $closeCode); } + + public function testPingPongTimeout() + { + $dataIn = $this->createHotObservable([ + onNext(200, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()), + ]); + + $dataOut = new Subject(); + + $ms = new MessageSubject( + $dataIn, + $dataOut, + true, + false, + '', + new Request('GET', '/ws'), + new Response(), + 300 + ); + + $result = $this->scheduler->startWithCreate(function () use ($dataOut) { + return $dataOut; + }); + + $this->assertMessages([ + onNext(650, (new Frame('', true, Frame::OP_PING))->getContents()), + onError(950, new TimeoutException()) + ], $result->getMessages()); + } + + public function testPingPong() + { + $dataIn = $this->createHotObservable([ + onNext(200, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onNext(651, (new Frame('', true, Frame::OP_PONG))->getContents()) + ]); + + $dataOut = new Subject(); + + $ms = new MessageSubject( + $dataIn, + $dataOut, + true, + false, + '', + new Request('GET', '/ws'), + new Response(), + 300 + ); + + $result = $this->scheduler->startWithDispose(function () use ($dataOut) { + return $dataOut; + }, 2000); + + $this->assertMessages([ + onNext(650, (new Frame('', true, Frame::OP_PING))->getContents()), + onNext(951, (new Frame('', true, Frame::OP_PING))->getContents()), + onError(1251, new TimeoutException()) + ], $result->getMessages()); + } + + public function testPingPongDataSuppressesPing() + { + $dataIn = $this->createHotObservable([ + onNext(201, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onNext(649, (new Frame('', true, Frame::OP_TEXT))->getContents()) + ]); + + $dataOut = new Subject(); + + $ms = new MessageSubject( + $dataIn, + $dataOut, + true, + false, + '', + new Request('GET', '/ws'), + new Response(), + 300 + ); + + $result = $this->scheduler->startWithDispose(function () use ($dataOut) { + return $dataOut; + }, 2000); + + $this->assertMessages([ + onNext(949, (new Frame('', true, Frame::OP_PING))->getContents()), + onError(1249, new TimeoutException()) + ], $result->getMessages()); + } + + public function testDisposeOnMessageSubjectClosesConnection() + { + $dataIn = $this->createHotObservable([ + onNext(201, (new Frame('', true, Frame::OP_TEXT))->getContents()), + onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()), + ]); + + $dataOut = new MockObserver($this->scheduler); + + $ms = new MessageSubject( + $dataIn, + $dataOut, + true, + false, + '', + new Request('GET', '/ws'), + new Response(), + 300 + ); + + $result = $this->scheduler->startWithDispose(function () use ($ms) { + return $ms; + }, 300); + + $this->assertMessages([ + onNext(201, ''), + onNext(205, ''), + ], $result->getMessages()); + + $this->assertSubscriptions([ + subscribe(0,300) + ], $dataIn->getSubscriptions()); + + $this->assertMessages([ + onCompleted(300) + ], $dataOut->getMessages()); + } } \ No newline at end of file diff --git a/test/ServerTest.php b/test/ServerTest.php new file mode 100644 index 0000000..66c635a --- /dev/null +++ b/test/ServerTest.php @@ -0,0 +1,59 @@ +subscribe(); + + addTimer(0.1, function () use ($serverDisp) { + $serverDisp->dispose(); + }); + + getLoop()->run(); + + // we are making sure it is not hanging - if it gets here it worked + $this->assertTrue(true); + } + + public function testServerShutsDownAfterOneConnection() + { + $server = new Server('127.0.0.1:1236'); + + $serverDisp = $server->take(1)->subscribe( + function (MessageSubject $ms) { + $ms->map('strrev')->subscribe($ms); + } + ); + + $value = null; + + addTimer(0.1, function () use (&$value) { + $client = new Client('ws://127.0.0.1:1236'); + $client + ->flatMap(function (MessageSubject $ms) { + $ms->send('Hello'); + return $ms; + }) + ->take(1) + ->subscribe(function ($x) use (&$value) { + $this->assertNull($value); + $value = $x; + }); + }); + + getLoop()->run(); + + $this->assertEquals('olleH', $value); + } +} \ No newline at end of file diff --git a/test/TestCase.php b/test/TestCase.php new file mode 100644 index 0000000..0a64795 --- /dev/null +++ b/test/TestCase.php @@ -0,0 +1,32 @@ +scheduler; + }); + } + + public static function resetScheduler() + { + $ref = new \ReflectionClass(Scheduler::class); + $props = $ref->getProperties(); + + foreach ($props as $prop) { + $prop->setAccessible(true); + $prop->setValue(null); + $prop->setAccessible(false); + } + } +} \ No newline at end of file diff --git a/test/ab/clientRunner.php b/test/ab/clientRunner.php index 8536f9c..2eaf5e4 100644 --- a/test/ab/clientRunner.php +++ b/test/ab/clientRunner.php @@ -15,12 +15,12 @@ $client->subscribe(); }; -$runIndividualTest = function ($case) { +$runIndividualTest = function ($case, $timeout = 60000) { echo "Running " . $case . "\n"; - $casePath = "/runCase?case={$case}&agent=" . AGENT; + $casePath = "/runCase?case={$case}&agent=" . AGENT . "-" . $timeout; - $client = new \Rx\Websocket\Client("ws://127.0.0.1:9001" . $casePath, true); + $client = new \Rx\Websocket\Client("ws://127.0.0.1:9001" . $casePath, true, [], null, null, $timeout); $deferred = new \React\Promise\Deferred(); @@ -61,7 +61,11 @@ function () use ($case, $deferred) { $deferred->resolve(); return; } - $runIndividualTest($i)->then($runNextCase); + $runIndividualTest($i, 60000)->then(function ($result) use ($runIndividualTest, &$i) { + // Use this if you want to run with no keepalive + //return $runIndividualTest($i, 0); + return $result; + })->then($runNextCase); }; $runNextCase(); diff --git a/test/ab/fuzzingclient.json b/test/ab/fuzzingclient.json index fb922e3..09cc740 100644 --- a/test/ab/fuzzingclient.json +++ b/test/ab/fuzzingclient.json @@ -8,6 +8,6 @@ "options": {"version": 18}} ], "cases": ["*"], - "exclude-cases": ["9.*", "10.*", "12.*", "13.*"], + "exclude-cases": ["10.*", "12.*", "13.*"], "exclude-agent-cases": {} } diff --git a/test/ab/fuzzingserver.json b/test/ab/fuzzingserver.json index 908de12..54f03c6 100644 --- a/test/ab/fuzzingserver.json +++ b/test/ab/fuzzingserver.json @@ -5,6 +5,6 @@ } , "outdir": "./reports/clients" , "cases": ["*"] - , "exclude-cases": ["9.*", "10.*", "12.*", "13.*"] + , "exclude-cases": ["10.*", "12.*", "13.*"] , "exclude-agent-cases": {} }