Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistent stream semantics and forward compatibility with upcoming Stream v1.0 #90

Merged
merged 5 commits into from
May 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,48 @@ Basic HTTP/1.0 client.

## Basic usage

Requests are prepared using the ``Client#request()`` method. Body can be
sent with ``Request#write()``. ``Request#end()`` finishes sending the request
(or sends it at all if no body was written).

Request implements WritableStreamInterface, so a Stream can be piped to
it. Response implements ReadableStreamInterface.

Requests are prepared using the ``Client#request()`` method.

The `Request#write(string $data)` method can be used to
write data to the request body.
Data will be buffered until the underlying connection is established, at which
point buffered data will be sent and all further data will be passed to the
underlying connection immediately.

The `Request#end(?string $data = null)` method can be used to
finish sending the request.
You may optionally pass a last request body data chunk that will be sent just
like a `write()` call.
Calling this method finalizes the outgoing request body (which may be empty).
Data will be buffered until the underlying connection is established, at which
point buffered data will be sent and all further data will be ignored.

The `Request#close()` method can be used to
forefully close sending the request.
Unlike the `end()` method, this method discards any buffers and closes the
underlying connection.

Request implements WritableStreamInterface, so a Stream can be piped to it.
Interesting events emitted by Request:

* `response`: The response headers were received from the server and successfully
parsed. The first argument is a Response instance.
* `error`: An error occurred.
* `end`: The request is finished. If an error occurred, it is passed as first
argument. Second and third arguments are the Response and the Request.
* `drain`: The outgoing buffer drained and the response is ready to accept more
data for the next `write()` call.
* `error`: An error occurred, an `Exception` is passed as first argument.
* `close`: The request is closed. If an error occurred, this event will be
preceeded by an `error` event.

Response implements ReadableStreamInterface.
Interesting events emitted by Response:

* `data`: Passes a chunk of the response body as first argument and a Response
object itself as second argument. When a response encounters a chunked encoded response it will parse it transparently for the user of `Response` and removing the `Transfer-Encoding` header.
* `error`: An error occurred.
* `end`: The response has been fully received. If an error
occurred, it is passed as first argument.
* `data`: Passes a chunk of the response body as first argument.
When a response encounters a chunked encoded response it will parse it
transparently for the user and removing the `Transfer-Encoding` header.
* `error`: An error occurred, an `Exception` is passed as first argument.
* `end`: The response has been fully received.
* `close`: The response is closed. If an error occured, this event will be
preceeded by an `error` event.

### Example

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"guzzlehttp/psr7": "^1.0",
"react/socket": "^0.7",
"react/event-loop": "0.4.*",
"react/stream": "0.4.*",
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.2",
"react/promise": "~2.2",
"evenement/evenement": "~2.0"
},
Expand Down
85 changes: 47 additions & 38 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
use GuzzleHttp\Psr7 as gPsr;
use React\Socket\ConnectorInterface;
use React\Stream\WritableStreamInterface;
use React\Socket\ConnectionInterface;

/**
* @event headers-written
* @event response
* @event drain
* @event error
Expand All @@ -29,10 +29,10 @@ class Request implements WritableStreamInterface
private $stream;
private $buffer;
private $responseFactory;
private $response;
private $state = self::STATE_INIT;
private $ended = false;

private $pendingWrites = array();
private $pendingWrites = '';

public function __construct(ConnectorInterface $connector, RequestData $requestData)
{
Expand All @@ -42,39 +42,44 @@ public function __construct(ConnectorInterface $connector, RequestData $requestD

public function isWritable()
{
return self::STATE_END > $this->state;
return self::STATE_END > $this->state && !$this->ended;
}

public function writeHead()
private function writeHead()
{
if (self::STATE_WRITING_HEAD <= $this->state) {
throw new \LogicException('Headers already written');
}

$this->state = self::STATE_WRITING_HEAD;

$requestData = $this->requestData;
$streamRef = &$this->stream;
$stateRef = &$this->state;
$pendingWrites = &$this->pendingWrites;

$this
->connect()
->done(
function ($stream) use ($requestData, &$streamRef, &$stateRef) {
function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites) {
$streamRef = $stream;

$stream->on('drain', array($this, 'handleDrain'));
$stream->on('data', array($this, 'handleData'));
$stream->on('end', array($this, 'handleEnd'));
$stream->on('error', array($this, 'handleError'));
$stream->on('close', array($this, 'handleClose'));

$headers = (string) $requestData;

$stream->write($headers);
$more = $stream->write($headers . $pendingWrites);

$stateRef = Request::STATE_HEAD_WRITTEN;

$this->emit('headers-written', array($this));
// clear pending writes if non-empty
if ($pendingWrites !== '') {
$pendingWrites = '';

if ($more) {
$this->emit('drain');
}
}
},
array($this, 'handleError')
);
Expand All @@ -83,25 +88,16 @@ function ($stream) use ($requestData, &$streamRef, &$stateRef) {
public function write($data)
{
if (!$this->isWritable()) {
return;
return false;
}

// write directly to connection stream if already available
if (self::STATE_HEAD_WRITTEN <= $this->state) {
return $this->stream->write($data);
}

if (!count($this->pendingWrites)) {
$this->on('headers-written', function ($that) {
foreach ($that->pendingWrites as $pw) {
$that->write($pw);
}
$that->pendingWrites = array();
$that->emit('drain', array($that));
});
}

$this->pendingWrites[] = $data;

// otherwise buffer and try to establish connection
$this->pendingWrites .= $data;
if (self::STATE_WRITING_HEAD > $this->state) {
$this->writeHead();
}
Expand All @@ -111,22 +107,26 @@ public function write($data)

public function end($data = null)
{
if (null !== $data && !is_scalar($data)) {
throw new \InvalidArgumentException('$data must be null or scalar');
if (!$this->isWritable()) {
return;
}

if (null !== $data) {
$this->write($data);
} else if (self::STATE_WRITING_HEAD > $this->state) {
$this->writeHead();
}

$this->ended = true;
}

/** @internal */
public function handleDrain()
{
$this->emit('drain', array($this));
$this->emit('drain');
}

/** @internal */
public function handleData($data)
{
$this->buffer .= $data;
Expand All @@ -135,7 +135,7 @@ public function handleData($data)
try {
list($response, $bodyChunk) = $this->parseResponse($this->buffer);
} catch (\InvalidArgumentException $exception) {
$this->emit('error', [$exception, $this]);
$this->emit('error', array($exception));
}

$this->buffer = null;
Expand All @@ -144,14 +144,13 @@ public function handleData($data)
$this->stream->removeListener('data', array($this, 'handleData'));
$this->stream->removeListener('end', array($this, 'handleEnd'));
$this->stream->removeListener('error', array($this, 'handleError'));
$this->stream->removeListener('close', array($this, 'handleClose'));

if (!isset($response)) {
return;
}

$this->response = $response;

$response->on('end', function () {
$response->on('close', function () {
$this->close();
});
$response->on('error', function (\Exception $error) {
Expand All @@ -168,14 +167,16 @@ public function handleData($data)
}
}

/** @internal */
public function handleEnd()
{
$this->closeError(new \RuntimeException(
"Connection closed before receiving response"
"Connection ended before receiving response"
));
}

public function handleError($error)
/** @internal */
public function handleError(\Exception $error)
{
$this->closeError(new \RuntimeException(
"An error occurred in the underlying stream",
Expand All @@ -184,28 +185,36 @@ public function handleError($error)
));
}

/** @internal */
public function handleClose()
{
$this->close();
}

/** @internal */
public function closeError(\Exception $error)
{
if (self::STATE_END <= $this->state) {
return;
}
$this->emit('error', array($error, $this));
$this->close($error);
$this->emit('error', array($error));
$this->close();
}

public function close(\Exception $error = null)
public function close()
{
if (self::STATE_END <= $this->state) {
return;
}

$this->state = self::STATE_END;
$this->pendingWrites = '';

if ($this->stream) {
$this->stream->close();
}

$this->emit('end', array($error, $this->response, $this));
$this->emit('close');
$this->removeAllListeners();
}

Expand Down
39 changes: 28 additions & 11 deletions src/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@

namespace React\HttpClient;

use Evenement\EventEmitterTrait;
use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* @event data ($bodyChunk, Response $thisResponse)
* @event data ($bodyChunk)
* @event error
* @event end
*/
class Response implements ReadableStreamInterface
class Response extends EventEmitter implements ReadableStreamInterface
{
use EventEmitterTrait;

private $stream;
private $protocol;
Expand Down Expand Up @@ -48,6 +47,7 @@ public function __construct(ReadableStreamInterface $stream, $protocol, $version
$this->stream->on('data', array($this, 'handleData'));
$this->stream->on('error', array($this, 'handleError'));
$this->stream->on('end', array($this, 'handleEnd'));
$this->stream->on('close', array($this, 'handleClose'));
}

public function getProtocol()
Expand Down Expand Up @@ -75,39 +75,56 @@ public function getHeaders()
return $this->headers;
}

/** @internal */
public function handleData($data)
{
$this->emit('data', array($data, $this));
if ($this->readable) {
$this->emit('data', array($data));
}
}

/** @internal */
public function handleEnd()
{
if (!$this->readable) {
return;
}
$this->emit('end');
$this->close();
}

/** @internal */
public function handleError(\Exception $error)
{
if (!$this->readable) {
return;
}
$this->emit('error', array(new \RuntimeException(
"An error occurred in the underlying stream",
0,
$error
), $this));
)));

$this->close($error);
$this->close();
}

public function close(\Exception $error = null)
/** @internal */
public function handleClose()
{
$this->close();
}

public function close()
{
if (!$this->readable) {
return;
}

$this->readable = false;
$this->stream->close();

$this->emit('end', array($error, $this));

$this->emit('close');
$this->removeAllListeners();
$this->stream->close();
}

public function isReadable()
Expand Down
Loading