From 200067da461b0239c5070e5e465071a1859a366c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Wed, 19 Sep 2018 21:49:01 +0200 Subject: [PATCH 1/2] Add lazy connection to connect in the background --- README.md | 92 +++++++++-- composer.json | 1 + examples/01-query.php | 41 +++-- examples/02-query-stream.php | 29 ++-- src/Factory.php | 71 +++++++++ src/Io/LazyConnection.php | 108 +++++++++++++ tests/FactoryTest.php | 56 +++++++ tests/Io/LazyConnectionTest.php | 262 ++++++++++++++++++++++++++++++++ tests/ResultQueryTest.php | 39 +++++ 9 files changed, 649 insertions(+), 50 deletions(-) create mode 100644 src/Io/LazyConnection.php create mode 100644 tests/Io/LazyConnectionTest.php diff --git a/README.md b/README.md index dbc2803..1ef1250 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,7 @@ It is written in pure PHP and does not require any extensions. * [Usage](#usage) * [Factory](#factory) * [createConnection()](#createconnection) + * [createLazyConnection()](#createlazyconnection) * [ConnectionInterface](#connectioninterface) * [query()](#query) * [queryStream()](#querystream) @@ -35,20 +36,20 @@ $loop = React\EventLoop\Factory::create(); $factory = new Factory($loop); $uri = 'test:test@localhost/test'; -$factory->createConnection($uri)->then(function (ConnectionInterface $connection) { - $connection->query('SELECT * FROM book')->then( - function (QueryResult $command) { - print_r($command->resultFields); - print_r($command->resultRows); - echo count($command->resultRows) . ' row(s) in set' . PHP_EOL; - }, - function (Exception $error) { - echo 'Error: ' . $error->getMessage() . PHP_EOL; - } - ); - - $connection->quit(); -}); +$connection = $factory->createLazyConnection($uri); + +$connection->query('SELECT * FROM book')->then( + function (QueryResult $command) { + print_r($command->resultFields); + print_r($command->resultRows); + echo count($command->resultRows) . ' row(s) in set' . PHP_EOL; + }, + function (Exception $error) { + echo 'Error: ' . $error->getMessage() . PHP_EOL; + } +); + +$connection->quit(); $loop->run(); ``` @@ -154,6 +155,69 @@ authentication. You can explicitly pass a custom timeout value in seconds $factory->createConnection('localhost?timeout=0.5'); ``` +#### createLazyConnection() + +Creates a new connection. + +It helps with establishing a TCP/IP connection to your MySQL database +and issuing the initial authentication handshake. + +```php +$connection = $factory->createLazyConnection($url); + +$connection->query(…); +``` + +This method immediately returns a "virtual" connection implementing the +[`ConnectionInterface`](#connectioninterface) that can be used to +interface with your MySQL database. Internally, it lazily creates the +underlying database connection (which may take some time) and will +queue all outstanding requests until the underlying connection is ready. + +From a consumer side this means that you can start sending queries to the +database right away while the connection may still be pending. It will +ensure that all commands will be executed in the order they are enqueued +once the connection is ready. If the database connection fails, it will +emit an `error` event, reject all outstanding commands and `close` the +connection as described in the `ConnectionInterface`. In other words, it +behaves just like a real connection and frees you from having to deal +with its async resolution. + +Depending on your particular use case, you may prefer this method or the +underlying `createConnection()` which resolves with a promise. For many +simple use cases it may be easier to create a lazy connection. + +The `$url` parameter must contain the database host, optional +authentication, port and database to connect to: + +```php +$factory->createLazyConnection('user:secret@localhost:3306/database'); +``` + +You can omit the port if you're connecting to default port `3306`: + +```php +$factory->createLazyConnection('user:secret@localhost/database'); +``` + +If you do not include authentication and/or database, then this method +will default to trying to connect as user `root` with an empty password +and no database selected. This may be useful when initially setting up a +database, but likely to yield an authentication error in a production system: + +```php +$factory->createLazyConnection('localhost'); +``` + +This method respects PHP's `default_socket_timeout` setting (default 60s) +as a timeout for establishing the connection and waiting for successful +authentication. You can explicitly pass a custom timeout value in seconds +(or use a negative number to not apply a timeout) like this: + +```php +$factory->createLazyConnection('localhost?timeout=0.5'); +``` + ### ConnectionInterface The `ConnectionInterface` represents a connection that is responsible for diff --git a/composer.json b/composer.json index 810ea54..a119f83 100644 --- a/composer.json +++ b/composer.json @@ -8,6 +8,7 @@ "evenement/evenement": "^3.0 || ^2.1 || ^1.1", "react/event-loop": "^1.0 || ^0.5 || ^0.4", "react/promise": "^2.7", + "react/promise-stream": "^1.1", "react/promise-timer": "^1.5", "react/socket": "^1.1" }, diff --git a/examples/01-query.php b/examples/01-query.php index 93a6b1d..0ae6a16 100644 --- a/examples/01-query.php +++ b/examples/01-query.php @@ -1,6 +1,5 @@ createConnection($uri)->then(function (ConnectionInterface $connection) use ($query) { - $connection->query($query)->then(function (QueryResult $command) { - if (isset($command->resultRows)) { - // this is a response to a SELECT etc. with some rows (0+) - print_r($command->resultFields); - print_r($command->resultRows); - echo count($command->resultRows) . ' row(s) in set' . PHP_EOL; - } else { - // this is an OK message in response to an UPDATE etc. - if ($command->insertId !== 0) { - var_dump('last insert ID', $command->insertId); - } - echo 'Query OK, ' . $command->affectedRows . ' row(s) affected' . PHP_EOL; +//create a lazy mysql connection for executing query +$connection = $factory->createLazyConnection($uri); + +$connection->query($query)->then(function (QueryResult $command) { + if (isset($command->resultRows)) { + // this is a response to a SELECT etc. with some rows (0+) + print_r($command->resultFields); + print_r($command->resultRows); + echo count($command->resultRows) . ' row(s) in set' . PHP_EOL; + } else { + // this is an OK message in response to an UPDATE etc. + if ($command->insertId !== 0) { + var_dump('last insert ID', $command->insertId); } - }, function (Exception $error) { - // the query was not executed successfully - echo 'Error: ' . $error->getMessage() . PHP_EOL; - }); + echo 'Query OK, ' . $command->affectedRows . ' row(s) affected' . PHP_EOL; + } +}, function (Exception $error) { + // the query was not executed successfully + echo 'Error: ' . $error->getMessage() . PHP_EOL; +}); - $connection->quit(); -}, 'printf'); +$connection->quit(); $loop->run(); diff --git a/examples/02-query-stream.php b/examples/02-query-stream.php index 25a4904..dfc2d9f 100644 --- a/examples/02-query-stream.php +++ b/examples/02-query-stream.php @@ -2,7 +2,6 @@ // $ php examples/02-query-stream.php "SHOW VARIABLES" -use React\MySQL\ConnectionInterface; use React\MySQL\Factory; require __DIR__ . '/../vendor/autoload.php'; @@ -13,23 +12,23 @@ $uri = 'test:test@localhost/test'; $query = isset($argv[1]) ? $argv[1] : 'select * from book'; -//create a mysql connection for executing query -$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($query) { - $stream = $connection->queryStream($query); +//create a lazy mysql connection for executing query +$connection = $factory->createLazyConnection($uri); - $stream->on('data', function ($row) { - echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL; - }); +$stream = $connection->queryStream($query); - $stream->on('error', function (Exception $e) { - echo 'Error: ' . $e->getMessage() . PHP_EOL; - }); +$stream->on('data', function ($row) { + echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL; +}); - $stream->on('close', function () { - echo 'CLOSED' . PHP_EOL; - }); +$stream->on('error', function (Exception $e) { + echo 'Error: ' . $e->getMessage() . PHP_EOL; +}); - $connection->quit(); -}, 'printf'); +$stream->on('close', function () { + echo 'CLOSED' . PHP_EOL; +}); + +$connection->quit(); $loop->run(); diff --git a/src/Factory.php b/src/Factory.php index e0af03f..2777b88 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -13,6 +13,7 @@ use React\Socket\Connector; use React\Socket\ConnectorInterface; use React\Socket\ConnectionInterface; +use React\MySQL\Io\LazyConnection; class Factory { @@ -194,4 +195,74 @@ public function createConnection($uri) throw $e; }); } + + /** + * Creates a new connection. + * + * It helps with establishing a TCP/IP connection to your MySQL database + * and issuing the initial authentication handshake. + * + * ```php + * $connection = $factory->createLazyConnection($url); + * + * $connection->query(…); + * ``` + * + * This method immediately returns a "virtual" connection implementing the + * [`ConnectionInterface`](#connectioninterface) that can be used to + * interface with your MySQL database. Internally, it lazily creates the + * underlying database connection (which may take some time) and will + * queue all outstanding requests until the underlying connection is ready. + * + * From a consumer side this means that you can start sending queries to the + * database right away while the connection may still be pending. It will + * ensure that all commands will be executed in the order they are enqueued + * once the connection is ready. If the database connection fails, it will + * emit an `error` event, reject all outstanding commands and `close` the + * connection as described in the `ConnectionInterface`. In other words, it + * behaves just like a real connection and frees you from having to deal + * with its async resolution. + * + * Depending on your particular use case, you may prefer this method or the + * underlying `createConnection()` which resolves with a promise. For many + * simple use cases it may be easier to create a lazy connection. + * + * The `$url` parameter must contain the database host, optional + * authentication, port and database to connect to: + * + * ```php + * $factory->createLazyConnection('user:secret@localhost:3306/database'); + * ``` + * + * You can omit the port if you're connecting to default port `3306`: + * + * ```php + * $factory->createLazyConnection('user:secret@localhost/database'); + * ``` + * + * If you do not include authentication and/or database, then this method + * will default to trying to connect as user `root` with an empty password + * and no database selected. This may be useful when initially setting up a + * database, but likely to yield an authentication error in a production system: + * + * ```php + * $factory->createLazyConnection('localhost'); + * ``` + * + * This method respects PHP's `default_socket_timeout` setting (default 60s) + * as a timeout for establishing the connection and waiting for successful + * authentication. You can explicitly pass a custom timeout value in seconds + * (or use a negative number to not apply a timeout) like this: + * + * ```php + * $factory->createLazyConnection('localhost?timeout=0.5'); + * ``` + * + * @param string $uri + * @return ConnectionInterface + */ + public function createLazyConnection($uri) + { + return new LazyConnection($this->createConnection($uri)); + } } diff --git a/src/Io/LazyConnection.php b/src/Io/LazyConnection.php new file mode 100644 index 0000000..b760383 --- /dev/null +++ b/src/Io/LazyConnection.php @@ -0,0 +1,108 @@ +connecting = $connecting; + + $connecting->then(function (ConnectionInterface $connection) { + // connection completed => forward error and close events + $connection->on('error', function ($e) { + $this->emit('error', [$e]); + }); + $connection->on('close', function () { + $this->close(); + }); + }, function (\Exception $e) { + // connection failed => emit error if connection is not already closed + if ($this->closed) { + return; + } + + $this->emit('error', [$e]); + $this->close(); + }); + } + + public function query($sql, array $params = []) + { + if ($this->connecting === null) { + return \React\Promise\reject(new Exception('Connection closed')); + } + + return $this->connecting->then(function (ConnectionInterface $connection) use ($sql, $params) { + return $connection->query($sql, $params); + }); + } + + public function queryStream($sql, $params = []) + { + if ($this->connecting === null) { + throw new Exception('Connection closed'); + } + + return \React\Promise\Stream\unwrapReadable( + $this->connecting->then(function (ConnectionInterface $connection) use ($sql, $params) { + return $connection->queryStream($sql, $params); + }) + ); + } + + public function ping() + { + if ($this->connecting === null) { + return \React\Promise\reject(new Exception('Connection closed')); + } + + return $this->connecting->then(function (ConnectionInterface $connection) { + return $connection->ping(); + }); + } + + public function quit() + { + if ($this->connecting === null) { + return \React\Promise\reject(new Exception('Connection closed')); + } + + return $this->connecting->then(function (ConnectionInterface $connection) { + return $connection->quit(); + }); + } + + public function close() + { + if ($this->closed) { + return; + } + + $this->closed = true; + + // either close active connection or cancel pending connection attempt + $this->connecting->then(function (ConnectionInterface $connection) { + $connection->close(); + }); + $this->connecting->cancel(); + + $this->connecting = null; + + $this->emit('close'); + $this->removeAllListeners(); + } +} diff --git a/tests/FactoryTest.php b/tests/FactoryTest.php index 585999e..7f27568 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryTest.php @@ -352,4 +352,60 @@ public function testCancelConnectDuringAuthenticationWillCloseConnection() return ($e->getMessage() === 'Connection to database server cancelled'); }))); } + + public function testConnectLazyWithValidAuthWillRunUntilQuit() + { + $this->expectOutputString('closed.'); + + $loop = \React\EventLoop\Factory::create(); + $factory = new Factory($loop); + + $uri = $this->getConnectionString(); + $connection = $factory->createLazyConnection($uri); + + $connection->quit()->then(function () { + echo 'closed.'; + }); + + $loop->run(); + } + + public function testConnectLazyWithInvalidAuthWillEmitErrorAndClose() + { + $loop = \React\EventLoop\Factory::create(); + $factory = new Factory($loop); + + $uri = $this->getConnectionString(array('passwd' => 'invalidpass')); + $connection = $factory->createLazyConnection($uri); + + $connection->on('error', $this->expectCallableOnce()); + $connection->on('close', $this->expectCallableOnce()); + + $loop->run(); + } + + public function testConnectLazyWithValidAuthWillPingBeforeQuitButNotAfter() + { + $this->expectOutputString('ping.closed.'); + + $loop = \React\EventLoop\Factory::create(); + $factory = new Factory($loop); + + $uri = $this->getConnectionString(); + $connection = $factory->createLazyConnection($uri); + + $connection->ping()->then(function () { + echo 'ping.'; + }); + + $connection->quit()->then(function () { + echo 'closed.'; + }); + + $connection->ping()->then(function () { + echo 'never reached'; + }); + + $loop->run(); + } } diff --git a/tests/Io/LazyConnectionTest.php b/tests/Io/LazyConnectionTest.php new file mode 100644 index 0000000..b9f5e14 --- /dev/null +++ b/tests/Io/LazyConnectionTest.php @@ -0,0 +1,262 @@ +promise()); + + $connection->on('error', $this->expectCallableOnce()); + $connection->on('close', $this->expectCallableOnce()); + + $deferred->reject(new \RuntimeException()); + } + + public function testConnectionWillBeClosedWithoutErrorWhenUnderlyingConnectionCloses() + { + $promise = new Promise(function () { }); + $base = new LazyConnection($promise); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $connection->on('error', $this->expectCallableNever()); + $connection->on('close', $this->expectCallableOnce()); + + $base->close(); + } + + public function testConnectionWillForwardErrorFromUnderlyingConnection() + { + $promise = new Promise(function () { }); + $base = new LazyConnection($promise); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $connection->on('error', $this->expectCallableOnce()); + $connection->on('close', $this->expectCallableNever()); + + $base->emit('error', [new \RuntimeException()]); + } + + public function testQueryReturnsPendingPromiseWhenConnectionIsPending() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $ret = $connection->query('SELECT 1'); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQueryWillQueryUnderlyingConnectionWhenResolved() + { + $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); + $base->expects($this->once())->method('query')->with('SELECT 1'); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $connection->query('SELECT 1'); + } + + public function testQueryWillRejectWhenUnderlyingConnectionRejects() + { + $deferred = new Deferred(); + $connection = new LazyConnection($deferred->promise()); + + $ret = $connection->query('SELECT 1'); + $ret->then($this->expectCallableNever(), $this->expectCallableOnce()); + + $deferred->reject(new \RuntimeException()); + } + + public function testQueryStreamReturnsReadableStreamWhenConnectionIsPending() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $ret = $connection->queryStream('SELECT 1'); + + $this->assertTrue($ret instanceof ReadableStreamInterface); + $this->assertTrue($ret->isReadable()); + } + + public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWhenResolved() + { + $stream = new ThroughStream(); + $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); + $base->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn($stream); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $ret = $connection->queryStream('SELECT 1'); + + $ret->on('data', $this->expectCallableOnceWith('hello')); + $stream->write('hello'); + + $ret->on('close', $this->expectCallableOnce()); + $stream->close(); + + $this->assertFalse($ret->isReadable()); + } + + public function testQueryStreamWillCloseStreamWithErrorWhenUnderlyingConnectionRejects() + { + $deferred = new Deferred(); + $connection = new LazyConnection($deferred->promise()); + + $ret = $connection->queryStream('SELECT 1'); + + $ret->on('error', $this->expectCallableOnce()); + $ret->on('close', $this->expectCallableOnce()); + + $deferred->reject(new \RuntimeException()); + + $this->assertFalse($ret->isReadable()); + } + + public function testPingReturnsPendingPromiseWhenConnectionIsPending() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $ret = $connection->ping(); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testPingWillPingUnderlyingConnectionWhenResolved() + { + $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); + $base->expects($this->once())->method('ping'); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $connection->ping(); + } + + public function testQuitReturnsPendingPromiseWhenConnectionIsPending() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $ret = $connection->quit(); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableNever(), $this->expectCallableNever()); + } + + public function testQuitWillQuitUnderlyingConnectionWhenResolved() + { + $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); + $base->expects($this->once())->method('quit'); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $connection->quit(); + } + + public function testCloseCancelsPendingConnection() + { + $promise = new Promise(function () { }, $this->expectCallableOnce()); + $connection = new LazyConnection($promise); + + $connection->close(); + } + + public function testCloseTwiceWillCloseUnderlyingConnectionWhenResolved() + { + $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); + $base->expects($this->once())->method('close'); + + $connection = new LazyConnection(\React\Promise\resolve($base)); + + $connection->close(); + $connection->close(); + } + + public function testCloseDoesNotEmitConnectionErrorFromAbortedConnection() + { + $promise = new Promise(function () { }, function () { + throw new \RuntimeException(); + }); + $connection = new LazyConnection($promise); + + $connection->on('error', $this->expectCallableNever()); + $connection->on('close', $this->expectCallableOnce()); + + $connection->close(); + } + + public function testCloseTwiceEmitsCloseEventOnceWhenConnectionIsPending() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $connection->on('error', $this->expectCallableNever()); + $connection->on('close', $this->expectCallableOnce()); + + $connection->close(); + $connection->close(); + } + + public function testQueryReturnsRejectedPromiseAfterConnectionIsClosed() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $connection->close(); + $ret = $connection->query('SELECT 1'); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableNever(), $this->expectCallableOnce()); + } + + /** + * @expectedException React\MySQL\Exception + */ + public function testQueryStreamThrowsAfterConnectionIsClosed() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $connection->close(); + $connection->queryStream('SELECT 1'); + } + + public function testPingReturnsRejectedPromiseAfterConnectionIsClosed() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $connection->close(); + $ret = $connection->ping(); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableNever(), $this->expectCallableOnce()); + } + + public function testQuitReturnsRejectedPromiseAfterConnectionIsClosed() + { + $promise = new Promise(function () { }); + $connection = new LazyConnection($promise); + + $connection->close(); + $ret = $connection->quit(); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableNever(), $this->expectCallableOnce()); + } +} diff --git a/tests/ResultQueryTest.php b/tests/ResultQueryTest.php index 719e171..bf46634 100644 --- a/tests/ResultQueryTest.php +++ b/tests/ResultQueryTest.php @@ -4,6 +4,7 @@ use React\MySQL\Io\Constants; use React\MySQL\QueryResult; +use React\MySQL\Factory; class ResultQueryTest extends BaseTestCase { @@ -507,4 +508,42 @@ public function testQueryStreamExplicitCloseEmitsCloseEventWithoutData() $loop->run(); } + + public function testQueryStreamFromLazyConnectionEmitsSingleRow() + { + $loop = \React\EventLoop\Factory::create(); + $factory = new Factory($loop); + + $uri = $this->getConnectionString(); + $connection = $factory->createLazyConnection($uri); + + $stream = $connection->queryStream('SELECT 1'); + + $stream->on('data', $this->expectCallableOnceWith([1 => '1'])); + $stream->on('end', $this->expectCallableOnce()); + $stream->on('close', $this->expectCallableOnce()); + + $connection->quit(); + + $loop->run(); + } + + public function testQueryStreamFromLazyConnectionWillErrorWhenConnectionIsClosed() + { + $loop = \React\EventLoop\Factory::create(); + $factory = new Factory($loop); + + $uri = $this->getConnectionString(); + $connection = $factory->createLazyConnection($uri); + + $stream = $connection->queryStream('SELECT 1'); + + $stream->on('data', $this->expectCallableNever()); + $stream->on('error', $this->expectCallableOnce()); + $stream->on('close', $this->expectCallableOnce()); + + $connection->close(); + + $loop->run(); + } } From 83ac29b4bf8f4c0b074619268ff5482d76c64e73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sun, 14 Oct 2018 13:29:08 +0200 Subject: [PATCH 2/2] Create lazy connection only on demand (on first command) --- README.md | 29 +++--- src/Factory.php | 31 ++++--- src/Io/LazyConnection.php | 79 ++++++++++------ tests/FactoryTest.php | 23 ++++- tests/Io/LazyConnectionTest.php | 157 ++++++++++++++++++++++++-------- 5 files changed, 226 insertions(+), 93 deletions(-) diff --git a/README.md b/README.md index 1ef1250..d06dc06 100644 --- a/README.md +++ b/README.md @@ -171,18 +171,25 @@ $connection->query(…); This method immediately returns a "virtual" connection implementing the [`ConnectionInterface`](#connectioninterface) that can be used to interface with your MySQL database. Internally, it lazily creates the -underlying database connection (which may take some time) and will -queue all outstanding requests until the underlying connection is ready. +underlying database connection (which may take some time) only once the +first request is invoked on this instance and will queue all outstanding +requests until the underlying connection is ready. From a consumer side this means that you can start sending queries to the -database right away while the connection may still be pending. It will -ensure that all commands will be executed in the order they are enqueued -once the connection is ready. If the database connection fails, it will -emit an `error` event, reject all outstanding commands and `close` the -connection as described in the `ConnectionInterface`. In other words, it -behaves just like a real connection and frees you from having to deal +database right away while the actual connection may still be outstanding. +It will ensure that all commands will be executed in the order they are +enqueued once the connection is ready. If the database connection fails, +it will emit an `error` event, reject all outstanding commands and `close` +the connection as described in the `ConnectionInterface`. In other words, +it behaves just like a real connection and frees you from having to deal with its async resolution. +Note that creating the underlying connection will be deferred until the +first request is invoked. Accordingly, any eventual connection issues +will be detected once this instance is first used. Similarly, calling +`quit()` on this instance before invoking any requests will succeed +immediately and will not wait for an actual underlying connection. + Depending on your particular use case, you may prefer this method or the underlying `createConnection()` which resolves with a promise. For many simple use cases it may be easier to create a lazy connection. @@ -210,9 +217,9 @@ $factory->createLazyConnection('localhost'); ``` This method respects PHP's `default_socket_timeout` setting (default 60s) -as a timeout for establishing the connection and waiting for successful -authentication. You can explicitly pass a custom timeout value in seconds -(or use a negative number to not apply a timeout) like this: +as a timeout for establishing the underlying connection and waiting for +successful authentication. You can explicitly pass a custom timeout value +in seconds (or use a negative number to not apply a timeout) like this: ```php $factory->createLazyConnection('localhost?timeout=0.5'); diff --git a/src/Factory.php b/src/Factory.php index 2777b88..263358a 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -211,18 +211,25 @@ public function createConnection($uri) * This method immediately returns a "virtual" connection implementing the * [`ConnectionInterface`](#connectioninterface) that can be used to * interface with your MySQL database. Internally, it lazily creates the - * underlying database connection (which may take some time) and will - * queue all outstanding requests until the underlying connection is ready. + * underlying database connection (which may take some time) only once the + * first request is invoked on this instance and will queue all outstanding + * requests until the underlying connection is ready. * * From a consumer side this means that you can start sending queries to the - * database right away while the connection may still be pending. It will - * ensure that all commands will be executed in the order they are enqueued - * once the connection is ready. If the database connection fails, it will - * emit an `error` event, reject all outstanding commands and `close` the - * connection as described in the `ConnectionInterface`. In other words, it - * behaves just like a real connection and frees you from having to deal + * database right away while the actual connection may still be outstanding. + * It will ensure that all commands will be executed in the order they are + * enqueued once the connection is ready. If the database connection fails, + * it will emit an `error` event, reject all outstanding commands and `close` + * the connection as described in the `ConnectionInterface`. In other words, + * it behaves just like a real connection and frees you from having to deal * with its async resolution. * + * Note that creating the underlying connection will be deferred until the + * first request is invoked. Accordingly, any eventual connection issues + * will be detected once this instance is first used. Similarly, calling + * `quit()` on this instance before invoking any requests will succeed + * immediately and will not wait for an actual underlying connection. + * * Depending on your particular use case, you may prefer this method or the * underlying `createConnection()` which resolves with a promise. For many * simple use cases it may be easier to create a lazy connection. @@ -250,9 +257,9 @@ public function createConnection($uri) * ``` * * This method respects PHP's `default_socket_timeout` setting (default 60s) - * as a timeout for establishing the connection and waiting for successful - * authentication. You can explicitly pass a custom timeout value in seconds - * (or use a negative number to not apply a timeout) like this: + * as a timeout for establishing the underlying connection and waiting for + * successful authentication. You can explicitly pass a custom timeout value + * in seconds (or use a negative number to not apply a timeout) like this: * * ```php * $factory->createLazyConnection('localhost?timeout=0.5'); @@ -263,6 +270,6 @@ public function createConnection($uri) */ public function createLazyConnection($uri) { - return new LazyConnection($this->createConnection($uri)); + return new LazyConnection($this, $uri); } } diff --git a/src/Io/LazyConnection.php b/src/Io/LazyConnection.php index b760383..ad55e87 100644 --- a/src/Io/LazyConnection.php +++ b/src/Io/LazyConnection.php @@ -4,8 +4,8 @@ use React\MySQL\ConnectionInterface; use Evenement\EventEmitter; -use React\Promise\PromiseInterface; use React\MySQL\Exception; +use React\MySQL\Factory; /** * @internal @@ -13,52 +13,64 @@ */ class LazyConnection extends EventEmitter implements ConnectionInterface { + private $factory; + private $uri; private $connecting; private $closed = false; private $busy = false; - public function __construct(PromiseInterface $connecting) + public function __construct(Factory $factory, $uri) { - $this->connecting = $connecting; + $this->factory = $factory; + $this->uri = $uri; + } + + private function connecting() + { + if ($this->connecting === null) { + $this->connecting = $this->factory->createConnection($this->uri); + + $this->connecting->then(function (ConnectionInterface $connection) { + // connection completed => forward error and close events + $connection->on('error', function ($e) { + $this->emit('error', [$e]); + }); + $connection->on('close', function () { + $this->close(); + }); + }, function (\Exception $e) { + // connection failed => emit error if connection is not already closed + if ($this->closed) { + return; + } - $connecting->then(function (ConnectionInterface $connection) { - // connection completed => forward error and close events - $connection->on('error', function ($e) { $this->emit('error', [$e]); - }); - $connection->on('close', function () { $this->close(); }); - }, function (\Exception $e) { - // connection failed => emit error if connection is not already closed - if ($this->closed) { - return; - } + } - $this->emit('error', [$e]); - $this->close(); - }); + return $this->connecting; } public function query($sql, array $params = []) { - if ($this->connecting === null) { + if ($this->closed) { return \React\Promise\reject(new Exception('Connection closed')); } - return $this->connecting->then(function (ConnectionInterface $connection) use ($sql, $params) { + return $this->connecting()->then(function (ConnectionInterface $connection) use ($sql, $params) { return $connection->query($sql, $params); }); } public function queryStream($sql, $params = []) { - if ($this->connecting === null) { + if ($this->closed) { throw new Exception('Connection closed'); } return \React\Promise\Stream\unwrapReadable( - $this->connecting->then(function (ConnectionInterface $connection) use ($sql, $params) { + $this->connecting()->then(function (ConnectionInterface $connection) use ($sql, $params) { return $connection->queryStream($sql, $params); }) ); @@ -66,22 +78,28 @@ public function queryStream($sql, $params = []) public function ping() { - if ($this->connecting === null) { + if ($this->closed) { return \React\Promise\reject(new Exception('Connection closed')); } - return $this->connecting->then(function (ConnectionInterface $connection) { + return $this->connecting()->then(function (ConnectionInterface $connection) { return $connection->ping(); }); } public function quit() { - if ($this->connecting === null) { + if ($this->closed) { return \React\Promise\reject(new Exception('Connection closed')); } - return $this->connecting->then(function (ConnectionInterface $connection) { + // not already connecting => no need to connect, simply close virtual connection + if ($this->connecting === null) { + $this->close(); + return \React\Promise\resolve(); + } + + return $this->connecting()->then(function (ConnectionInterface $connection) { return $connection->quit(); }); } @@ -95,12 +113,13 @@ public function close() $this->closed = true; // either close active connection or cancel pending connection attempt - $this->connecting->then(function (ConnectionInterface $connection) { - $connection->close(); - }); - $this->connecting->cancel(); - - $this->connecting = null; + if ($this->connecting !== null) { + $this->connecting->then(function (ConnectionInterface $connection) { + $connection->close(); + }); + $this->connecting->cancel(); + $this->connecting = null; + } $this->emit('close'); $this->removeAllListeners(); diff --git a/tests/FactoryTest.php b/tests/FactoryTest.php index 7f27568..9f189e7 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryTest.php @@ -353,7 +353,22 @@ public function testCancelConnectDuringAuthenticationWillCloseConnection() }))); } - public function testConnectLazyWithValidAuthWillRunUntilQuit() + public function testConnectLazyWithAnyAuthWillQuitWithoutRunning() + { + $this->expectOutputString('closed.'); + + $loop = \React\EventLoop\Factory::create(); + $factory = new Factory($loop); + + $uri = 'mysql://random:pass@host'; + $connection = $factory->createLazyConnection($uri); + + $connection->quit()->then(function () { + echo 'closed.'; + }); + } + + public function testConnectLazyWithValidAuthWillRunUntilQuitAfterPing() { $this->expectOutputString('closed.'); @@ -363,6 +378,8 @@ public function testConnectLazyWithValidAuthWillRunUntilQuit() $uri = $this->getConnectionString(); $connection = $factory->createLazyConnection($uri); + $connection->ping(); + $connection->quit()->then(function () { echo 'closed.'; }); @@ -370,7 +387,7 @@ public function testConnectLazyWithValidAuthWillRunUntilQuit() $loop->run(); } - public function testConnectLazyWithInvalidAuthWillEmitErrorAndClose() + public function testConnectLazyWithInvalidAuthWillEmitErrorAndCloseAfterPing() { $loop = \React\EventLoop\Factory::create(); $factory = new Factory($loop); @@ -381,6 +398,8 @@ public function testConnectLazyWithInvalidAuthWillEmitErrorAndClose() $connection->on('error', $this->expectCallableOnce()); $connection->on('close', $this->expectCallableOnce()); + $connection->ping(); + $loop->run(); } diff --git a/tests/Io/LazyConnectionTest.php b/tests/Io/LazyConnectionTest.php index b9f5e14..d770e41 100644 --- a/tests/Io/LazyConnectionTest.php +++ b/tests/Io/LazyConnectionTest.php @@ -12,47 +12,64 @@ class LazyConnectionTest extends BaseTestCase { - public function testConnectionWillBeClosedWithErrorWhenPendingConnectionFails() + public function testPingWillCloseConnectionWithErrorWhenPendingConnectionFails() { $deferred = new Deferred(); - $connection = new LazyConnection($deferred->promise()); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $connection = new LazyConnection($factory, ''); $connection->on('error', $this->expectCallableOnce()); $connection->on('close', $this->expectCallableOnce()); + $connection->ping(); + $deferred->reject(new \RuntimeException()); } - public function testConnectionWillBeClosedWithoutErrorWhenUnderlyingConnectionCloses() + public function testPingWillCloseConnectionWithoutErrorWhenUnderlyingConnectionCloses() { $promise = new Promise(function () { }); - $base = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $base = new LazyConnection($factory, ''); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); $connection->on('error', $this->expectCallableNever()); $connection->on('close', $this->expectCallableOnce()); + $connection->ping(); $base->close(); } - public function testConnectionWillForwardErrorFromUnderlyingConnection() + public function testPingWillForwardErrorFromUnderlyingConnection() { $promise = new Promise(function () { }); - $base = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $base = new LazyConnection($factory, ''); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); $connection->on('error', $this->expectCallableOnce()); $connection->on('close', $this->expectCallableNever()); + $connection->ping(); + $base->emit('error', [new \RuntimeException()]); } public function testQueryReturnsPendingPromiseWhenConnectionIsPending() { - $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $deferred = new Deferred(); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $connection = new LazyConnection($factory, ''); $ret = $connection->query('SELECT 1'); @@ -65,7 +82,9 @@ public function testQueryWillQueryUnderlyingConnectionWhenResolved() $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); $base->expects($this->once())->method('query')->with('SELECT 1'); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); $connection->query('SELECT 1'); } @@ -73,7 +92,9 @@ public function testQueryWillQueryUnderlyingConnectionWhenResolved() public function testQueryWillRejectWhenUnderlyingConnectionRejects() { $deferred = new Deferred(); - $connection = new LazyConnection($deferred->promise()); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $connection = new LazyConnection($factory, ''); $ret = $connection->query('SELECT 1'); $ret->then($this->expectCallableNever(), $this->expectCallableOnce()); @@ -84,7 +105,9 @@ public function testQueryWillRejectWhenUnderlyingConnectionRejects() public function testQueryStreamReturnsReadableStreamWhenConnectionIsPending() { $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $connection = new LazyConnection($factory, ''); $ret = $connection->queryStream('SELECT 1'); @@ -98,7 +121,9 @@ public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWhenResol $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); $base->expects($this->once())->method('queryStream')->with('SELECT 1')->willReturn($stream); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); $ret = $connection->queryStream('SELECT 1'); @@ -114,7 +139,9 @@ public function testQueryStreamWillReturnStreamFromUnderlyingConnectionWhenResol public function testQueryStreamWillCloseStreamWithErrorWhenUnderlyingConnectionRejects() { $deferred = new Deferred(); - $connection = new LazyConnection($deferred->promise()); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $connection = new LazyConnection($factory, ''); $ret = $connection->queryStream('SELECT 1'); @@ -129,7 +156,9 @@ public function testQueryStreamWillCloseStreamWithErrorWhenUnderlyingConnectionR public function testPingReturnsPendingPromiseWhenConnectionIsPending() { $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $connection = new LazyConnection($factory, ''); $ret = $connection->ping(); @@ -142,80 +171,129 @@ public function testPingWillPingUnderlyingConnectionWhenResolved() $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); $base->expects($this->once())->method('ping'); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); $connection->ping(); } - public function testQuitReturnsPendingPromiseWhenConnectionIsPending() + public function testQuitResolvesAndEmitsCloseImmediatelyWhenConnectionIsNotAlreadyPending() + { + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->never())->method('createConnection'); + $connection = new LazyConnection($factory, ''); + + $connection->on('error', $this->expectCallableNever()); + $connection->on('close', $this->expectCallableOnce()); + + $ret = $connection->quit(); + + $this->assertTrue($ret instanceof PromiseInterface); + $ret->then($this->expectCallableOnce(), $this->expectCallableNever()); + } + + public function testQuitAfterPingReturnsPendingPromiseWhenConnectionIsPending() { $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $connection = new LazyConnection($factory, ''); + $connection->ping(); $ret = $connection->quit(); $this->assertTrue($ret instanceof PromiseInterface); $ret->then($this->expectCallableNever(), $this->expectCallableNever()); } - public function testQuitWillQuitUnderlyingConnectionWhenResolved() + public function testQuitAfterPingWillQuitUnderlyingConnectionWhenResolved() { $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); $base->expects($this->once())->method('quit'); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); + $connection->ping(); $connection->quit(); } - public function testCloseCancelsPendingConnection() + public function testCloseEmitsCloseImmediatelyWhenConnectionIsNotAlreadyPending() + { + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->never())->method('createConnection'); + $connection = new LazyConnection($factory, ''); + + $connection->on('error', $this->expectCallableNever()); + $connection->on('close', $this->expectCallableOnce()); + + $connection->close(); + } + + public function testCloseAfterPingCancelsPendingConnection() { - $promise = new Promise(function () { }, $this->expectCallableOnce()); - $connection = new LazyConnection($promise); + $deferred = new Deferred($this->expectCallableOnce()); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($deferred->promise()); + $connection = new LazyConnection($factory, ''); + $connection->ping(); $connection->close(); } - public function testCloseTwiceWillCloseUnderlyingConnectionWhenResolved() + public function testCloseTwiceAfterPingWillCloseUnderlyingConnectionWhenResolved() { $base = $this->getMockBuilder('React\MySQL\ConnectionInterface')->getMock(); $base->expects($this->once())->method('close'); - $connection = new LazyConnection(\React\Promise\resolve($base)); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn(\React\Promise\resolve($base)); + $connection = new LazyConnection($factory, ''); + $connection->ping(); $connection->close(); $connection->close(); } - public function testCloseDoesNotEmitConnectionErrorFromAbortedConnection() + public function testCloseAfterPingDoesNotEmitConnectionErrorFromAbortedConnection() { $promise = new Promise(function () { }, function () { throw new \RuntimeException(); }); - $connection = new LazyConnection($promise); + + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $connection = new LazyConnection($factory, ''); $connection->on('error', $this->expectCallableNever()); $connection->on('close', $this->expectCallableOnce()); + $connection->ping(); $connection->close(); } - public function testCloseTwiceEmitsCloseEventOnceWhenConnectionIsPending() + public function testCloseTwiceAfterPingEmitsCloseEventOnceWhenConnectionIsPending() { $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->once())->method('createConnection')->willReturn($promise); + $connection = new LazyConnection($factory, ''); $connection->on('error', $this->expectCallableNever()); $connection->on('close', $this->expectCallableOnce()); + $connection->ping(); $connection->close(); $connection->close(); } public function testQueryReturnsRejectedPromiseAfterConnectionIsClosed() { - $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->never())->method('createConnection'); + $connection = new LazyConnection($factory, ''); $connection->close(); $ret = $connection->query('SELECT 1'); @@ -229,8 +307,9 @@ public function testQueryReturnsRejectedPromiseAfterConnectionIsClosed() */ public function testQueryStreamThrowsAfterConnectionIsClosed() { - $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->never())->method('createConnection'); + $connection = new LazyConnection($factory, ''); $connection->close(); $connection->queryStream('SELECT 1'); @@ -238,8 +317,9 @@ public function testQueryStreamThrowsAfterConnectionIsClosed() public function testPingReturnsRejectedPromiseAfterConnectionIsClosed() { - $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->never())->method('createConnection'); + $connection = new LazyConnection($factory, ''); $connection->close(); $ret = $connection->ping(); @@ -250,8 +330,9 @@ public function testPingReturnsRejectedPromiseAfterConnectionIsClosed() public function testQuitReturnsRejectedPromiseAfterConnectionIsClosed() { - $promise = new Promise(function () { }); - $connection = new LazyConnection($promise); + $factory = $this->getMockBuilder('React\MySQL\Factory')->disableOriginalConstructor()->getMock(); + $factory->expects($this->never())->method('createConnection'); + $connection = new LazyConnection($factory, ''); $connection->close(); $ret = $connection->quit();