Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Update to use Browser instead of internal Client #21

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
]
},
"require": {
"react/http": "1.5.* | 1.6.* | 1.7.* | 1.8.*",
"react/http": "^1.9",
"voryx/event-loop": "^3.0 || ^2.0.2",
"ratchet/rfc6455": "^0.3",
"reactivex/rxphp": "^2.0.1"
Expand Down
166 changes: 82 additions & 84 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
use Psr\Http\Message\ResponseInterface;
use Ratchet\RFC6455\Handshake\ClientNegotiator;
use React\EventLoop\LoopInterface;
use React\Http\Browser;
use React\Http\Client\Client as HttpClient;
use React\Socket\ConnectionInterface;
use React\Socket\ConnectorInterface;
use React\Stream\ThroughStream;
use Rx\Disposable\CallbackDisposable;
use Rx\DisposableInterface;
use Rx\Observable;
Expand All @@ -23,8 +25,7 @@ class Client extends Observable
protected $url;
private $useMessageObject;
private $subProtocols;
private $loop;
private $connector;
private $browser;
private $keepAlive;
private $headers;

Expand All @@ -46,16 +47,13 @@ public function __construct(string $url, bool $useMessageObject = false, array $
$this->url = $url;
$this->useMessageObject = $useMessageObject;
$this->subProtocols = $subProtocols;
$this->loop = $loop ?: \EventLoop\getLoop();
$this->connector = $connector;
$this->browser = new Browser($connector, $loop);
$this->keepAlive = $keepAlive;
$this->headers = $headers;
}

public function _subscribe(ObserverInterface $clientObserver): DisposableInterface
{
$client = new HttpClient($this->loop, $this->connector);

$cNegotiator = new ClientNegotiator();

/** @var Psr7Request $nRequest */
Expand All @@ -78,88 +76,88 @@ public function _subscribe(ObserverInterface $clientObserver): DisposableInterfa
$flatHeaders[$k] = $v;
}

$request = $client->request('GET', $this->url, $flatHeaders, '1.1');

$request->on('error', function ($error) use ($clientObserver) {
$clientObserver->onError($error);
});

$request->on('response', function (ResponseInterface $response, ConnectionInterface $request) use ($flatHeaders, $cNegotiator, $nRequest, $clientObserver) {
if ($response->getStatusCode() !== 101) {
$clientObserver->onError(new \Exception('Unexpected response code ' . $response->getStatusCode()));
return;
}

$psr7Response = new Psr7Response(
$response->getStatusCode(),
$response->getHeaders(),
null,
$response->getProtocolVersion()
);

$psr7Request = new Psr7Request('GET', $this->url, $flatHeaders);

if (!$cNegotiator->validateResponse($psr7Request, $psr7Response)) {
$clientObserver->onError(new \Exception('Invalid response'));
return;
$writeStream = new ThroughStream();
$this->browser->requestStreaming('GET', $this->url, $flatHeaders, $writeStream)->then(
function (ResponseInterface $response) use ($flatHeaders, $cNegotiator, $nRequest, $clientObserver) {
if ($response->getStatusCode() !== 101) {
$clientObserver->onError(new \Exception('Unexpected response code ' . $response->getStatusCode()));
return;
}

$psr7Response = new Psr7Response(
$response->getStatusCode(),
$response->getHeaders(),
null,
$response->getProtocolVersion()
);

$psr7Request = new Psr7Request('GET', $this->url, $flatHeaders);

if (!$cNegotiator->validateResponse($psr7Request, $psr7Response)) {
$clientObserver->onError(new \Exception('Invalid response'));
return;
}

$subprotoHeader = $psr7Response->getHeader('Sec-WebSocket-Protocol');

$clientObserver->onNext(new MessageSubject(
new AnonymousObservable(function (ObserverInterface $observer) use ($response, $writeStream, $clientObserver) {

$writeStream->on('data', function ($data) use ($observer) {
$observer->onNext($data);
});

$writeStream->on('error', function ($e) use ($observer) {
$observer->onError($e);
});

$writeStream->on('close', function () use ($observer, $clientObserver) {
$observer->onCompleted();

// complete the parent observer - we only do 1 connection
$clientObserver->onCompleted();
});

$writeStream->on('end', function () use ($observer, $clientObserver) {
$observer->onCompleted();

// complete the parent observer - we only do 1 connection
$clientObserver->onCompleted();
});

return new CallbackDisposable(function () use ($writeStream) {
$writeStream->end();
});
}),
new CallbackObserver(
function ($x) use ($writeStream) {
$writeStream->write($x);
},
function ($e) use ($writeStream) {
$writeStream->close();
},
function () use ($writeStream) {
$writeStream->end();
}
),
true,
$this->useMessageObject,
$subprotoHeader,
$nRequest,
$psr7Response,
$this->keepAlive
));
},
static function ($error) use ($clientObserver) {
$clientObserver->onError($error);
}

$subprotoHeader = $psr7Response->getHeader('Sec-WebSocket-Protocol');

$clientObserver->onNext(new MessageSubject(
new AnonymousObservable(function (ObserverInterface $observer) use ($response, $request, $clientObserver) {

$request->on('data', function ($data) use ($observer) {
$observer->onNext($data);
});

$request->on('error', function ($e) use ($observer) {
$observer->onError($e);
});

$request->on('close', function () use ($observer, $clientObserver) {
$observer->onCompleted();

// complete the parent observer - we only do 1 connection
$clientObserver->onCompleted();
});

$request->on('end', function () use ($observer, $clientObserver) {
$observer->onCompleted();

// complete the parent observer - we only do 1 connection
$clientObserver->onCompleted();
});

return new CallbackDisposable(function () use ($request) {
$request->end();
});
}),
new CallbackObserver(
function ($x) use ($request) {
$request->write($x);
},
function ($e) use ($request) {
$request->close();
},
function () use ($request) {
$request->end();
}
),
true,
$this->useMessageObject,
$subprotoHeader,
$nRequest,
$psr7Response,
$this->keepAlive
));
});
);

// empty write to force connection and header send
$request->write('');
$writeStream->write('');

return new CallbackDisposable(function () use ($request) {
$request->close();
return new CallbackDisposable(function () use ($writeStream) {
$writeStream->close();
});
}
}