Skip to content

Commit

Permalink
Merge pull request #90 from clue-labs/stream
Browse files Browse the repository at this point in the history
Consistent stream semantics and forward compatibility with upcoming Stream v1.0
  • Loading branch information
WyriHaximus authored May 19, 2017
2 parents f5eaedb + 583b98a commit 510041f
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 152 deletions.
50 changes: 35 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,48 @@ Basic HTTP/1.0 client.

## Basic usage

Requests are prepared using the ``Client#request()`` method. Body can be
sent with ``Request#write()``. ``Request#end()`` finishes sending the request
(or sends it at all if no body was written).

Request implements WritableStreamInterface, so a Stream can be piped to
it. Response implements ReadableStreamInterface.

Requests are prepared using the ``Client#request()`` method.

The `Request#write(string $data)` method can be used to
write data to the request body.
Data will be buffered until the underlying connection is established, at which
point buffered data will be sent and all further data will be passed to the
underlying connection immediately.

The `Request#end(?string $data = null)` method can be used to
finish sending the request.
You may optionally pass a last request body data chunk that will be sent just
like a `write()` call.
Calling this method finalizes the outgoing request body (which may be empty).
Data will be buffered until the underlying connection is established, at which
point buffered data will be sent and all further data will be ignored.

The `Request#close()` method can be used to
forefully close sending the request.
Unlike the `end()` method, this method discards any buffers and closes the
underlying connection.

Request implements WritableStreamInterface, so a Stream can be piped to it.
Interesting events emitted by Request:

* `response`: The response headers were received from the server and successfully
parsed. The first argument is a Response instance.
* `error`: An error occurred.
* `end`: The request is finished. If an error occurred, it is passed as first
argument. Second and third arguments are the Response and the Request.
* `drain`: The outgoing buffer drained and the response is ready to accept more
data for the next `write()` call.
* `error`: An error occurred, an `Exception` is passed as first argument.
* `close`: The request is closed. If an error occurred, this event will be
preceeded by an `error` event.

Response implements ReadableStreamInterface.
Interesting events emitted by Response:

* `data`: Passes a chunk of the response body as first argument and a Response
object itself as second argument. When a response encounters a chunked encoded response it will parse it transparently for the user of `Response` and removing the `Transfer-Encoding` header.
* `error`: An error occurred.
* `end`: The response has been fully received. If an error
occurred, it is passed as first argument.
* `data`: Passes a chunk of the response body as first argument.
When a response encounters a chunked encoded response it will parse it
transparently for the user and removing the `Transfer-Encoding` header.
* `error`: An error occurred, an `Exception` is passed as first argument.
* `end`: The response has been fully received.
* `close`: The response is closed. If an error occured, this event will be
preceeded by an `error` event.

### Example

Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"guzzlehttp/psr7": "^1.0",
"react/socket": "^0.7",
"react/event-loop": "0.4.*",
"react/stream": "0.4.*",
"react/stream": "^1.0 || ^0.7 || ^0.6 || ^0.5 || ^0.4.2",
"react/promise": "~2.2",
"evenement/evenement": "~2.0"
},
Expand Down
85 changes: 47 additions & 38 deletions src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
use GuzzleHttp\Psr7 as gPsr;
use React\Socket\ConnectorInterface;
use React\Stream\WritableStreamInterface;
use React\Socket\ConnectionInterface;

/**
* @event headers-written
* @event response
* @event drain
* @event error
Expand All @@ -29,10 +29,10 @@ class Request implements WritableStreamInterface
private $stream;
private $buffer;
private $responseFactory;
private $response;
private $state = self::STATE_INIT;
private $ended = false;

private $pendingWrites = array();
private $pendingWrites = '';

public function __construct(ConnectorInterface $connector, RequestData $requestData)
{
Expand All @@ -42,39 +42,44 @@ public function __construct(ConnectorInterface $connector, RequestData $requestD

public function isWritable()
{
return self::STATE_END > $this->state;
return self::STATE_END > $this->state && !$this->ended;
}

public function writeHead()
private function writeHead()
{
if (self::STATE_WRITING_HEAD <= $this->state) {
throw new \LogicException('Headers already written');
}

$this->state = self::STATE_WRITING_HEAD;

$requestData = $this->requestData;
$streamRef = &$this->stream;
$stateRef = &$this->state;
$pendingWrites = &$this->pendingWrites;

$this
->connect()
->done(
function ($stream) use ($requestData, &$streamRef, &$stateRef) {
function (ConnectionInterface $stream) use ($requestData, &$streamRef, &$stateRef, &$pendingWrites) {
$streamRef = $stream;

$stream->on('drain', array($this, 'handleDrain'));
$stream->on('data', array($this, 'handleData'));
$stream->on('end', array($this, 'handleEnd'));
$stream->on('error', array($this, 'handleError'));
$stream->on('close', array($this, 'handleClose'));

$headers = (string) $requestData;

$stream->write($headers);
$more = $stream->write($headers . $pendingWrites);

$stateRef = Request::STATE_HEAD_WRITTEN;

$this->emit('headers-written', array($this));
// clear pending writes if non-empty
if ($pendingWrites !== '') {
$pendingWrites = '';

if ($more) {
$this->emit('drain');
}
}
},
array($this, 'handleError')
);
Expand All @@ -83,25 +88,16 @@ function ($stream) use ($requestData, &$streamRef, &$stateRef) {
public function write($data)
{
if (!$this->isWritable()) {
return;
return false;
}

// write directly to connection stream if already available
if (self::STATE_HEAD_WRITTEN <= $this->state) {
return $this->stream->write($data);
}

if (!count($this->pendingWrites)) {
$this->on('headers-written', function ($that) {
foreach ($that->pendingWrites as $pw) {
$that->write($pw);
}
$that->pendingWrites = array();
$that->emit('drain', array($that));
});
}

$this->pendingWrites[] = $data;

// otherwise buffer and try to establish connection
$this->pendingWrites .= $data;
if (self::STATE_WRITING_HEAD > $this->state) {
$this->writeHead();
}
Expand All @@ -111,22 +107,26 @@ public function write($data)

public function end($data = null)
{
if (null !== $data && !is_scalar($data)) {
throw new \InvalidArgumentException('$data must be null or scalar');
if (!$this->isWritable()) {
return;
}

if (null !== $data) {
$this->write($data);
} else if (self::STATE_WRITING_HEAD > $this->state) {
$this->writeHead();
}

$this->ended = true;
}

/** @internal */
public function handleDrain()
{
$this->emit('drain', array($this));
$this->emit('drain');
}

/** @internal */
public function handleData($data)
{
$this->buffer .= $data;
Expand All @@ -135,7 +135,7 @@ public function handleData($data)
try {
list($response, $bodyChunk) = $this->parseResponse($this->buffer);
} catch (\InvalidArgumentException $exception) {
$this->emit('error', [$exception, $this]);
$this->emit('error', array($exception));
}

$this->buffer = null;
Expand All @@ -144,14 +144,13 @@ public function handleData($data)
$this->stream->removeListener('data', array($this, 'handleData'));
$this->stream->removeListener('end', array($this, 'handleEnd'));
$this->stream->removeListener('error', array($this, 'handleError'));
$this->stream->removeListener('close', array($this, 'handleClose'));

if (!isset($response)) {
return;
}

$this->response = $response;

$response->on('end', function () {
$response->on('close', function () {
$this->close();
});
$response->on('error', function (\Exception $error) {
Expand All @@ -168,14 +167,16 @@ public function handleData($data)
}
}

/** @internal */
public function handleEnd()
{
$this->closeError(new \RuntimeException(
"Connection closed before receiving response"
"Connection ended before receiving response"
));
}

public function handleError($error)
/** @internal */
public function handleError(\Exception $error)
{
$this->closeError(new \RuntimeException(
"An error occurred in the underlying stream",
Expand All @@ -184,28 +185,36 @@ public function handleError($error)
));
}

/** @internal */
public function handleClose()
{
$this->close();
}

/** @internal */
public function closeError(\Exception $error)
{
if (self::STATE_END <= $this->state) {
return;
}
$this->emit('error', array($error, $this));
$this->close($error);
$this->emit('error', array($error));
$this->close();
}

public function close(\Exception $error = null)
public function close()
{
if (self::STATE_END <= $this->state) {
return;
}

$this->state = self::STATE_END;
$this->pendingWrites = '';

if ($this->stream) {
$this->stream->close();
}

$this->emit('end', array($error, $this->response, $this));
$this->emit('close');
$this->removeAllListeners();
}

Expand Down
39 changes: 28 additions & 11 deletions src/Response.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@

namespace React\HttpClient;

use Evenement\EventEmitterTrait;
use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

/**
* @event data ($bodyChunk, Response $thisResponse)
* @event data ($bodyChunk)
* @event error
* @event end
*/
class Response implements ReadableStreamInterface
class Response extends EventEmitter implements ReadableStreamInterface
{
use EventEmitterTrait;

private $stream;
private $protocol;
Expand Down Expand Up @@ -48,6 +47,7 @@ public function __construct(ReadableStreamInterface $stream, $protocol, $version
$this->stream->on('data', array($this, 'handleData'));
$this->stream->on('error', array($this, 'handleError'));
$this->stream->on('end', array($this, 'handleEnd'));
$this->stream->on('close', array($this, 'handleClose'));
}

public function getProtocol()
Expand Down Expand Up @@ -75,39 +75,56 @@ public function getHeaders()
return $this->headers;
}

/** @internal */
public function handleData($data)
{
$this->emit('data', array($data, $this));
if ($this->readable) {
$this->emit('data', array($data));
}
}

/** @internal */
public function handleEnd()
{
if (!$this->readable) {
return;
}
$this->emit('end');
$this->close();
}

/** @internal */
public function handleError(\Exception $error)
{
if (!$this->readable) {
return;
}
$this->emit('error', array(new \RuntimeException(
"An error occurred in the underlying stream",
0,
$error
), $this));
)));

$this->close($error);
$this->close();
}

public function close(\Exception $error = null)
/** @internal */
public function handleClose()
{
$this->close();
}

public function close()
{
if (!$this->readable) {
return;
}

$this->readable = false;
$this->stream->close();

$this->emit('end', array($error, $this));

$this->emit('close');
$this->removeAllListeners();
$this->stream->close();
}

public function isReadable()
Expand Down
Loading

0 comments on commit 510041f

Please sign in to comment.