Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve websocket performance #37

Merged
merged 3 commits into from
Jun 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
## Ignition Launch 1.x

### Ignition Launch 1.x.x (2020-xx-xx)

1. Improve websocket performance by throttling the busy loop.
* [Pull Request 36](https://github.com/ignitionrobotics/ign-launch/pull/36)

1. Fix empty SSL xml elements in the websocket plugin.
* [Pull Request 35](https://github.com/ignitionrobotics/ign-launch/pull/35)

### Ignition Launch 1.7.0 (2020-06-16)

1. Added SSL to websocket server.
Expand Down
36 changes: 29 additions & 7 deletions plugins/websocket_server/WebsocketServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ int rootCallback(struct lws *_wsi,
case LWS_CALLBACK_SERVER_WRITEABLE:
{
std::lock_guard<std::mutex> lock(self->connections[fd]->mutex);
//while (!self->connections[fd]->buffer.empty())
if (!self->connections[fd]->buffer.empty())
{
int msgSize = self->connections[fd]->len.front();
Expand All @@ -101,6 +100,8 @@ int rootCallback(struct lws *_wsi,
}
else
{
std::scoped_lock<std::mutex> runLock(self->runMutex);
self->messageCount--;
// Only pop the message if it was sent successfully.
self->connections[fd]->buffer.pop_front();
self->connections[fd]->len.pop_front();
Expand Down Expand Up @@ -136,9 +137,16 @@ WebsocketServer::WebsocketServer()
/////////////////////////////////////////////////
WebsocketServer::~WebsocketServer()
{
if (this->thread && this->run)
if (this->thread)
{
this->run = false;
{
std::scoped_lock<std::mutex> lock(this->runMutex);
if (this->run)
{
this->run = false;
this->runConditionVariable.notify_all();
}
}
this->thread->join();
}
this->thread = nullptr;
Expand Down Expand Up @@ -214,13 +222,13 @@ bool WebsocketServer::Load(const tinyxml2::XMLElement *_elem)
// Get the ssl cert file, if present.
const tinyxml2::XMLElement *certElem =
elem->FirstChildElement("cert_file");
if (certElem)
if (certElem && certElem->GetText())
sslCertFile = certElem->GetText();

// Get the ssl private key file, if present.
const tinyxml2::XMLElement *keyElem =
elem->FirstChildElement("private_key_file");
if (keyElem)
if (keyElem && keyElem->GetText())
sslPrivateKeyFile = keyElem->GetText();
}

Expand Down Expand Up @@ -322,6 +330,10 @@ void WebsocketServer::QueueMessage(Connection *_connection,
std::lock_guard<std::mutex> lock(_connection->mutex);
_connection->buffer.push_back(std::move(buf));
_connection->len.push_back(_size);

std::scoped_lock<std::mutex> runLock(this->runMutex);
this->messageCount++;
this->runConditionVariable.notify_all();
}
else
{
Expand All @@ -332,9 +344,19 @@ void WebsocketServer::QueueMessage(Connection *_connection,
//////////////////////////////////////////////////
void WebsocketServer::Run()
{
uint64_t timeout = 50;
using namespace std::chrono_literals;

while (this->run)
lws_service(this->context, timeout);
{
// The second parameter is a timeout that is no longer used by
// libwebsockets.
lws_service(this->context, 0);

// Wait for (1/60) seconds or an event.
std::unique_lock<std::mutex> lock(this->runMutex);
this->runConditionVariable.wait_for(lock,
0.0166s, [&]{return !this->run || this->messageCount > 0;});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see a potential issue that could be due to my poor understanding of the libwebsockets event loop.

From my brief search, if libwebsockets is implemented without an event loop like libev or libuv, message processing is done by active polling. Polling is started by explicitly calling lws_service.

Our event loop looks something like this.

WebsocketServer::Run → lws_service → rootCallback → switch(_reason) → lws_callback_on_writable(_wsi)
       (loop)                                                                   ↓
                                                                         specificCallback

lws_service is only called by WebsocketServer::Run, so websocket events are only processed every time WebsocketServer::Run loops. WebsocketServer::Run calls lws_service and then proceeds to acquire runMutex to wait until it hits a timeout, the server is stopped or there is a pending message in the queue (which is checked using messageCount).

The problem is that messageCount is only modified by websocket callbacks. Since the runMutex lock prevents WebsocketServer::Run from calling lws_service, callbacks modifying messageCount will not run until the lock times out and lws_service is called again. This will prevent the messageCount trigger from ever going off.

The way things are currently setup will increase connection latency as websocket events will only be processed every number of times per second, but it still may be good enough for our use case as it does not need (soft) real-time data.

A potential solution for a future version is to use an event loop like libev or libuv. There is likely a simpler solution I'm not seeing as well.

I may be completely wrong about the event loop, but thought this was worth raising.

On another note, if my event loop model is wrong and messages are actually processed in the background, then would it make sense to change the timeout duration from 0.0166s to this->publishPeriod?

}
}

//////////////////////////////////////////////////
Expand Down
10 changes: 10 additions & 0 deletions plugins/websocket_server/WebsocketServer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ namespace ignition
/// connections that have subscribed to the topic.
public: std::map<std::string, std::set<int>> topicConnections;

/// \brief Run loop mutex.
public: std::mutex runMutex;

/// \brief Run loop condition variable.
public: std::condition_variable runConditionVariable;

/// \brief Number of pending messages. This is used to throttle the
/// run loop.
public: int messageCount{0};

/// \brief Time of last publication for each subscribed topic. The key
/// is the topic name and the value the time of last publication.
/// \sa publishPeriod.
Expand Down
2 changes: 1 addition & 1 deletion plugins/websocket_server/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// Make sure to also disable SSL and authentication in the
// websocket_server plugin.
var ign = new Ignition({
url: 'wss://localhost:9002',
url: 'ws://localhost:9002',
key: "auth_key"
});

Expand Down