Skip to content

Commit

Permalink
Added benchmarks. Optimized sync paths in message reader. Removed Seq…
Browse files Browse the repository at this point in the history
…uenceReader from frame reader.
  • Loading branch information
mattnischan committed Jan 30, 2020
1 parent 6a79408 commit 4cbc411
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 27 deletions.
73 changes: 55 additions & 18 deletions src/Bedrock.Framework/Protocols/WebSockets/WebSocketFrameReader.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
using Bedrock.Framework.Protocols;
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;

namespace Bedrock.Framework.Protocols.WebSockets
{
/// <summary>
/// An implementation of IMessageReader that parses WebSocket message frames.
/// </summary>
public struct WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
public class WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
{
/// <summary>
/// Attempts to parse a message from a sequence.
Expand All @@ -21,17 +23,59 @@ public struct WebSocketFrameReader : IMessageReader<WebSocketReadFrame>
/// <returns>True if parsed successfully, false otherwise.</returns>
public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePosition consumed, ref SequencePosition examined, out WebSocketReadFrame message)
{
var reader = new SequenceReader<byte>(input);

//We need to at least be able to read the start of frame header
if (input.Length < 2)
{
message = default;
return false;
}

reader.TryRead(out var finOpcodeByte);
reader.TryRead(out var maskLengthByte);
if (input.IsSingleSegment)
{
if (TryParseSpan(input.FirstSpan, input.Length, out var bytesRead, out message))
{
consumed = input.GetPosition(bytesRead);
examined = consumed;

return true;
}

return false;

}
else
{
Span<byte> tempSpan = stackalloc byte[14];

var bytesToCopy = Math.Min(input.Length, tempSpan.Length);
input.Slice(0, bytesToCopy).CopyTo(tempSpan);

if (TryParseSpan(tempSpan, input.Length, out var bytesRead, out message))
{
consumed = input.GetPosition(bytesRead);
examined = consumed;

return true;
}

return false;
}
}

/// <summary>
/// Attempts to parse a span for a WebSocket frame header.
/// </summary>
/// <param name="span">The span to attempt to parse.</param>
/// <param name="inputLength">The input sequence length.</param>
/// <param name="bytesRead">The number of bytes read from the span.</param>
/// <param name="message">The WebSocketReadFrame read from the span.</param>
/// <returns>True if the span could be parsed, false otherwise.</returns>
private bool TryParseSpan(in ReadOnlySpan<byte> span, long inputLength, out int bytesRead, out WebSocketReadFrame message)
{
bytesRead = 0;

var finOpcodeByte = span[0];
var maskLengthByte = span[1];

var masked = (maskLengthByte & 0b1000_0000) != 0;
ulong initialPayloadLength = (ulong)(maskLengthByte & 0b0111_1111);
Expand All @@ -49,25 +93,23 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
break;
}

if (reader.Remaining < extendedPayloadLengthSize + maskSize)
if (inputLength < extendedPayloadLengthSize + maskSize + 2)
{
message = default;
return false;
}

var fin = (finOpcodeByte & 0b1000_0000) != 0;
var opcode = (WebSocketOpcode)(finOpcodeByte & 0b0000_1111);

ulong payloadLength = 0;
if (extendedPayloadLengthSize == 2)
{
reader.TryReadBigEndian(out short length);
payloadLength = (ushort)length;
payloadLength = BinaryPrimitives.ReadUInt16BigEndian(span.Slice(2));
}
else if (extendedPayloadLengthSize == 8)
{
reader.TryReadBigEndian(out long length);
payloadLength = (ulong)length;
payloadLength = BinaryPrimitives.ReadUInt64BigEndian(span.Slice(2));
}
else
{
Expand All @@ -77,18 +119,13 @@ public bool TryParseMessage(in ReadOnlySequence<byte> input, ref SequencePositio
int maskingKey = 0;
if (masked)
{
Span<byte> maskBytes = stackalloc byte[sizeof(int)];
reader.TryCopyTo(maskBytes);

maskingKey = BitConverter.ToInt32(maskBytes);
reader.Advance(sizeof(int));
maskingKey = BinaryPrimitives.ReadInt32LittleEndian(span.Slice(2 + extendedPayloadLengthSize));
}

var header = new WebSocketHeader(fin, opcode, masked, payloadLength, maskingKey);
message = new WebSocketReadFrame(header, new WebSocketPayloadReader(header));

consumed = input.GetPosition(2 + extendedPayloadLengthSize + maskSize);
examined = consumed;
bytesRead = 2 + extendedPayloadLengthSize + maskSize;
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class WebSocketMessageReader
/// </summary>
private ProtocolReader _protocolReader;

/// <summary>
/// An instance of the WebSocket frame reader.
/// </summary>
private WebSocketFrameReader _frameReader = new WebSocketFrameReader();

/// <summary>
/// An instance of the control frame handler for handling control frame flow.
/// </summary>
Expand Down Expand Up @@ -111,7 +116,18 @@ public async ValueTask<MessageReadResult> ReadAsync(CancellationToken cancellati
//TODO: Is this even the right value to use in this context?
if (_buffer.UnconsumedWrittenCount < _options.PauseWriterThreshold)
{
var payloadSequence = await _protocolReader.ReadAsync(_payloadReader, cancellationToken).ConfigureAwait(false);
var readTask = _protocolReader.ReadAsync(_payloadReader, cancellationToken);
ProtocolReadResult<ReadOnlySequence<byte>> payloadSequence;

if (readTask.IsCompletedSuccessfully)
{
payloadSequence = readTask.Result;
}
else
{
payloadSequence = await readTask;
}

if (payloadSequence.IsCanceled)
{
throw new OperationCanceledException("Read canceled while attempting to read WebSocket payload.");
Expand Down Expand Up @@ -235,11 +251,46 @@ public async ValueTask<bool> MoveNextMessageAsync(CancellationToken cancellation
/// </summary>
/// <param name="cancellationToken">A cancellation token, if any.</param>
/// <returns>A new WebSocket read frame.</returns>
private async ValueTask<WebSocketReadFrame> GetNextMessageFrameAsync(CancellationToken cancellationToken)
private ValueTask<WebSocketReadFrame> GetNextMessageFrameAsync(CancellationToken cancellationToken)
{
var readTask = _protocolReader.ReadAsync(_frameReader, cancellationToken);
ProtocolReadResult<WebSocketReadFrame> frame;

if (readTask.IsCompletedSuccessfully)
{
frame = readTask.Result;
}
else
{
return DoGetNextMessageAsync(readTask, cancellationToken);
}

if (frame.IsCanceled)
{
throw new OperationCanceledException("Read canceled while attempting to read WebSocket frame.");
}

var header = frame.Message.Header;
if (!(header.Opcode == WebSocketOpcode.Ping || header.Opcode == WebSocketOpcode.Pong || header.Opcode == WebSocketOpcode.Close))
{
_protocolReader.Advance();
return new ValueTask<WebSocketReadFrame>(frame.Message);
}

return DoGetNextMessageAsync(readTask, cancellationToken);
}

/// <summary>
/// Gets the next message frame from the transport as an async call.
/// </summary>
/// <param name="readTask">The active protocol reader ReadAsync task to await.</param>
/// <param name="cancellationToken">A cancellation token, if any.</param>
/// <returns>A new WebSocket read frame.</returns>
private async ValueTask<WebSocketReadFrame> DoGetNextMessageAsync(ValueTask<ProtocolReadResult<WebSocketReadFrame>> readTask, CancellationToken cancellationToken)
{
while (true)
{
var frame = await _protocolReader.ReadAsync(new WebSocketFrameReader(), cancellationToken).ConfigureAwait(false);
var frame = await readTask.ConfigureAwait(false);
_protocolReader.Advance();

if (frame.IsCanceled)
Expand All @@ -264,6 +315,8 @@ private async ValueTask<WebSocketReadFrame> GetNextMessageFrameAsync(Cancellatio
{
return frame.Message;
}

readTask = _protocolReader.ReadAsync(new WebSocketFrameReader(), cancellationToken);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,16 @@ private unsafe void MaskUnmaskSpan(in ReadOnlySpan<byte> span, long bytesToRead)
int alignedMask = (int)BitOperations.RotateRight((uint)_maskingKey, (int)localMaskIndex * 8);

//Calculate the last possible pointer position that would be able to consume a whole vector
var fullVectorReadPtr = dataEndPtr - Vector<byte>.Count;
var vectorSize = Vector<byte>.Count;
var fullVectorReadPtr = dataEndPtr - vectorSize;

//If we found we should use SIMD and there is sufficient data to read
if (Vector.IsHardwareAccelerated && dataPtr <= fullVectorReadPtr)
{
Debug.Assert((int)dataPtr % sizeof(int) == 0);

//Align by whole ints to full SIMD load boundary to avoid a perf penalty for unaligned loads
while ((ulong)dataPtr % (uint)Vector<byte>.Count != 0)
while ((ulong)dataPtr % (uint)vectorSize != 0)
{
Debug.Assert(dataPtr < dataEndPtr);

Expand All @@ -133,7 +134,7 @@ private unsafe void MaskUnmaskSpan(in ReadOnlySpan<byte> span, long bytesToRead)
do
{
*(Vector<byte>*)dataPtr ^= maskVector;
dataPtr += Vector<byte>.Count;
dataPtr += vectorSize;
}
while (dataPtr <= fullVectorReadPtr);
}
Expand Down
25 changes: 22 additions & 3 deletions src/Bedrock.Framework/Protocols/WebSockets/WebSocketProtocol.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,29 @@ public WebSocketProtocol(ConnectionContext connection, WebSocketProtocolType pro
/// </summary>
/// <param name="cancellationToken">A cancellation token, if any.</param>
/// <returns>A WebSocketReadResult.</returns>
public async ValueTask<WebSocketReadResult> ReadAsync(CancellationToken cancellationToken = default)
public ValueTask<WebSocketReadResult> ReadAsync(CancellationToken cancellationToken = default)
{
var messageIsText = await _messageReader.MoveNextMessageAsync(cancellationToken).ConfigureAwait(false);
return new WebSocketReadResult(messageIsText, _messageReader);
var readTask = _messageReader.MoveNextMessageAsync(cancellationToken);

if (readTask.IsCompletedSuccessfully)
{
return new ValueTask<WebSocketReadResult>(new WebSocketReadResult(readTask.Result, _messageReader));
}
else
{
return DoReadAsync(readTask, cancellationToken);
}
}

/// <summary>
/// Reads the next message from the WebSocket asynchronously.
/// </summary>
/// <param name="readTask">The active message reader task.</param>
/// <param name="cancellationToken">A cancellation token, if any.</param>
/// <returns>A WebSocketReadResult.</returns>
private async ValueTask<WebSocketReadResult> DoReadAsync(ValueTask<bool> readTask, CancellationToken cancellationToken)
{
return new WebSocketReadResult(await readTask.ConfigureAwait(false), _messageReader);
}

/// <summary>
Expand Down
1 change: 1 addition & 0 deletions tests/Bedrock.Framework.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ static void Main(string[] args)
{
typeof(ProtocolReaderBenchmarks),
typeof(MessagePipeReaderBenchmarks),
typeof(WebSocketProtocolBenchmarks)
};
}
}
Loading

0 comments on commit 4cbc411

Please sign in to comment.