Skip to content

Commit

Permalink
Merge pull request rabbitmq#241 from GoFightNguyen/master
Browse files Browse the repository at this point in the history
dotnet-visual-studio: Tutorial 7 Publisher Confirms
  • Loading branch information
michaelklishin authored Sep 19, 2019
2 parents beb65a2 + ec132af commit 42b0adb
Show file tree
Hide file tree
Showing 7 changed files with 353 additions and 95 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{A46637FF-A028-4FF9-A70F-82CB32EC3A74}</ProjectGuid>
<OutputType>Exe</OutputType>
<RootNamespace>_7_PublisherConfirms</RootNamespace>
<AssemblyName>7_PublisherConfirms</AssemblyName>
<TargetFrameworkVersion>v4.5.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<Deterministic>true</Deterministic>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="Microsoft.Diagnostics.Tracing.EventSource, Version=1.1.28.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a, processorArchitecture=MSIL">
<HintPath>..\packages\Microsoft.Diagnostics.Tracing.EventSource.Redist.1.1.28\lib\net40\Microsoft.Diagnostics.Tracing.EventSource.dll</HintPath>
</Reference>
<Reference Include="RabbitMQ.Client, Version=5.0.0.0, Culture=neutral, PublicKeyToken=89e7d7c5feba84ce, processorArchitecture=MSIL">
<HintPath>..\packages\RabbitMQ.Client.5.1.0\lib\net451\RabbitMQ.Client.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Net.Http" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<None Include="App.config" />
<None Include="packages.config" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>
6 changes: 6 additions & 0 deletions dotnet-visual-studio/7_PublisherConfirms/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5.1" />
</startup>
</configuration>
138 changes: 138 additions & 0 deletions dotnet-visual-studio/7_PublisherConfirms/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using RabbitMQ.Client;
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Text;
using System.Linq;
using System.Threading;

class PublisherConfirms
{
private const int MESSAGE_COUNT = 50_000;

public static void Main()
{
PublishMessagesIndividually();
PublishMessagesInBatch();
HandlePublishConfirmsAsynchronously();
}

private static IConnection CreateConnection()
{
var factory = new ConnectionFactory { HostName = "localhost" };
return factory.CreateConnection();
}

private static void PublishMessagesIndividually()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
}
timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages individually in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static void PublishMessagesInBatch()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var batchSize = 100;
var outstandingMessageCount = 0;
var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
outstandingMessageCount++;

if (outstandingMessageCount == batchSize)
{
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
outstandingMessageCount = 0;
}
}

if (outstandingMessageCount > 0)
channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));

timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages in batch in {timer.ElapsedMilliseconds:N0} ms");
}
}

private static void HandlePublishConfirmsAsynchronously()
{
using (var connection = CreateConnection())
using (var channel = connection.CreateModel())
{
var queueName = channel.QueueDeclare().QueueName;
channel.ConfirmSelect();

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

void cleanOutstandingConfirms(ulong sequenceNumber, bool multiple)
{
if (multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= sequenceNumber);
foreach (var entry in confirmed)
outstandingConfirms.TryRemove(entry.Key, out _);
}
else
outstandingConfirms.TryRemove(sequenceNumber, out _);
}

channel.BasicAcks += (sender, ea) => cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
channel.BasicNacks += (sender, ea) =>
{
outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
Console.WriteLine($"Message with body {body} has been nack-ed. Sequence number: {ea.DeliveryTag}, multiple: {ea.Multiple}");
cleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
};

var timer = new Stopwatch();
timer.Start();
for (int i = 0; i < MESSAGE_COUNT; i++)
{
var body = i.ToString();
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, i.ToString());
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: Encoding.UTF8.GetBytes(body));
}

if (!WaitUntil(60, () => outstandingConfirms.IsEmpty))
throw new Exception("All messages could not be confirmed in 60 seconds");

timer.Stop();
Console.WriteLine($"Published {MESSAGE_COUNT:N0} messages and handled confirm asynchronously {timer.ElapsedMilliseconds:N0} ms");
}
}

private static bool WaitUntil(int numberOfSeconds, Func<bool> condition)
{
int waited = 0;
while (!condition() && waited < numberOfSeconds * 1000)
{
Thread.Sleep(100);
waited += 100;
}

return condition();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("7_PublisherConfirms")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("7_PublisherConfirms")]
[assembly: AssemblyCopyright("Copyright © 2019")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("a46637ff-a028-4ff9-a70f-82cb32ec3a74")]

// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]
5 changes: 5 additions & 0 deletions dotnet-visual-studio/7_PublisherConfirms/packages.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Microsoft.Diagnostics.Tracing.EventSource.Redist" version="1.1.28" targetFramework="net451" />
<package id="RabbitMQ.Client" version="5.1.0" targetFramework="net451" />
</packages>
4 changes: 4 additions & 0 deletions dotnet-visual-studio/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,7 @@ time opening it in order to get the .NET RabbitMQ dependency from NuGet.

- 6_RPCServer
- 6_RPCClient

#### [Tutorial 7: Publisher Confirms](https://www.rabbitmq.com/tutorial-seven-dotnet.html)

- 7_PublisherConfirms
Loading

0 comments on commit 42b0adb

Please sign in to comment.