Skip to content

Commit

Permalink
Merge pull request #443 from Pliner/reduce-outbound-frame-allocations
Browse files Browse the repository at this point in the history
Reduce outbound frame allocations
  • Loading branch information
michaelklishin authored Sep 6, 2018
2 parents b4decec + 6cac825 commit 646c289
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 61 deletions.
4 changes: 4 additions & 0 deletions projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<DefineConstants>$(DefineConstants);CORECLR</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netstandard1.5'">
<DefineConstants>$(DefineConstants);CORECLR15</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'SignedRelease' ">
<DelaySign>true</DelaySign>
<OutputType>Library</OutputType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

namespace RabbitMQ.Client.Impl
Expand All @@ -63,5 +64,16 @@ public static T RandomItem<T>(this IList<T> list)
var hashCode = Math.Abs(Guid.NewGuid().GetHashCode());
return list.ElementAt<T>(hashCode % n);
}

internal static ArraySegment<byte> GetBufferSegment(this MemoryStream ms)
{
#if CORECLR15
var payload = ms.ToArray();
return new ArraySegment<byte>(payload, 0, payload.Length);
#else
var buffer = ms.GetBuffer();
return new ArraySegment<byte>(buffer, 0, (int)ms.Position);
#endif
}
}
}
105 changes: 50 additions & 55 deletions projects/client/RabbitMQ.Client/src/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,98 +56,103 @@ namespace RabbitMQ.Client.Impl
{
public class HeaderOutboundFrame : OutboundFrame
{
private readonly ContentHeaderBase header;
private readonly int bodyLength;

public HeaderOutboundFrame(int channel, ContentHeaderBase header, int bodyLength) : base(FrameType.FrameHeader, channel)
{
NetworkBinaryWriter writer = base.GetWriter();
this.header = header;
this.bodyLength = bodyLength;
}

public override void WritePayload(NetworkBinaryWriter writer)
{
var ms = new MemoryStream();
var nw = new NetworkBinaryWriter(ms);

nw.Write(header.ProtocolClassId);
header.WriteTo(nw, (ulong)bodyLength);

writer.Write(header.ProtocolClassId);
header.WriteTo(writer, (ulong)bodyLength);
var bufferSegment = ms.GetBufferSegment();
writer.Write((uint)bufferSegment.Count);
writer.Write(bufferSegment.Array, bufferSegment.Offset, bufferSegment.Count);
}
}

public class BodySegmentOutboundFrame : OutboundFrame
{
private readonly byte[] body;
private readonly int offset;
private readonly int count;

public BodySegmentOutboundFrame(int channel, byte[] body, int offset, int count) : base(FrameType.FrameBody, channel)
{
NetworkBinaryWriter writer = base.GetWriter();
this.body = body;
this.offset = offset;
this.count = count;
}

public override void WritePayload(NetworkBinaryWriter writer)
{
writer.Write((uint)count);
writer.Write(body, offset, count);
}
}

public class MethodOutboundFrame : OutboundFrame
{
private readonly MethodBase method;

public MethodOutboundFrame(int channel, MethodBase method) : base(FrameType.FrameMethod, channel)
{
NetworkBinaryWriter writer = base.GetWriter();
this.method = method;
}

writer.Write(method.ProtocolClassId);
writer.Write(method.ProtocolMethodId);
public override void WritePayload(NetworkBinaryWriter writer)
{
var ms = new MemoryStream();
var nw = new NetworkBinaryWriter(ms);

var argWriter = new MethodArgumentWriter(writer);
nw.Write(method.ProtocolClassId);
nw.Write(method.ProtocolMethodId);

var argWriter = new MethodArgumentWriter(nw);
method.WriteArgumentsTo(argWriter);

argWriter.Flush();

var bufferSegment = ms.GetBufferSegment();
writer.Write((uint)bufferSegment.Count);
writer.Write(bufferSegment.Array, bufferSegment.Offset, bufferSegment.Count);
}
}

public class EmptyOutboundFrame : OutboundFrame
{
private static readonly byte[] m_emptyByteArray = new byte[0];

public EmptyOutboundFrame() : base(FrameType.FrameHeartbeat, 0)
{
base.GetWriter().Write(m_emptyByteArray);
}

public override string ToString()
public override void WritePayload(NetworkBinaryWriter writer)
{
return base.ToString() + string.Format("(type={0}, channel={1}, {2} bytes of payload)",
Type,
Channel,
Payload == null
? "(null)"
: Payload.Length.ToString());
writer.Write((uint)0);
}
}

public class OutboundFrame : Frame
public abstract class OutboundFrame : Frame
{
private readonly MemoryStream m_accumulator;
private readonly NetworkBinaryWriter writer;

public OutboundFrame(FrameType type, int channel) : base(type, channel)
{
m_accumulator = new MemoryStream();
writer = new NetworkBinaryWriter(m_accumulator);
}

public NetworkBinaryWriter GetWriter()
{
return writer;
}

public override string ToString()
{
return base.ToString() + string.Format("(type={0}, channel={1}, {2} bytes of payload)",
Type,
Channel,
Payload == null
? "(null)"
: Payload.Length.ToString());
}

public void WriteTo(NetworkBinaryWriter writer)
{
var payload = m_accumulator.ToArray();

writer.Write((byte)Type);
writer.Write((ushort)Channel);
writer.Write((uint)payload.Length);
writer.Write(payload);
WritePayload(writer);
writer.Write((byte)Constants.FrameEnd);
}

public abstract void WritePayload(NetworkBinaryWriter writer);
}

public class InboundFrame : Frame
Expand Down Expand Up @@ -252,16 +257,6 @@ public NetworkBinaryReader GetReader()
{
return new NetworkBinaryReader(new MemoryStream(base.Payload));
}

public override string ToString()
{
return base.ToString() + string.Format("(type={0}, channel={1}, {2} bytes of payload)",
base.Type,
base.Channel,
base.Payload == null
? "(null)"
: base.Payload.Length.ToString());
}
}

public class Frame
Expand All @@ -288,7 +283,7 @@ public Frame(FrameType type, int channel, byte[] payload)

public override string ToString()
{
return base.ToString() + string.Format("(type={0}, channel={1}, {2} bytes of payload)",
return string.Format("(type={0}, channel={1}, {2} bytes of payload)",
Type,
Channel,
Payload == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void SendHeader()
nbw.Write((byte)Endpoint.Protocol.MajorVersion);
nbw.Write((byte)Endpoint.Protocol.MinorVersion);
}
Write(ms.ToArray());
Write(ms.GetBufferSegment());
}

public void WriteFrame(OutboundFrame frame)
Expand All @@ -231,7 +231,7 @@ public void WriteFrame(OutboundFrame frame)
var nbw = new NetworkBinaryWriter(ms);
frame.WriteTo(nbw);
m_socket.Client.Poll(m_writeableStateTimeout, SelectMode.SelectWrite);
Write(ms.ToArray());
Write(ms.GetBufferSegment());
}

public void WriteFrameSet(IList<OutboundFrame> frames)
Expand All @@ -240,21 +240,21 @@ public void WriteFrameSet(IList<OutboundFrame> frames)
var nbw = new NetworkBinaryWriter(ms);
foreach (var f in frames) f.WriteTo(nbw);
m_socket.Client.Poll(m_writeableStateTimeout, SelectMode.SelectWrite);
Write(ms.ToArray());
Write(ms.GetBufferSegment());
}

private void Write(byte [] buffer)
private void Write(ArraySegment<byte> bufferSegment)
{
if(_ssl)
{
lock (_sslStreamLock)
{
m_writer.Write(buffer);
m_writer.Write(bufferSegment.Array, bufferSegment.Offset, bufferSegment.Count);
}
}
else
{
m_writer.Write(buffer);
m_writer.Write(bufferSegment.Array, bufferSegment.Offset, bufferSegment.Count);
}
}

Expand Down

0 comments on commit 646c289

Please sign in to comment.