Skip to content

Commit

Permalink
Update websocket frames
Browse files Browse the repository at this point in the history
  • Loading branch information
Nate Koenig committed Apr 22, 2020
1 parent 0fdd8f3 commit f1ae31b
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 100 deletions.
2 changes: 1 addition & 1 deletion plugins/websocket_server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ if (websockets_FOUND)

add_library(${plugin} SHARED ${sources})

file (READ combined.proto MESSAGE_DEFINITIONS)
file (READ combined.proto WEBSOCKETSERVER_MESSAGE_DEFINITIONS)
configure_file("MessageDefinitions.hh.in"
"${CMAKE_CURRENT_BINARY_DIR}/MessageDefinitions.hh" @ONLY)
# Add a dependency on binary source dir so that you can change the
Expand Down
11 changes: 6 additions & 5 deletions plugins/websocket_server/MessageDefinitions.hh.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2019 Open Source Robotics Foundation
* Copyright (C) 2020 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -14,19 +14,20 @@
* limitations under the License.
*
*/
#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_
#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGE_DEFINITIONS_HH_
#ifndef IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_
#define IGNITION_LAUNCH_WEBSOCKETSERVER_MESSAGEDEFINITIONS_HH_

namespace ignition
{
namespace launch
{
/// \brief All of the protobuf messages provided by the websocket
/// server.
///
/// \todo Get this information from a running simulation. The problem
/// right now is that Ignition Transport does not show subscribers.
static const std::string kMessageDefinitions =
R"protos(@MESSAGE_DEFINITIONS@)protos";
static const std::string kWebSocketServerMessageDefinitions =
R"protos(@WEBSOCKETSERVER_MESSAGE_DEFINITIONS@)protos";
}
}
#endif
98 changes: 55 additions & 43 deletions plugins/websocket_server/WebsocketServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*
*/

#include <algorithm>
#include <ignition/common/Console.hh>
#include <ignition/common/Util.hh>
#include <ignition/msgs.hh>
Expand All @@ -24,6 +25,9 @@

using namespace ignition::launch;

#define BUILD_HEADER(_op, _topic, _type) ((_op)+","+(_topic)+","+(_type)+",")
#define BUILD_MSG(_op, _topic, _type, _payload) (BUILD_HEADER(_op, _topic, _type) + _payload)

int rootCallback(struct lws *_wsi,
enum lws_callback_reasons _reason,
void * /*user*/,
Expand Down Expand Up @@ -266,25 +270,43 @@ void WebsocketServer::OnDisconnect(int _socketId)
}
}

// Option 1: Encode everything in a Packet.
// Pros: Works, and has serialization.
// Cons: Server need to de-serialize and re-serialize messages.
//
// Option 2: Send header along with payload. Don't encode the header, and
// only copy the serialized payload.
//
// Option 3: Encode the header and payload into a Packet.

//////////////////////////////////////////////////
void WebsocketServer::OnMessage(int _socketId, const std::string &_msg)
{
// Handle the case where the client requests the message definitions.
if (_msg == "message_definitions")
// Frame: operation,topic,type,payload
std::vector<std::string> frameParts = common::split(_msg, ",");

// Check for a valid frame.
if (frameParts.size() != 4 &&
// Count the number of commas to handle a frame like "sub,,,"
std::count(_msg.begin(), _msg.end(), ',') != 3)
{
igndbg << "Message definitions request recieved\n";
this->QueueMessage(this->connections[_socketId].get(),
kMessageDefinitions.c_str(), kMessageDefinitions.length());
ignerr << "Received an invalid frame with " << frameParts.size()
<< "components when 4 is expected.\n";
return;
}

ignition::msgs::WebRequest requestMsg;
requestMsg.ParseFromString(_msg);

if (requestMsg.operation() == "topic_list")
// Handle the case where the client requests the message definitions.
if (frameParts[0] == "protos")
{
igndbg << "Protos request received\n";
this->QueueMessage(this->connections[_socketId].get(),
kWebSocketServerMessageDefinitions.c_str(),
kWebSocketServerMessageDefinitions.length());
}
else if (frameParts[0] == "topics")
{
igndbg << "Topic list request recieved\n";
ignition::msgs::Packet msg;
ignition::msgs::StringMsg_V msg;

std::vector<std::string> topics;

Expand All @@ -293,25 +315,24 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg)

// Store the topics in a message and serialize the message.
for (const std::string &topic : topics)
msg.mutable_string_msg_v()->add_data(topic);
msg.add_data(topic);

msg.set_topic("/topic_list");
msg.set_type("ignition.msgs.StringMsg_V");
std::string data = msg.SerializeAsString();
std::string data = BUILD_MSG(this->operations[PUBLISH], frameParts[0],
std::string("ignition.msgs.StringMsg_V"), msg.SerializeAsString());

// Queue the message for delivery.
this->QueueMessage(this->connections[_socketId].get(),
data.c_str(), data.length());
}
else if (requestMsg.operation() == "subscribe")
else if (frameParts[0] == "sub")
{
// Store the relation of socketId to subscribed topic.
this->topicConnections[requestMsg.topic()].insert(_socketId);
this->topicTimestamps[requestMsg.topic()] =
this->topicConnections[frameParts[1]].insert(_socketId);
this->topicTimestamps[frameParts[1]] =
std::chrono::steady_clock::now() - this->publishPeriod;

igndbg << "Subscribe request to topic[" << requestMsg.topic() << "]\n";
this->node.SubscribeRaw(requestMsg.topic(),
igndbg << "Subscribe request to topic[" << frameParts[1] << "]\n";
this->node.SubscribeRaw(frameParts[1],
std::bind(&WebsocketServer::OnWebsocketSubscribedMessage,
this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
Expand All @@ -328,44 +349,35 @@ void WebsocketServer::OnWebsocketSubscribedMessage(

if (iter != this->topicConnections.end())
{
std::lock_guard<std::mutex> mainLock(this->subscriptionMutex);
std::chrono::time_point<std::chrono::steady_clock> systemTime =
std::chrono::steady_clock::now();

ignition::msgs::Packet msg;
msg.set_topic(_info.Topic());
msg.set_type(_info.Type());

std::chrono::nanoseconds timeDelta =
systemTime - this->topicTimestamps[_info.Topic()];

if (timeDelta > this->publishPeriod)
{
if (_info.Type() == "ignition.msgs.CmdVel2D")
msg.mutable_cmd_vel2d()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.Image")
msg.mutable_image()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.StringMsg_V")
msg.mutable_string_msg_v()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.WebRequest")
msg.mutable_web_request()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.Pose")
msg.mutable_pose()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.Pose_V")
msg.mutable_pose_v()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.Time")
msg.mutable_time()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.Clock")
msg.mutable_clock()->ParseFromArray(_data, _size);
else if (_info.Type() == "ignition.msgs.WorldStatistics")
msg.mutable_world_stats()->ParseFromArray(_data, _size);
// Get the header, or build a new header if it doesn't exist.
auto header = this->publishHeaders.find(_info.Topic());
if (header == this->publishHeaders.end())
{
this->publishHeaders[_info.Topic()] = BUILD_HEADER(
this->operations[PUBLISH], _info.Topic(), _info.Type());
header = this->publishHeaders.find(_info.Topic());
}

// Store the last time this topic was published.
this->topicTimestamps[_info.Topic()] = systemTime;

std::string data = msg.SerializeAsString();
// Construct the final message.
std::string msg = header->second + std::string(_data, _size);

// Send the message
for (const int &socketId : iter->second)
{
this->QueueMessage(this->connections[socketId].get(),
data.c_str(), data.length());
msg.c_str(), msg.length());
}
}
}
Expand Down
70 changes: 68 additions & 2 deletions plugins/websocket_server/WebsocketServer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,45 @@ namespace ignition
///
/// # Websocket Server Interface
///
/// \todo: Describe the websocket API.
/// The websocket server listens for incoming requests and sends
/// messages based on the requests.
///
/// All messages on the websocket, incoming and outgoing, are structured
/// in a frame that consists of four comma separated components:
/// 1. `operation`: string,
/// 2. `topic_name`: string,
/// 3. `message_type`: string, and
/// 4. `payload`: serialized data.
///
/// The `operation` component is mandatory and must be one of:
/// 1. "sub": Subscribe to the topic in the `topic_name` component,
/// 2. "pub": Publish a message from the Ingition Transport topic in
/// the `topic_name` component,
/// 3. "topics": Get the list of available topics, and
/// 4. "protos": Get a string containing all the protobuf
/// definitions.
///
/// The `topic_name` component is mandatory for the "sub" and "pub"
/// operations. If present, it must be the name of an Ignition Transport
/// topic.
///
/// The `message_type` component is mandatory for the "pub" operation. If
/// present it names the Ignition Message type, such as
/// "ignition.msgs.Clock".
///
/// The `payload` component is mandatory for the "pub" operation. If
/// present, it contains a serialized string of an Igntion Message.
///
/// ## Example frames
///
/// 1. Get the list of topics: `topics,,,`
///
/// 2. Get the protobuf definitions: `protos,,,`
///
/// 3. Subscribe to the "/clock" topic: `sub,/clock,,`
///
/// 4. Websocker server publishing data on the "/clock" topic:
/// `pub,/clock,ignition.msgs.Clock,<serialized_data>`
///
/// # Example usage
///
Expand Down Expand Up @@ -80,11 +118,11 @@ namespace ignition

public: void OnConnect(int _socketId);
public: void OnDisconnect(int _socketId);

public: void OnMessage(int _socketId, const std::string &_msg);

public: void OnRequestMessage(int _socketId, const std::string &_msg);


private: ignition::transport::Node node;

private: bool run = true;
Expand All @@ -104,6 +142,7 @@ namespace ignition
const char *_data, const size_t _size);

public: std::mutex mutex;
public: std::mutex subscriptionMutex;
public: std::map<int, std::unique_ptr<Connection>> connections;
public: std::map<std::string, std::set<int>> topicConnections;

Expand All @@ -114,6 +153,33 @@ namespace ignition
std::chrono::time_point<std::chrono::steady_clock>>
topicTimestamps;

/// \brief The set of valid operations. This enum must align with the
/// `operations` member variable.
private: enum Operation
{
/// \brief Subscribe to a topic.
SUBSCRIBE = 0,

/// \brief Publish a message from a topic.
PUBLISH = 1,

/// \brief Get the list of topics.
TOPICS = 2,

/// \brief Get the protobuf definitions.
PROTOS = 3,
};

/// \brief The set of valid operations, in string form. These values
/// can be sent in websocket message frames.
/// These valus must align with the `Operation` enum.
private: std::vector<std::string> operations{
"sub", "pub", "topics", "protos"};

/// \brief Store publish headers for topics. This is here to improve
/// performance. Keys are topic names and values are frame headers.
private: std::map<std::string, std::string> publishHeaders;

/// \brief Period at which messages will be published on the websocket
/// for each subscribed topic.
/// \sa topicTimestamps.
Expand Down
Loading

0 comments on commit f1ae31b

Please sign in to comment.