Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amanda/read chunks #62

Merged
merged 6 commits into from
Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions com.unity.robotics.ros-tcp-connector/Editor/ROSSettingsEditor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static void OpenWindow()
{
ROSSettingsEditor window = GetWindow<ROSSettingsEditor>(false, "ROS Settings", true);
window.minSize = new Vector2(300, 65);
window.maxSize = new Vector2(600, 200);
window.maxSize = new Vector2(600, 250);
window.Show();
}

Expand Down Expand Up @@ -51,13 +51,21 @@ protected virtual void OnGUI()
EditorGUILayout.Space();
EditorGUILayout.LabelField("If awaiting a service response:", EditorStyles.boldLabel);
prefab.awaitDataMaxRetries = EditorGUILayout.IntField(
new GUIContent("Max Retries",
new GUIContent("Max Service Retries",
"While waiting for a service to respond, check this many times before giving up."),
prefab.awaitDataMaxRetries);
prefab.awaitDataSleepSeconds = EditorGUILayout.FloatField(
new GUIContent("Sleep (seconds)",
"While waiting for a service to respond, wait this many seconds between checks."),
prefab.awaitDataSleepSeconds);
prefab.readChunkSize = EditorGUILayout.IntField(
new GUIContent("Read chunk size",
"While reading received messages, read this many bytes at a time."),
prefab.readChunkSize);
prefab.awaitDataReadRetry = EditorGUILayout.IntField(
new GUIContent("Max Read retries",
"While waiting to read a full message, check this many times before giving up."),
prefab.awaitDataReadRetry);

if (GUI.changed)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ public class ROSConnection : MonoBehaviour
[Tooltip("While waiting for a service to respond, wait this many seconds between checks.")]
public float awaitDataSleepSeconds = 1.0f;

[Tooltip("While reading received messages, read this many bytes at a time.")]
public int readChunkSize = 2048;

[Tooltip("While waiting to read a full message, check this many times before giving up.")]
public int awaitDataReadRetry = 10;

static object _lock = new object(); // sync lock
static List<Task> activeConnectionTasks = new List<Task>(); // pending connections

Expand Down Expand Up @@ -144,7 +150,7 @@ public async void SendServiceMessage<RESPONSE>(string rosServiceName, Message se
try
{
string serviceName;
byte[] content = ReadMessageContents(networkStream, out serviceName);
(string topicName, byte[] content) = await ReadMessageContents(networkStream);
serviceResponse.Deserialize(content, 0);
}
catch (Exception e)
Expand Down Expand Up @@ -251,18 +257,17 @@ protected async Task HandleConnectionAsync(TcpClient tcpClient)
await Task.Yield();

// continue asynchronously on another thread
ReadMessage(tcpClient.GetStream());
await ReadMessage(tcpClient.GetStream());
}

void ReadMessage(NetworkStream networkStream)
async Task ReadMessage(NetworkStream networkStream)
{
if (!networkStream.CanRead)
return;

SubscriberCallback subs;

string topicName;
byte[] content = ReadMessageContents(networkStream, out topicName);
(string topicName, byte[] content) = await ReadMessageContents(networkStream);

if (!subscribers.TryGetValue(topicName, out subs))
return; // not interested in this topic
Expand Down Expand Up @@ -291,7 +296,7 @@ void ReadMessage(NetworkStream networkStream)
}
}

byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)
async Task<Tuple<string, byte[]>> ReadMessageContents(NetworkStream networkStream)
{
// Get first bytes to determine length of topic name
byte[] rawTopicBytes = new byte[4];
Expand All @@ -301,7 +306,7 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)
// Read and convert topic name
byte[] topicNameBytes = new byte[topicLength];
networkStream.Read(topicNameBytes, 0, topicNameBytes.Length);
topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength);
string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength);

byte[] full_message_size_bytes = new byte[4];
networkStream.Read(full_message_size_bytes, 0, full_message_size_bytes.Length);
Expand All @@ -311,14 +316,28 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)
int bytesRemaining = full_message_size;
int totalBytesRead = 0;

while (networkStream.DataAvailable && bytesRemaining > 0)
int attempts = 0;
// Read in message contents until completion, or until attempts are maxed out
while (bytesRemaining > 0 && attempts <= this.awaitDataReadRetry)
{
int bytesRead = networkStream.Read(readBuffer, totalBytesRead, bytesRemaining);
if (attempts == this.awaitDataReadRetry)
{
Debug.LogError("No more data to read network stream after " + awaitDataReadRetry + " attempts.");
return Tuple.Create(topicName, readBuffer);
}

// Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain
int bytesRead = networkStream.Read(readBuffer, totalBytesRead, Math.Min(readChunkSize, bytesRemaining));
totalBytesRead += bytesRead;
bytesRemaining -= bytesRead;
}

return readBuffer;
if (!networkStream.DataAvailable)
{
attempts++;
await Task.Yield();
}
}
return Tuple.Create(topicName, readBuffer);
}

/// <summary>
Expand Down