diff --git a/README.md b/README.md index bda5d9b..46a0ee3 100644 --- a/README.md +++ b/README.md @@ -33,9 +33,7 @@ This example runs a simple `SELECT` query and dumps all the records from a `book ```php $factory = new React\MySQL\Factory(); - -$uri = 'test:test@localhost/test'; -$connection = $factory->createLazyConnection($uri); +$connection = $factory->createLazyConnection('user:pass@localhost/bookstore'); $connection->query('SELECT * FROM book')->then( function (QueryResult $command) { diff --git a/examples/01-query.php b/examples/01-query.php index 1ce8ea6..269b066 100644 --- a/examples/01-query.php +++ b/examples/01-query.php @@ -1,18 +1,17 @@ createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test'); -$uri = 'test:test@localhost/test'; $query = isset($argv[1]) ? $argv[1] : 'select * from book'; - -//create a lazy mysql connection for executing query -$connection = $factory->createLazyConnection($uri); - $connection->query($query)->then(function (QueryResult $command) { if (isset($command->resultRows)) { // this is a response to a SELECT etc. with some rows (0+) diff --git a/examples/02-query-stream.php b/examples/02-query-stream.php index 101175e..1bc3744 100644 --- a/examples/02-query-stream.php +++ b/examples/02-query-stream.php @@ -1,19 +1,16 @@ createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test'); -$uri = 'test:test@localhost/test'; $query = isset($argv[1]) ? $argv[1] : 'select * from book'; - -//create a lazy mysql connection for executing query -$connection = $factory->createLazyConnection($uri); - $stream = $connection->queryStream($query); $stream->on('data', function ($row) { diff --git a/examples/11-interactive.php b/examples/11-interactive.php index 459b780..10ee6ea 100644 --- a/examples/11-interactive.php +++ b/examples/11-interactive.php @@ -1,5 +1,8 @@ then(function (SocketConnectionInterface $connection) { @@ -198,7 +205,7 @@ public function createConnection($uri) $connecting->cancel(); }); - $connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred) { + $connecting->then(function (SocketConnectionInterface $stream) use ($authCommand, $deferred, $uri) { $executor = new Executor(); $parser = new Parser($stream, $executor); @@ -209,12 +216,27 @@ public function createConnection($uri) $command->on('success', function () use ($deferred, $connection) { $deferred->resolve($connection); }); - $command->on('error', function ($error) use ($deferred, $stream) { - $deferred->reject($error); + $command->on('error', function (\Exception $error) use ($deferred, $stream, $uri) { + $const = ''; + $errno = $error->getCode(); + if ($error instanceof Exception) { + $const = ' (EACCES)'; + $errno = \defined('SOCKET_EACCES') ? \SOCKET_EACCES : 13; + } + + $deferred->reject(new \RuntimeException( + 'Connection to ' . $uri . ' failed during authentication: ' . $error->getMessage() . $const, + $errno, + $error + )); $stream->close(); }); - }, function ($error) use ($deferred) { - $deferred->reject(new \RuntimeException('Unable to connect to database server', 0, $error)); + }, function (\Exception $error) use ($deferred, $uri) { + $deferred->reject(new \RuntimeException( + 'Connection to ' . $uri . ' failed: ' . $error->getMessage(), + $error->getCode(), + $error + )); }); // use timeout from explicit ?timeout=x parameter or default to PHP's default_socket_timeout (60) @@ -223,10 +245,11 @@ public function createConnection($uri) return $deferred->promise(); } - return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) { + return \React\Promise\Timer\timeout($deferred->promise(), $timeout, $this->loop)->then(null, function ($e) use ($uri) { if ($e instanceof TimeoutException) { throw new \RuntimeException( - 'Connection to database server timed out after ' . $e->getTimeout() . ' seconds' + 'Connection to ' . $uri . ' timed out after ' . $e->getTimeout() . ' seconds (ETIMEDOUT)', + \defined('SOCKET_ETIMEDOUT') ? \SOCKET_ETIMEDOUT : 110 ); } throw $e; diff --git a/src/Io/Connection.php b/src/Io/Connection.php index 6b6dd1c..20810d2 100644 --- a/src/Io/Connection.php +++ b/src/Io/Connection.php @@ -157,14 +157,25 @@ public function close() } $this->state = self::STATE_CLOSED; + $remoteClosed = $this->stream->isReadable() === false && $this->stream->isWritable() === false; $this->stream->close(); // reject all pending commands if connection is closed while (!$this->executor->isIdle()) { $command = $this->executor->dequeue(); - $command->emit('error', [ - new \RuntimeException('Connection lost') - ]); + assert($command instanceof CommandInterface); + + if ($remoteClosed) { + $command->emit('error', [new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + \defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104 + )]); + } else { + $command->emit('error', [new \RuntimeException( + 'Connection closing (ECONNABORTED)', + \defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103 + )]); + } } $this->emit('close'); @@ -189,7 +200,10 @@ public function handleConnectionError($err) public function handleConnectionClosed() { if ($this->state < self::STATE_CLOSEING) { - $this->emit('error', [new \RuntimeException('mysql server has gone away'), $this]); + $this->emit('error', [new \RuntimeException( + 'Connection closed by peer (ECONNRESET)', + \defined('SOCKET_ECONNRESET') ? \SOCKET_ECONNRESET : 104 + )]); } $this->close(); @@ -202,10 +216,13 @@ public function handleConnectionClosed() */ protected function _doCommand(CommandInterface $command) { - if ($this->state === self::STATE_AUTHENTICATED) { - return $this->executor->enqueue($command); - } else { - throw new Exception("Can't send command"); + if ($this->state !== self::STATE_AUTHENTICATED) { + throw new \RuntimeException( + 'Connection ' . ($this->state === self::STATE_CLOSED ? 'closed' : 'closing'). ' (ENOTCONN)', + \defined('SOCKET_ENOTCONN') ? \SOCKET_ENOTCONN : 107 + ); } + + return $this->executor->enqueue($command); } } diff --git a/src/Io/Parser.php b/src/Io/Parser.php index 9e9399e..0085a84 100644 --- a/src/Io/Parser.php +++ b/src/Io/Parser.php @@ -307,7 +307,7 @@ protected function onSuccess() if ($command instanceof QueryCommand) { $command->affectedRows = $this->affectedRows; $command->insertId = $this->insertId; - $command->warningCount = $this->warningCount; + $command->warningCount = $this->warningCount; $command->message = $this->message; } $command->emit('success'); @@ -322,9 +322,10 @@ public function onClose() if ($command instanceof QuitCommand) { $command->emit('success'); } else { - $command->emit('error', [ - new \RuntimeException('Connection lost') - ]); + $command->emit('error', [new \RuntimeException( + 'Connection closing (ECONNABORTED)', + \defined('SOCKET_ECONNABORTED') ? \SOCKET_ECONNABORTED : 103 + )]); } } } diff --git a/tests/FactoryTest.php b/tests/FactoryTest.php index 4147725..bee52ee 100644 --- a/tests/FactoryTest.php +++ b/tests/FactoryTest.php @@ -52,7 +52,17 @@ public function testConnectWillRejectWhenGivenInvalidScheme() $promise = $factory->createConnection('foo://127.0.0.1'); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('InvalidArgumentException'))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('InvalidArgumentException'), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getMessage() === 'Invalid MySQL URI given (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); + }) + ) + )); } public function testConnectWillUseGivenHostAndGivenPort() @@ -109,8 +119,17 @@ public function testConnectWithInvalidUriWillRejectWithoutConnecting() $factory = new Factory($loop, $connector); $promise = $factory->createConnection('///'); - $this->assertInstanceof('React\Promise\PromiseInterface', $promise); - $promise->then(null, $this->expectCallableOnce()); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('InvalidArgumentException'), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getMessage() === 'Invalid MySQL URI given (EINVAL)'; + }), + $this->callback(function (\InvalidArgumentException $e) { + return $e->getCode() === (defined('SOCKET_EINVAL') ? SOCKET_EINVAL : 22); + }) + ) + )); } public function testConnectWithInvalidCharsetWillRejectWithoutConnecting() @@ -147,9 +166,15 @@ public function testConnectWithInvalidPassRejectsWithAuthenticationError() $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( - $this->isInstanceOf('Exception'), - $this->callback(function (\Exception $e) { - return !!preg_match("/^Access denied for user '.*?'@'.*?' \(using password: YES\)$/", $e->getMessage()); + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return !!preg_match("/^Connection to mysql:\/\/[^ ]* failed during authentication: Access denied for user '.*?'@'.*?' \(using password: YES\) \(EACCES\)$/", $e->getMessage()); + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_EACCES') ? SOCKET_EACCES : 13); + }), + $this->callback(function (\RuntimeException $e) { + return !!preg_match("/^Access denied for user '.*?'@'.*?' \(using password: YES\)$/", $e->getPrevious()->getMessage()); }) ) )); @@ -171,7 +196,20 @@ public function testConnectWillRejectWhenServerClosesConnection() $uri = $this->getConnectionString(['host' => $parts['host'], 'port' => $parts['port']]); $promise = $factory->createConnection($uri); - $promise->then(null, $this->expectCallableOnce()); + + $uri = preg_replace('/:[^:]*@/', ':***@', $uri); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($uri) { + return $e->getMessage() === 'Connection to mysql://' . $uri . ' failed during authentication: Connection closed by peer (ECONNRESET)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }) + ) + )); Loop::run(); } @@ -180,15 +218,20 @@ public function testConnectWillRejectOnExplicitTimeoutDespiteValidAuth() { $factory = new Factory(); - $uri = $this->getConnectionString() . '?timeout=0'; + $uri = 'mysql://' . $this->getConnectionString() . '?timeout=0'; $promise = $factory->createConnection($uri); + $uri = preg_replace('/:[^:]*@/', ':***@', $uri); + $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( - $this->isInstanceOf('Exception'), - $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to database server timed out after 0 seconds'; + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($uri) { + return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds (ETIMEDOUT)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110); }) ) )); @@ -200,18 +243,23 @@ public function testConnectWillRejectOnDefaultTimeoutFromIniDespiteValidAuth() { $factory = new Factory(); - $uri = $this->getConnectionString(); + $uri = 'mysql://' . $this->getConnectionString(); $old = ini_get('default_socket_timeout'); ini_set('default_socket_timeout', '0'); $promise = $factory->createConnection($uri); ini_set('default_socket_timeout', $old); + $uri = preg_replace('/:[^:]*@/', ':***@', $uri); + $promise->then(null, $this->expectCallableOnceWith( $this->logicalAnd( - $this->isInstanceOf('Exception'), - $this->callback(function (\Exception $e) { - return $e->getMessage() === 'Connection to database server timed out after 0 seconds'; + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($uri) { + return $e->getMessage() === 'Connection to ' . $uri . ' timed out after 0 seconds (ETIMEDOUT)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ETIMEDOUT') ? SOCKET_ETIMEDOUT : 110); }) ) )); @@ -356,7 +404,7 @@ public function testConnectWithValidAuthCanCloseOnlyOnce() public function testConnectWithValidAuthCanCloseAndAbortPing() { - $this->expectOutputString('connected.aborted pending (Connection lost).aborted queued (Connection lost).closed.'); + $this->expectOutputString('connected.aborted pending (Connection closing (ECONNABORTED)).aborted queued (Connection closing (ECONNABORTED)).closed.'); $factory = new Factory(); @@ -382,7 +430,60 @@ public function testConnectWithValidAuthCanCloseAndAbortPing() Loop::run(); } - public function testCancelConnectWillCancelPendingConnection() + public function testlConnectWillRejectWhenUnderlyingConnectorRejects() + { + $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); + $connector = $this->getMockBuilder('React\Socket\ConnectorInterface')->getMock(); + $connector->expects($this->once())->method('connect')->willReturn(\React\Promise\reject(new \RuntimeException('Failed', 123))); + + $factory = new Factory($loop, $connector); + $promise = $factory->createConnection('user:secret@127.0.0.1'); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to mysql://user:***@127.0.0.1 failed: Failed'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === 123; + }) + ) + )); + } + + public function provideUris() + { + return [ + [ + 'localhost', + 'mysql://localhost' + ], + [ + 'mysql://localhost', + 'mysql://localhost' + ], + [ + 'mysql://user:pass@localhost', + 'mysql://user:***@localhost' + ], + [ + 'mysql://user:@localhost', + 'mysql://user:***@localhost' + ], + [ + 'mysql://user@localhost', + 'mysql://user@localhost' + ] + ]; + } + + /** + * @dataProvider provideUris + * @param string $uri + * @param string $safe + */ + public function testCancelConnectWillCancelPendingConnection($uri, $safe) { $pending = new Promise(function () { }, $this->expectCallableOnce()); $loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock(); @@ -390,14 +491,21 @@ public function testCancelConnectWillCancelPendingConnection() $connector->expects($this->once())->method('connect')->willReturn($pending); $factory = new Factory($loop, $connector); - $promise = $factory->createConnection('127.0.0.1'); + $promise = $factory->createConnection($uri); $promise->cancel(); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getMessage() === 'Connection to database server cancelled'); - }))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) use ($safe) { + return $e->getMessage() === 'Connection to ' . $safe . ' cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); + }) + ) + )); } public function testCancelConnectWillCancelPendingConnectionWithRuntimeException() @@ -414,10 +522,17 @@ public function testCancelConnectWillCancelPendingConnectionWithRuntimeException $promise->cancel(); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getMessage() === 'Connection to database server cancelled'); - }))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); + }) + ) + )); } public function testCancelConnectDuringAuthenticationWillCloseConnection() @@ -434,10 +549,17 @@ public function testCancelConnectDuringAuthenticationWillCloseConnection() $promise->cancel(); - $promise->then(null, $this->expectCallableOnceWith($this->isInstanceOf('RuntimeException'))); - $promise->then(null, $this->expectCallableOnceWith($this->callback(function ($e) { - return ($e->getMessage() === 'Connection to database server cancelled'); - }))); + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection to mysql://127.0.0.1 cancelled (ECONNABORTED)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103); + }) + ) + )); } public function testConnectLazyWithAnyAuthWillQuitWithoutRunning() diff --git a/tests/Io/ConnectionTest.php b/tests/Io/ConnectionTest.php index 0241354..c9503c7 100644 --- a/tests/Io/ConnectionTest.php +++ b/tests/Io/ConnectionTest.php @@ -25,12 +25,44 @@ public function testQueryAfterQuitRejectsImmediately() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->query('SELECT 1')->then(null, $this->expectCallableOnce()); + $promise = $conn->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); + } + + public function testQueryAfterCloseRejectsImmediately() + { + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $executor = $this->getMockBuilder('React\MySQL\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->never())->method('enqueue'); + + $conn = new Connection($stream, $executor); + $conn->close(); + $promise = $conn->query('SELECT 1'); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } - /** - * @expectedException React\MySQL\Exception - */ public function testQueryStreamAfterQuitThrows() { $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); @@ -39,7 +71,13 @@ public function testQueryStreamAfterQuitThrows() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->queryStream('SELECT 1'); + + try { + $conn->queryStream('SELECT 1'); + } catch (\RuntimeException $e) { + $this->assertEquals('Connection closing (ENOTCONN)', $e->getMessage()); + $this->assertEquals(defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107, $e->getCode()); + } } public function testPingAfterQuitRejectsImmediately() @@ -50,7 +88,19 @@ public function testPingAfterQuitRejectsImmediately() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->ping()->then(null, $this->expectCallableOnce()); + $promise = $conn->ping(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); } public function testQuitAfterQuitRejectsImmediately() @@ -61,6 +111,49 @@ public function testQuitAfterQuitRejectsImmediately() $conn = new Connection($stream, $executor); $conn->quit(); - $conn->quit()->then(null, $this->expectCallableOnce()); + $promise = $conn->quit(); + + $promise->then(null, $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closing (ENOTCONN)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ENOTCONN') ? SOCKET_ENOTCONN : 107); + }) + ) + )); + } + + public function testCloseStreamEmitsErrorEvent() + { + $closeHandler = null; + $stream = $this->getMockBuilder('React\Socket\ConnectionInterface')->getMock(); + $stream->expects($this->exactly(2))->method('on')->withConsecutive( + array('error', $this->anything()), + array('close', $this->callback(function ($arg) use (&$closeHandler) { + $closeHandler = $arg; + return true; + })) + ); + $executor = $this->getMockBuilder('React\MySQL\Io\Executor')->setMethods(['enqueue'])->getMock(); + $executor->expects($this->never())->method('enqueue'); + + $conn = new Connection($stream, $executor); + $conn->on('error', $this->expectCallableOnceWith( + $this->logicalAnd( + $this->isInstanceOf('RuntimeException'), + $this->callback(function (\RuntimeException $e) { + return $e->getMessage() === 'Connection closed by peer (ECONNRESET)'; + }), + $this->callback(function (\RuntimeException $e) { + return $e->getCode() === (defined('SOCKET_ECONNRESET') ? SOCKET_ECONNRESET : 104); + }) + ) + )); + + $this->assertNotNull($closeHandler); + $closeHandler(); } } diff --git a/tests/Io/ParserTest.php b/tests/Io/ParserTest.php index 45460bb..b96c9e9 100644 --- a/tests/Io/ParserTest.php +++ b/tests/Io/ParserTest.php @@ -22,12 +22,23 @@ public function testClosingStreamEmitsErrorForCurrentCommand() $command = new QueryCommand(); $command->on('error', $this->expectCallableOnce()); + $error = null; + $command->on('error', function ($e) use (&$error) { + $error = $e; + }); + // hack to inject command as current command $ref = new \ReflectionProperty($parser, 'currCommand'); $ref->setAccessible(true); $ref->setValue($parser, $command); $stream->close(); + + $this->assertInstanceOf('RuntimeException', $error); + assert($error instanceof \RuntimeException); + + $this->assertEquals('Connection closing (ECONNABORTED)', $error->getMessage()); + $this->assertEquals(defined('SOCKET_ECONNABORTED') ? SOCKET_ECONNABORTED : 103, $error->getCode()); } public function testUnexpectedErrorWithoutCurrentCommandWillBeIgnored()