diff --git a/README.md b/README.md index 7c45788..3764938 100644 --- a/README.md +++ b/README.md @@ -6,28 +6,48 @@ Basic HTTP/1.0 client. ## Basic usage -Requests are prepared using the ``Client#request()`` method. Body can be -sent with ``Request#write()``. ``Request#end()`` finishes sending the request -(or sends it at all if no body was written). - -Request implements WritableStreamInterface, so a Stream can be piped to -it. Response implements ReadableStreamInterface. - +Requests are prepared using the ``Client#request()`` method. + +The `Request#write(string $data)` method can be used to +write data to the request body. +Data will be buffered until the underlying connection is established, at which +point buffered data will be sent and all further data will be passed to the +underlying connection immediately. + +The `Request#end(?string $data = null)` method can be used to +finish sending the request. +You may optionally pass a last request body data chunk that will be sent just +like a `write()` call. +Calling this method finalizes the outgoing request body (which may be empty). +Data will be buffered until the underlying connection is established, at which +point buffered data will be sent and all further data will be ignored. + +The `Request#close()` method can be used to +forefully close sending the request. +Unlike the `end()` method, this method discards any buffers and closes the +underlying connection. + +Request implements WritableStreamInterface, so a Stream can be piped to it. Interesting events emitted by Request: * `response`: The response headers were received from the server and successfully parsed. The first argument is a Response instance. -* `error`: An error occurred. -* `end`: The request is finished. If an error occurred, it is passed as first - argument. Second and third arguments are the Response and the Request. +* `drain`: The outgoing buffer drained and the response is ready to accept more + data for the next `write()` call. +* `error`: An error occurred, an `Exception` is passed as first argument. +* `close`: The request is closed. If an error occurred, this event will be + preceeded by an `error` event. +Response implements ReadableStreamInterface. Interesting events emitted by Response: -* `data`: Passes a chunk of the response body as first argument and a Response - object itself as second argument. When a response encounters a chunked encoded response it will parse it transparently for the user of `Response` and removing the `Transfer-Encoding` header. -* `error`: An error occurred. -* `end`: The response has been fully received. If an error - occurred, it is passed as first argument. +* `data`: Passes a chunk of the response body as first argument. + When a response encounters a chunked encoded response it will parse it + transparently for the user and removing the `Transfer-Encoding` header. +* `error`: An error occurred, an `Exception` is passed as first argument. +* `end`: The response has been fully received. +* `close`: The response is closed. If an error occured, this event will be + preceeded by an `error` event. ### Example diff --git a/composer.json b/composer.json index 7d28327..795d8b6 100644 --- a/composer.json +++ b/composer.json @@ -8,7 +8,7 @@ "guzzlehttp/psr7": "^1.0", "react/socket": "^0.7", "react/event-loop": "0.4.*", - "react/stream": "0.4.*", + "react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.2", "react/promise": "~2.2", "evenement/evenement": "~2.0" }, diff --git a/src/Request.php b/src/Request.php index f93bc65..8f49f63 100644 --- a/src/Request.php +++ b/src/Request.php @@ -6,9 +6,9 @@ use GuzzleHttp\Psr7 as gPsr; use React\Socket\ConnectorInterface; use React\Stream\WritableStreamInterface; +use React\Socket\ConnectionInterface; /** - * @event headers-written * @event response * @event drain * @event error @@ -29,10 +29,10 @@ class Request implements WritableStreamInterface private $stream; private $buffer; private $responseFactory; - private $response; private $state = self::STATE_INIT; + private $ended = false; - private $pendingWrites = array(); + private $pendingWrites = ''; public function __construct(ConnectorInterface $connector, RequestData $requestData) { @@ -42,39 +42,44 @@ public function __construct(ConnectorInterface $connector, RequestData $requestD public function isWritable() { - return self::STATE_END > $this->state; + return self::STATE_END > $this->state && !$this->ended; } - public function writeHead() + private function writeHead() { - if (self::STATE_WRITING_HEAD <= $this->state) { - throw new \LogicException('Headers already written'); - } - $this->state = self::STATE_WRITING_HEAD; $requestData = $this->requestData; $streamRef = &$this->stream; $stateRef = &$this->state; + $pendingWrites = &$this->pendingWrites; $this ->connect() ->done( - function ($stream) use ($requestData, &$streamRef, &$stateRef) { + function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites) { $streamRef = $stream; $stream->on('drain', array($this, 'handleDrain')); $stream->on('data', array($this, 'handleData')); $stream->on('end', array($this, 'handleEnd')); $stream->on('error', array($this, 'handleError')); + $stream->on('close', array($this, 'handleClose')); $headers = (string) $requestData; - $stream->write($headers); + $more = $stream->write($headers . $pendingWrites); $stateRef = Request::STATE_HEAD_WRITTEN; - $this->emit('headers-written', array($this)); + // clear pending writes if non-empty + if ($pendingWrites !== '') { + $pendingWrites = ''; + + if ($more) { + $this->emit('drain'); + } + } }, array($this, 'handleError') ); @@ -83,25 +88,16 @@ function ($stream) use ($requestData, &$streamRef, &$stateRef) { public function write($data) { if (!$this->isWritable()) { - return; + return false; } + // write directly to connection stream if already available if (self::STATE_HEAD_WRITTEN <= $this->state) { return $this->stream->write($data); } - if (!count($this->pendingWrites)) { - $this->on('headers-written', function ($that) { - foreach ($that->pendingWrites as $pw) { - $that->write($pw); - } - $that->pendingWrites = array(); - $that->emit('drain', array($that)); - }); - } - - $this->pendingWrites[] = $data; - + // otherwise buffer and try to establish connection + $this->pendingWrites .= $data; if (self::STATE_WRITING_HEAD > $this->state) { $this->writeHead(); } @@ -111,8 +107,8 @@ public function write($data) public function end($data = null) { - if (null !== $data && !is_scalar($data)) { - throw new \InvalidArgumentException('$data must be null or scalar'); + if (!$this->isWritable()) { + return; } if (null !== $data) { @@ -120,13 +116,17 @@ public function end($data = null) } else if (self::STATE_WRITING_HEAD > $this->state) { $this->writeHead(); } + + $this->ended = true; } + /** @internal */ public function handleDrain() { - $this->emit('drain', array($this)); + $this->emit('drain'); } + /** @internal */ public function handleData($data) { $this->buffer .= $data; @@ -135,7 +135,7 @@ public function handleData($data) try { list($response, $bodyChunk) = $this->parseResponse($this->buffer); } catch (\InvalidArgumentException $exception) { - $this->emit('error', [$exception, $this]); + $this->emit('error', array($exception)); } $this->buffer = null; @@ -144,14 +144,13 @@ public function handleData($data) $this->stream->removeListener('data', array($this, 'handleData')); $this->stream->removeListener('end', array($this, 'handleEnd')); $this->stream->removeListener('error', array($this, 'handleError')); + $this->stream->removeListener('close', array($this, 'handleClose')); if (!isset($response)) { return; } - $this->response = $response; - - $response->on('end', function () { + $response->on('close', function () { $this->close(); }); $response->on('error', function (\Exception $error) { @@ -168,14 +167,16 @@ public function handleData($data) } } + /** @internal */ public function handleEnd() { $this->closeError(new \RuntimeException( - "Connection closed before receiving response" + "Connection ended before receiving response" )); } - public function handleError($error) + /** @internal */ + public function handleError(\Exception $error) { $this->closeError(new \RuntimeException( "An error occurred in the underlying stream", @@ -184,28 +185,36 @@ public function handleError($error) )); } + /** @internal */ + public function handleClose() + { + $this->close(); + } + + /** @internal */ public function closeError(\Exception $error) { if (self::STATE_END <= $this->state) { return; } - $this->emit('error', array($error, $this)); - $this->close($error); + $this->emit('error', array($error)); + $this->close(); } - public function close(\Exception $error = null) + public function close() { if (self::STATE_END <= $this->state) { return; } $this->state = self::STATE_END; + $this->pendingWrites = ''; if ($this->stream) { $this->stream->close(); } - $this->emit('end', array($error, $this->response, $this)); + $this->emit('close'); $this->removeAllListeners(); } diff --git a/src/Response.php b/src/Response.php index d1af948..6aa1f12 100644 --- a/src/Response.php +++ b/src/Response.php @@ -2,19 +2,18 @@ namespace React\HttpClient; -use Evenement\EventEmitterTrait; +use Evenement\EventEmitter; use React\Stream\ReadableStreamInterface; use React\Stream\Util; use React\Stream\WritableStreamInterface; /** - * @event data ($bodyChunk, Response $thisResponse) + * @event data ($bodyChunk) * @event error * @event end */ -class Response implements ReadableStreamInterface +class Response extends EventEmitter implements ReadableStreamInterface { - use EventEmitterTrait; private $stream; private $protocol; @@ -48,6 +47,7 @@ public function __construct(ReadableStreamInterface $stream, $protocol, $version $this->stream->on('data', array($this, 'handleData')); $this->stream->on('error', array($this, 'handleError')); $this->stream->on('end', array($this, 'handleEnd')); + $this->stream->on('close', array($this, 'handleClose')); } public function getProtocol() @@ -75,39 +75,56 @@ public function getHeaders() return $this->headers; } + /** @internal */ public function handleData($data) { - $this->emit('data', array($data, $this)); + if ($this->readable) { + $this->emit('data', array($data)); + } } + /** @internal */ public function handleEnd() { + if (!$this->readable) { + return; + } + $this->emit('end'); $this->close(); } + /** @internal */ public function handleError(\Exception $error) { + if (!$this->readable) { + return; + } $this->emit('error', array(new \RuntimeException( "An error occurred in the underlying stream", 0, $error - ), $this)); + ))); - $this->close($error); + $this->close(); } - public function close(\Exception $error = null) + /** @internal */ + public function handleClose() + { + $this->close(); + } + + public function close() { if (!$this->readable) { return; } $this->readable = false; + $this->stream->close(); - $this->emit('end', array($error, $this)); - + $this->emit('close'); $this->removeAllListeners(); - $this->stream->close(); } public function isReadable() diff --git a/tests/RequestTest.php b/tests/RequestTest.php index 61f346c..f693651 100644 --- a/tests/RequestTest.php +++ b/tests/RequestTest.php @@ -5,8 +5,10 @@ use React\HttpClient\Request; use React\HttpClient\RequestData; use React\Stream\Stream; +use React\Stream\DuplexResourceStream; use React\Promise\RejectedPromise; use React\Promise\Deferred; +use React\Promise\Promise; class RequestTest extends TestCase { @@ -52,21 +54,29 @@ public function requestShouldBindToStreamEventsAndUseconnector() ->method('on') ->with('error', $this->identicalTo(array($request, 'handleError'))); $this->stream - ->expects($this->at(5)) + ->expects($this->at(4)) + ->method('on') + ->with('close', $this->identicalTo(array($request, 'handleClose'))); + $this->stream + ->expects($this->at(6)) ->method('removeListener') ->with('drain', $this->identicalTo(array($request, 'handleDrain'))); $this->stream - ->expects($this->at(6)) + ->expects($this->at(7)) ->method('removeListener') ->with('data', $this->identicalTo(array($request, 'handleData'))); $this->stream - ->expects($this->at(7)) + ->expects($this->at(8)) ->method('removeListener') ->with('end', $this->identicalTo(array($request, 'handleEnd'))); $this->stream - ->expects($this->at(8)) + ->expects($this->at(9)) ->method('removeListener') ->with('error', $this->identicalTo(array($request, 'handleError'))); + $this->stream + ->expects($this->at(10)) + ->method('removeListener') + ->with('close', $this->identicalTo(array($request, 'handleClose'))); $response = $this->response; @@ -76,7 +86,7 @@ public function requestShouldBindToStreamEventsAndUseconnector() $response->expects($this->at(0)) ->method('on') - ->with('end', $this->anything()) + ->with('close', $this->anything()) ->will($this->returnCallback(function ($event, $cb) use (&$endCallback) { $endCallback = $cb; })); @@ -95,18 +105,13 @@ public function requestShouldBindToStreamEventsAndUseconnector() ->with($response); $request->on('response', $handler); - $request->on('close', $this->expectCallableNever()); + $request->on('end', $this->expectCallableNever()); $handler = $this->createCallableMock(); $handler->expects($this->once()) - ->method('__invoke') - ->with( - null, - $this->isInstanceof('React\HttpClient\Response'), - $this->isInstanceof('React\HttpClient\Request') - ); + ->method('__invoke'); - $request->on('end', $handler); + $request->on('close', $handler); $request->end(); $request->handleData("HTTP/1.0 200 OK\r\n"); @@ -129,29 +134,23 @@ public function requestShouldEmitErrorIfConnectionFails() $handler->expects($this->once()) ->method('__invoke') ->with( - $this->isInstanceOf('RuntimeException'), - $this->isInstanceOf('React\HttpClient\Request') + $this->isInstanceOf('RuntimeException') ); $request->on('error', $handler); $handler = $this->createCallableMock(); $handler->expects($this->once()) - ->method('__invoke') - ->with( - $this->isInstanceOf('RuntimeException'), - null, - $this->isInstanceOf('React\HttpClient\Request') - ); + ->method('__invoke'); - $request->on('end', $handler); - $request->on('close', $this->expectCallableNever()); + $request->on('close', $handler); + $request->on('end', $this->expectCallableNever()); $request->end(); } /** @test */ - public function requestShouldEmitErrorIfConnectionEndsBeforeResponseIsParsed() + public function requestShouldEmitErrorIfConnectionClosesBeforeResponseIsParsed() { $requestData = new RequestData('GET', 'http://www.example.com'); $request = new Request($this->connector, $requestData); @@ -162,23 +161,17 @@ public function requestShouldEmitErrorIfConnectionEndsBeforeResponseIsParsed() $handler->expects($this->once()) ->method('__invoke') ->with( - $this->isInstanceOf('RuntimeException'), - $this->isInstanceOf('React\HttpClient\Request') + $this->isInstanceOf('RuntimeException') ); $request->on('error', $handler); $handler = $this->createCallableMock(); $handler->expects($this->once()) - ->method('__invoke') - ->with( - $this->isInstanceOf('RuntimeException'), - null, - $this->isInstanceOf('React\HttpClient\Request') - ); + ->method('__invoke'); - $request->on('end', $handler); - $request->on('close', $this->expectCallableNever()); + $request->on('close', $handler); + $request->on('end', $this->expectCallableNever()); $request->end(); $request->handleEnd(); @@ -196,23 +189,17 @@ public function requestShouldEmitErrorIfConnectionEmitsError() $handler->expects($this->once()) ->method('__invoke') ->with( - $this->isInstanceOf('Exception'), - $this->isInstanceOf('React\HttpClient\Request') + $this->isInstanceOf('Exception') ); $request->on('error', $handler); $handler = $this->createCallableMock(); $handler->expects($this->once()) - ->method('__invoke') - ->with( - $this->isInstanceOf('Exception'), - null, - $this->isInstanceOf('React\HttpClient\Request') - ); + ->method('__invoke'); - $request->on('end', $handler); - $request->on('close', $this->expectCallableNever()); + $request->on('close', $handler); + $request->on('end', $this->expectCallableNever()); $request->end(); $request->handleError(new \Exception('test')); @@ -230,13 +217,12 @@ public function requestShouldEmitErrorIfGuzzleParseThrowsException() $handler->expects($this->once()) ->method('__invoke') ->with( - $this->isInstanceOf('\InvalidArgumentException'), - $this->isInstanceOf('React\HttpClient\Request') + $this->isInstanceOf('\InvalidArgumentException') ); $request->on('error', $handler); - $request->writeHead(); + $request->end(); $request->handleData("\r\n\r\n"); } @@ -268,13 +254,9 @@ public function postRequestShouldSendAPostRequest() $this->successfulConnectionMock(); $this->stream - ->expects($this->at(4)) - ->method('write') - ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#")); - $this->stream - ->expects($this->at(5)) + ->expects($this->once()) ->method('write') - ->with($this->identicalTo("some post data")); + ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsome post data$#")); $factory = $this->createCallableMock(); $factory->expects($this->once()) @@ -297,14 +279,10 @@ public function writeWithAPostRequestShouldSendToTheStream() $this->successfulConnectionMock(); - $this->stream - ->expects($this->at(4)) - ->method('write') - ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#")); $this->stream ->expects($this->at(5)) ->method('write') - ->with($this->identicalTo("some")); + ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsome$#")); $this->stream ->expects($this->at(6)) ->method('write') @@ -338,20 +316,59 @@ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent $resolveConnection = $this->successfulAsyncConnectionMock(); - $this->stream - ->expects($this->at(4)) - ->method('write') - ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#")); $this->stream ->expects($this->at(5)) ->method('write') - ->with($this->identicalTo("some")); + ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsomepost$#")) + ->willReturn(true); $this->stream ->expects($this->at(6)) ->method('write') - ->with($this->identicalTo("post")); + ->with($this->identicalTo("data")); + + $factory = $this->createCallableMock(); + $factory->expects($this->once()) + ->method('__invoke') + ->will($this->returnValue($this->response)); + + $request->setResponseFactory($factory); + + $this->assertFalse($request->write("some")); + $this->assertFalse($request->write("post")); + + $request->on('drain', $this->expectCallableOnce()); + $request->once('drain', function () use ($request) { + $request->write("data"); + $request->end(); + }); + + $resolveConnection(); + + $request->handleData("HTTP/1.0 200 OK\r\n"); + $request->handleData("Content-Type: text/plain\r\n"); + $request->handleData("\r\nbody"); + } + + /** @test */ + public function writeWithAPostRequestShouldForwardDrainEventIfFirstChunkExceedsBuffer() + { + $requestData = new RequestData('POST', 'http://www.example.com'); + $request = new Request($this->connector, $requestData); + + $this->stream = $this->getMockBuilder('React\Socket\Connection') + ->disableOriginalConstructor() + ->setMethods(array('write')) + ->getMock(); + + $resolveConnection = $this->successfulAsyncConnectionMock(); + $this->stream - ->expects($this->at(7)) + ->expects($this->at(0)) + ->method('write') + ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsomepost$#")) + ->willReturn(false); + $this->stream + ->expects($this->at(1)) ->method('write') ->with($this->identicalTo("data")); @@ -365,12 +382,14 @@ public function writeWithAPostRequestShouldSendBodyAfterHeadersAndEmitDrainEvent $this->assertFalse($request->write("some")); $this->assertFalse($request->write("post")); + $request->on('drain', $this->expectCallableOnce()); $request->once('drain', function () use ($request) { $request->write("data"); $request->end(); }); $resolveConnection(); + $this->stream->emit('drain'); $request->handleData("HTTP/1.0 200 OK\r\n"); $request->handleData("Content-Type: text/plain\r\n"); @@ -385,14 +404,10 @@ public function pipeShouldPipeDataIntoTheRequestBody() $this->successfulConnectionMock(); - $this->stream - ->expects($this->at(4)) - ->method('write') - ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\n$#")); $this->stream ->expects($this->at(5)) ->method('write') - ->with($this->identicalTo("some")); + ->with($this->matchesRegularExpression("#^POST / HTTP/1\.0\r\nHost: www.example.com\r\nUser-Agent:.*\r\n\r\nsome$#")); $this->stream ->expects($this->at(6)) ->method('write') @@ -414,7 +429,7 @@ public function pipeShouldPipeDataIntoTheRequestBody() $request->setResponseFactory($factory); $stream = fopen('php://memory', 'r+'); - $stream = new Stream($stream, $loop); + $stream = class_exists('React\Stream\DuplexResourceStream') ? new DuplexResourceStream($stream, $loop) : new Stream($stream, $loop); $stream->pipe($request); $stream->emit('data', array('some')); @@ -428,15 +443,77 @@ public function pipeShouldPipeDataIntoTheRequestBody() /** * @test - * @expectedException InvalidArgumentException - * @expectedExceptionMessage $data must be null or scalar */ - public function endShouldOnlyAcceptScalars() + public function writeShouldStartConnecting() + { + $requestData = new RequestData('POST', 'http://www.example.com'); + $request = new Request($this->connector, $requestData); + + $this->connector->expects($this->once()) + ->method('connect') + ->with('www.example.com:80') + ->willReturn(new Promise(function () { })); + + $request->write('test'); + } + + /** + * @test + */ + public function endShouldStartConnectingAndChangeStreamIntoNonWritableMode() { $requestData = new RequestData('POST', 'http://www.example.com'); $request = new Request($this->connector, $requestData); - $request->end(array()); + $this->connector->expects($this->once()) + ->method('connect') + ->with('www.example.com:80') + ->willReturn(new Promise(function () { })); + + $request->end(); + + $this->assertFalse($request->isWritable()); + } + + /** + * @test + */ + public function closeShouldEmitCloseEvent() + { + $requestData = new RequestData('POST', 'http://www.example.com'); + $request = new Request($this->connector, $requestData); + + $request->on('close', $this->expectCallableOnce()); + $request->close(); + } + + /** + * @test + */ + public function writeAfterCloseReturnsFalse() + { + $requestData = new RequestData('POST', 'http://www.example.com'); + $request = new Request($this->connector, $requestData); + + $request->close(); + + $this->assertFalse($request->isWritable()); + $this->assertFalse($request->write('nope')); + } + + /** + * @test + */ + public function endAfterCloseIsNoOp() + { + $requestData = new RequestData('POST', 'http://www.example.com'); + $request = new Request($this->connector, $requestData); + + $this->connector->expects($this->never()) + ->method('connect'); + + $request->close(); + $request->end(); } /** @test */ @@ -451,7 +528,7 @@ public function requestShouldRelayErrorEventsFromResponse() $response->expects($this->at(0)) ->method('on') - ->with('end', $this->anything()); + ->with('close', $this->anything()); $response->expects($this->at(1)) ->method('on') ->with('error', $this->anything()) @@ -482,11 +559,11 @@ public function requestShouldRemoveAllListenerAfterClosed() $requestData = new RequestData('GET', 'http://www.example.com'); $request = new Request($this->connector, $requestData); - $request->on('end', function () {}); - $this->assertCount(1, $request->listeners('end')); + $request->on('close', function () {}); + $this->assertCount(1, $request->listeners('close')); $request->close(); - $this->assertCount(0, $request->listeners('end')); + $this->assertCount(0, $request->listeners('close')); } private function successfulConnectionMock() @@ -530,7 +607,7 @@ public function multivalueHeader() $response->expects($this->at(0)) ->method('on') - ->with('end', $this->anything()); + ->with('close', $this->anything()); $response->expects($this->at(1)) ->method('on') ->with('error', $this->anything()) diff --git a/tests/ResponseTest.php b/tests/ResponseTest.php index e4720b9..2bea171 100644 --- a/tests/ResponseTest.php +++ b/tests/ResponseTest.php @@ -11,8 +11,7 @@ class ResponseTest extends TestCase public function setUp() { - $this->stream = $this->getMockBuilder('React\Stream\Stream') - ->disableOriginalConstructor() + $this->stream = $this->getMockBuilder('React\Stream\DuplexStreamInterface') ->getMock(); } @@ -31,23 +30,31 @@ public function responseShouldEmitEndEventOnEnd() ->expects($this->at(2)) ->method('on') ->with('end', $this->anything()); + $this->stream + ->expects($this->at(3)) + ->method('on') + ->with('close', $this->anything()); $response = new Response($this->stream, 'HTTP', '1.0', '200', 'OK', array('Content-Type' => 'text/plain')); $handler = $this->createCallableMock(); $handler->expects($this->once()) ->method('__invoke') - ->with('some data', $this->anything()); + ->with('some data'); $response->on('data', $handler); $handler = $this->createCallableMock(); $handler->expects($this->once()) - ->method('__invoke') - ->with(null, $this->isInstanceOf('React\HttpClient\Response')); + ->method('__invoke'); $response->on('end', $handler); - $response->on('close', $this->expectCallableNever()); + + $handler = $this->createCallableMock(); + $handler->expects($this->once()) + ->method('__invoke'); + + $response->on('close', $handler); $this->stream ->expects($this->at(0)) @@ -106,7 +113,7 @@ public function chunkedEncodingResponse() ); $buffer = ''; - $response->on('data', function ($data, $stream) use (&$buffer) { + $response->on('data', function ($data) use (&$buffer) { $buffer.= $data; }); $this->assertSame('', $buffer);