Skip to content

Commit

Permalink
Extract common client and server methods to the WebSocketTransport cl…
Browse files Browse the repository at this point in the history
…ass.
  • Loading branch information
yallie committed Dec 12, 2024
1 parent dc9c50a commit 7517697
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 221 deletions.
162 changes: 162 additions & 0 deletions CoreRemoting/Channels/Websocket/WebSocketTransport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
using System;
using System.Net.WebSockets;
using CoreRemoting.IO;
using System.Threading.Tasks;
using System.Threading;

namespace CoreRemoting.Channels.Websocket
{
/// <summary>
/// Abstract web socket transport for reading and writing messages.
/// </summary>
public abstract class WebSocketTransport : IRawMessageTransport
{
/// <summary>
/// Handshake cookies: message encryption flag.
/// </summary>
protected const string MessageEncryptionCookie = "MessageEncryption";

/// <summary>
/// Handshake cookies: client public key.
/// </summary>
protected const string ClientPublicKeyCookie = "ShakeHands";

/// <summary>
/// Buffer size to read incoming messages.
/// Note: LOH threshold is ~85 kilobytes
/// </summary>
protected const int BufferSize = 16 * 1024;

/// <summary>
/// Empty message for ping, handshake, etc.
/// </summary>
protected static ArraySegment<byte> EmptyMessage = new([]);

/// <inheritdoc />
public NetworkException LastException { get; set; }

/// <summary>
/// Event: fires when the channel is connected.
/// </summary>
public event Action Connected;

/// <summary>
/// Fires the <see cref="Connected"/> event.
/// </summary>
protected void OnConnected() =>
Connected?.Invoke();

/// <inheritdoc />
public event Action Disconnected;

/// <summary>
/// Fires the <see cref="Disconnected"/> event.
/// </summary>
protected void OnDisconnected() =>
Disconnected?.Invoke();

/// <inheritdoc />
public event Action<byte[]> ReceiveMessage;

/// <inheritdoc />
public event Action<string, Exception> ErrorOccured;

/// <summary>
/// Web socket used to read and write messages.
/// </summary>
protected abstract WebSocket WebSocket { get; }

/// <summary>
/// Starts listening for the incoming messages.
/// </summary>
public virtual Guid StartListening()
{
_ = ReadIncomingMessages();
return Guid.Empty;
}

/// <summary>
/// Reads the incoming websocket messages
/// and fires the <see cref="ReceiveMessage"/> event.
/// </summary>
private async Task ReadIncomingMessages()
{
var buffer = new byte[BufferSize];
var segment = new ArraySegment<byte>(buffer);
var webSocket = WebSocket;

try
{
while (webSocket.State == WebSocketState.Open)
{
var ms = new SmallBlockMemoryStream();
while (true)
{
var result = await webSocket.ReceiveAsync(
segment, CancellationToken.None)
.ConfigureAwait(false);

if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
string.Empty, CancellationToken.None)
.ConfigureAwait(false);

Disconnected?.Invoke();
}
else
{
ms.Write(buffer, 0, result.Count);
}

if (result.EndOfMessage)
break;
}

if (ms.Length > 0)
{
// flush received websocket message
var message = new byte[(int)ms.Length];
ms.Position = 0;
ms.Read(message, 0, message.Length);
ReceiveMessage?.Invoke(message);
}
}
}
catch (Exception ex)
{
LastException = ex as NetworkException ??
new NetworkException(ex.Message, ex);

ErrorOccured?.Invoke(ex.Message, LastException);
Disconnected?.Invoke();
}
finally
{
webSocket?.Dispose();
}
}

/// <inheritdoc />
public async Task<bool> SendMessageAsync(byte[] rawMessage)
{
try
{
await WebSocket.SendAsync(
new ArraySegment<byte>(rawMessage),
WebSocketMessageType.Binary, true, CancellationToken.None)
.ConfigureAwait(false);

return true;
}
catch (Exception ex)
{
LastException = ex as NetworkException ??
new NetworkException(ex.Message, ex);

ErrorOccured?.Invoke(ex.Message, LastException);
return false;
}
}
}
}
133 changes: 17 additions & 116 deletions CoreRemoting/Channels/Websocket/WebsocketClientChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,32 @@
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using CoreRemoting.IO;
using CoreRemoting.Toolbox;

namespace CoreRemoting.Channels.Websocket;

/// <summary>
/// Client side websocket channel implementation based on System.Net.Websockets.
/// </summary>
public class WebsocketClientChannel : IClientChannel, IRawMessageTransport
public class WebsocketClientChannel : WebSocketTransport, IClientChannel
{
// note: LOH threshold is ~85 kilobytes
private const int BufferSize = 16 * 1024;
private static ArraySegment<byte> EmptyMessage = new ArraySegment<byte>([]);

/// <summary>
/// Gets or sets the URL this channel is connected to.
/// </summary>
public string Url { get; private set; }

private Uri Uri { get; set; }

private ClientWebSocket WebSocket { get; set; }

/// <inheritdoc />
public bool IsConnected { get; private set; }

/// <inheritdoc />
public IRawMessageTransport RawMessageTransport => this;

/// <inheritdoc />
public NetworkException LastException { get; set; }

/// <summary>
/// Event: fires when the channel is connected.
/// </summary>
public event Action Connected;

/// <inheritdoc />
public event Action Disconnected;
private ClientWebSocket ClientWebSocket { get; set; }

/// <inheritdoc />
public event Action<byte[]> ReceiveMessage;

/// <inheritdoc />
public event Action<string, Exception> ErrorOccured;
protected override WebSocket WebSocket => ClientWebSocket;

/// <inheritdoc />
public void Init(IRemotingClient client)
Expand All @@ -62,18 +43,18 @@ public void Init(IRemotingClient client)

// note: Nagle is disabled by default on NetCore, see
// https://github.com/dotnet/runtime/discussions/81175
WebSocket = new ClientWebSocket();
WebSocket.Options.Cookies = new CookieContainer();
WebSocket.Options.Cookies.Add(new Cookie(
name: "MessageEncryption",
ClientWebSocket = new ClientWebSocket();
ClientWebSocket.Options.Cookies = new CookieContainer();
ClientWebSocket.Options.Cookies.Add(new Cookie(
name: MessageEncryptionCookie,
value: client.MessageEncryption ? "1" : "0",
path: Uri.LocalPath,
domain: Uri.Host));

if (client.MessageEncryption)
{
WebSocket.Options.Cookies.Add(new Cookie(
name: "ShakeHands",
ClientWebSocket.Options.Cookies.Add(new Cookie(
name: ClientPublicKeyCookie,
value: Convert.ToBase64String(client.PublicKey),
path: Uri.LocalPath,
domain: Uri.Host));
Expand All @@ -83,98 +64,18 @@ public void Init(IRemotingClient client)
/// <inheritdoc />
public async Task ConnectAsync()
{
await WebSocket.ConnectAsync(
await ClientWebSocket.ConnectAsync(
new Uri(Url), CancellationToken.None)
.ConfigureAwait(false);

IsConnected = true;
Connected?.Invoke();
OnConnected();

await WebSocket.SendAsync(EmptyMessage,
WebSocketMessageType.Binary, true, CancellationToken.None)
.ConfigureAwait(false);

_ = StartListening();
}

private async Task StartListening()
{
var buffer = new byte[BufferSize];
var segment = new ArraySegment<byte>(buffer);
var webSocket = WebSocket;

try
{
while (webSocket.State == WebSocketState.Open)
{
var ms = new SmallBlockMemoryStream();
while (true)
{
var result = await webSocket.ReceiveAsync(
segment, CancellationToken.None)
.ConfigureAwait(false);

if (result.MessageType == WebSocketMessageType.Close)
{
await webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure,
string.Empty, CancellationToken.None)
.ConfigureAwait(false);

Disconnected?.Invoke();
}
else
{
ms.Write(buffer, 0, result.Count);
}

if (result.EndOfMessage)
break;
}

if (ms.Length > 0)
{
// flush received websocket message
var message = new byte[(int)ms.Length];
ms.Position = 0;
ms.Read(message, 0, message.Length);
ReceiveMessage?.Invoke(message);
}
}
}
catch (Exception ex)
{
LastException = ex as NetworkException ??
new NetworkException(ex.Message, ex);

ErrorOccured?.Invoke(ex.Message, ex);
Disconnected?.Invoke();
}
finally
{
webSocket?.Dispose();
}
}

/// <inheritdoc />
public async Task<bool> SendMessageAsync(byte[] rawMessage)
{
try
{
await WebSocket.SendAsync(
new ArraySegment<byte>(rawMessage),
WebSocketMessageType.Binary, true, CancellationToken.None)
.ConfigureAwait(false);

return true;
}
catch (Exception ex)
{
LastException = ex as NetworkException ??
new NetworkException(ex.Message, ex);

ErrorOccured?.Invoke(ex.Message, ex);
return false;
}
StartListening();
}

/// <inheritdoc />
Expand All @@ -187,7 +88,7 @@ public async Task DisconnectAsync()

try
{
await WebSocket.CloseAsync(
await ClientWebSocket.CloseAsync(
WebSocketCloseStatus.NormalClosure, "Ok", CancellationToken.None)
.ConfigureAwait(false);
}
Expand All @@ -196,20 +97,20 @@ await WebSocket.CloseAsync(
// web socket already closed?
}

Disconnected?.Invoke();
OnDisconnected();
}

/// <inheritdoc />
public void Dispose()
{
if (WebSocket == null)
if (ClientWebSocket == null)
return;

if (IsConnected)
DisconnectAsync()
.JustWait();

WebSocket.Dispose();
WebSocket = null;
ClientWebSocket.Dispose();
ClientWebSocket = null;
}
}
Loading

0 comments on commit 7517697

Please sign in to comment.