Skip to content

Commit

Permalink
Wait for frame on data read retry
Browse files Browse the repository at this point in the history
  • Loading branch information
at669 committed Feb 5, 2021
1 parent b8c42bb commit fb965da
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static void OpenWindow()
{
ROSSettingsEditor window = GetWindow<ROSSettingsEditor>(false, "ROS Settings", true);
window.minSize = new Vector2(300, 65);
window.maxSize = new Vector2(600, 200);
window.maxSize = new Vector2(600, 250);
window.Show();
}

Expand Down Expand Up @@ -51,7 +51,7 @@ protected virtual void OnGUI()
EditorGUILayout.Space();
EditorGUILayout.LabelField("If awaiting a service response:", EditorStyles.boldLabel);
prefab.awaitDataMaxRetries = EditorGUILayout.IntField(
new GUIContent("Max Retries",
new GUIContent("Max Service Retries",
"While waiting for a service to respond, check this many times before giving up."),
prefab.awaitDataMaxRetries);
prefab.awaitDataSleepSeconds = EditorGUILayout.FloatField(
Expand All @@ -62,6 +62,10 @@ protected virtual void OnGUI()
new GUIContent("Read chunk size",
"While reading received messages, read this many bytes at a time."),
prefab.readChunkSize);
prefab.awaitDataReadRetry = EditorGUILayout.IntField(
new GUIContent("Max Read retries",
"While waiting to read a full message, check this many times before giving up."),
prefab.awaitDataReadRetry);

if (GUI.changed)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public class ROSConnection : MonoBehaviour
[Tooltip("While reading received messages, read this many bytes at a time.")]
public int readChunkSize = 2048;

[Tooltip("While waiting to read a full message, check this many times before giving up.")]
public int awaitDataReadRetry = 10;

static object _lock = new object(); // sync lock
static List<Task> activeConnectionTasks = new List<Task>(); // pending connections

Expand Down Expand Up @@ -147,7 +150,9 @@ public async void SendServiceMessage<RESPONSE>(string rosServiceName, Message se
try
{
string serviceName;
byte[] content = ReadMessageContents(networkStream, out serviceName);
var messageContents = await ReadMessageContents(networkStream);
var topicName = messageContents.Item1;
var content = messageContents.Item2;
serviceResponse.Deserialize(content, 0);
}
catch (Exception e)
Expand Down Expand Up @@ -254,18 +259,19 @@ protected async Task HandleConnectionAsync(TcpClient tcpClient)
await Task.Yield();

// continue asynchronously on another thread
ReadMessage(tcpClient.GetStream());
await ReadMessage(tcpClient.GetStream());
}

void ReadMessage(NetworkStream networkStream)
async Task ReadMessage(NetworkStream networkStream)
{
if (!networkStream.CanRead)
return;

SubscriberCallback subs;

string topicName;
byte[] content = ReadMessageContents(networkStream, out topicName);
var messageContents = await ReadMessageContents(networkStream);

This comment has been minimized.

Copy link
@LaurieCheers-unity

LaurieCheers-unity Feb 5, 2021

Contributor

(string topicName, byte[] content) = await ReadMessageContents(networkStream);

var topicName = messageContents.Item1;
var content = messageContents.Item2;

if (!subscribers.TryGetValue(topicName, out subs))
return; // not interested in this topic
Expand Down Expand Up @@ -294,7 +300,7 @@ void ReadMessage(NetworkStream networkStream)
}
}

byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)
async Task<Tuple<string, byte[]>> ReadMessageContents(NetworkStream networkStream)
{
// Get first bytes to determine length of topic name
byte[] rawTopicBytes = new byte[4];
Expand All @@ -304,7 +310,7 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)
// Read and convert topic name
byte[] topicNameBytes = new byte[topicLength];
networkStream.Read(topicNameBytes, 0, topicNameBytes.Length);
topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength);
string topicName = Encoding.ASCII.GetString(topicNameBytes, 0, topicLength);

byte[] full_message_size_bytes = new byte[4];
networkStream.Read(full_message_size_bytes, 0, full_message_size_bytes.Length);
Expand All @@ -316,12 +322,12 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)

int attempts = 0;
// Read in message contents until completion, or until attempts are maxed out
while (bytesRemaining > 0 && attempts <= this.awaitDataMaxRetries)
while (bytesRemaining > 0 && attempts <= this.awaitDataReadRetry)
{
if (attempts == this.awaitDataMaxRetries)
if (attempts == this.awaitDataReadRetry)
{
Debug.LogError("No more data available on network stream after " + awaitDataMaxRetries + " attempts.");
return readBuffer;
Debug.LogError("No more data to read network stream after " + awaitDataReadRetry + " attempts.");
return Tuple.Create(topicName, readBuffer);
}

// Read the minimum of the bytes remaining, or the designated readChunkSize in segments until none remain
Expand All @@ -332,9 +338,10 @@ byte[] ReadMessageContents(NetworkStream networkStream, out string topicName)
if (!networkStream.DataAvailable)
{
attempts++;
await Task.Yield();
}
}
return readBuffer;
return Tuple.Create(topicName, readBuffer);
}

/// <summary>
Expand Down

0 comments on commit fb965da

Please sign in to comment.