Skip to content
This repository has been archived by the owner on Oct 14, 2024. It is now read-only.

Commit

Permalink
Implement Unsubscribe and LastWill functions in C# MQTT SDK (#2)
Browse files Browse the repository at this point in the history
* Implement Unsubscribe and LastWill functions in C# MQTT SDK

* Make setters internal and add topic ID and client ID to exception messages

* Change exception message for unsubscribe and add extra test for subscribe and unsubscribe. Also commit example MQTT project

* Fix Example Project path

* Add tests for Scenarios when Connection cant be established and unsubscribe throws exception
  • Loading branch information
ashhSAG93 authored Oct 19, 2021
1 parent 78fb7d5 commit 71e6020
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 5 deletions.
21 changes: 18 additions & 3 deletions MQTT-SDK/Cumulocity.MQTT.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27130.2036
# Visual Studio Version 16
VisualStudioVersion = 16.0.31729.503
MinimumVisualStudioVersion = 15.0.26124.0
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{A261881D-97AE-4C22-A15A-D17B32300675}"
EndProject
Expand All @@ -26,7 +26,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "examples", "examples", "{8B
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloJsonExample", "examples\HelloJsonExample\HelloJsonExample.csproj", "{1CC59AC8-766E-4A58-B293-FAC77B76BB95}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloMqttNet", "examples\HelloMqttNet\HelloMqttNet.csproj", "{6FD13776-8906-470D-ADA9-E8D935FFA94E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "HelloMqttNet", "examples\HelloMqttNet\HelloMqttNet.csproj", "{6FD13776-8906-470D-ADA9-E8D935FFA94E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloMQTTNet2", "examples\HelloMQTTNet2\HelloMQTTNet2.csproj", "{A19E8DCB-B91A-4A93-8132-7146966C12F7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -110,6 +112,18 @@ Global
{6FD13776-8906-470D-ADA9-E8D935FFA94E}.Release|x64.Build.0 = Release|Any CPU
{6FD13776-8906-470D-ADA9-E8D935FFA94E}.Release|x86.ActiveCfg = Release|Any CPU
{6FD13776-8906-470D-ADA9-E8D935FFA94E}.Release|x86.Build.0 = Release|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Debug|x64.ActiveCfg = Debug|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Debug|x64.Build.0 = Debug|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Debug|x86.ActiveCfg = Debug|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Debug|x86.Build.0 = Debug|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Release|Any CPU.Build.0 = Release|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Release|x64.ActiveCfg = Release|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Release|x64.Build.0 = Release|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Release|x86.ActiveCfg = Release|Any CPU
{A19E8DCB-B91A-4A93-8132-7146966C12F7}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -121,6 +135,7 @@ Global
{48EA55D1-C3DE-42F3-8A44-45EC3D5EEAF3} = {F1945BBB-C6C9-47F3-8EDB-669C41B35FFA}
{1CC59AC8-766E-4A58-B293-FAC77B76BB95} = {8BE06300-97CB-4A9B-81EA-65AD4374D240}
{6FD13776-8906-470D-ADA9-E8D935FFA94E} = {8BE06300-97CB-4A9B-81EA-65AD4374D240}
{A19E8DCB-B91A-4A93-8132-7146966C12F7} = {8BE06300-97CB-4A9B-81EA-65AD4374D240}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {E0DC5597-A814-4157-B095-157DC5D30920}
Expand Down
12 changes: 12 additions & 0 deletions MQTT-SDK/examples/HelloMQTTNet2/HelloMQTTNet2.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Cumulocity.SDK.MQTT\Cumulocity.SDK.MQTT.csproj" />
</ItemGroup>

</Project>
109 changes: 109 additions & 0 deletions MQTT-SDK/examples/HelloMQTTNet2/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Cumulocity.SDK.MQTT.Model;
using Cumulocity.SDK.MQTT.Model.ConnectionOptions;
using Cumulocity.SDK.MQTT.Model.MqttMessage;
using MqttClient = Cumulocity.SDK.MQTT.MqttClient;

namespace MQTTApp
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("The application has started. Press Ctrl-C to stop it.");

var cSource = new CancellationTokenSource();
var myTask = Task.Factory.StartNew(() => RunJsonViaMqttClientAsync(cSource.Token), cSource.Token);
Console.CancelKeyPress += (sender, eventArgs) => cSource.Cancel();
myTask.Wait(cSource.Token);

Console.WriteLine("Now shutting down");
}

private static async Task RunJsonViaMqttClientAsync(CancellationToken cToken)
{
const string serverUrl = "<server url>";
const string clientId = "my_mqtt_cs_client";
const string device_name = "My CS MQTT device";
const string user = "<tenant>/<user>";
const string password = "<password>";

// connections details
var cDetails = new ConnectionDetailsBuilder()
.WithClientId(clientId)
.WithHost(serverUrl)
.WithCredentials(user, password)
.WithCleanSession(false)
.WithProtocol(TransportType.Tcp)
.Build();

// Configure MQTT connection with details provided above and connect eith Cumulocity.
MqttClient client = new MqttClient(cDetails);
client.MessageReceived += Client_MessageReceived;
client.Connected += Client_Connected;
client.ConnectionFailed += Client_ConnectionFailed;
await client.EstablishConnectionAsync();

// Crate new device with name device_name and type c8y_MQTDevice
string topic = "s/us";
string payload = $"100,{device_name}, c8y_MQTTDevice";
var message = new MqttMessageRequestBuilder()
.WithTopicName(topic)
.WithQoS(QoS.EXACTLY_ONCE)
.WithMessageContent(payload)
.Build();

await client.PublishAsync(message);

// set device's hardware information
var deviceMessage = new MqttMessageRequestBuilder()
.WithTopicName("s/us")
.WithQoS(QoS.EXACTLY_ONCE)
.WithMessageContent("110, S123456789, MQTT test model, Rev0.1")
.Build();

await client.PublishAsync(deviceMessage);

// add restart operation
await client.SubscribeAsync(new MqttMessageRequest() { TopicName = "s/ds" });
await client.SubscribeAsync(new MqttMessageRequest() { TopicName = "s/e" });

await client.PublishAsync(new MqttMessageRequestBuilder()
.WithTopicName("s/us")
.WithQoS(QoS.EXACTLY_ONCE)
.WithMessageContent("114,c8y_Restart")
.Build());

// generate a random temperature (10º-20º) measurement and send it every second
Random rnd = new Random();
while (!cToken.IsCancellationRequested)
{
int temp = rnd.Next(10, 20);
Console.WriteLine("Sending temperature measurement (" + temp + "º) ...");
await client.PublishAsync(new MqttMessageRequestBuilder()
.WithTopicName("s/us")
.WithQoS(QoS.EXACTLY_ONCE)
.WithMessageContent("211," + temp)
.Build());
Thread.Sleep(1000);
}
}

private static void Client_ConnectionFailed(object sender, ProcessFailedEventArgs e)
{
Console.WriteLine("Connection failed");
}

private static void Client_Connected(object sender, ClientConnectedEventArgs e)
{
Console.WriteLine("Client connected.");
}

private static void Client_MessageReceived(object sender, IMqttMessageResponse e)
{
var content = e.MessageContent;
}
}
}
8 changes: 8 additions & 0 deletions MQTT-SDK/src/Cumulocity.SDK.MQTT/IMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ public interface IMqttClient
/// <exception cref="MqttDeviceSDKException"> </exception>
Task SubscribeAsync(IMqttMessageRequest message);

/// <summary>
/// Unsubscribe from a particular topic
/// </summary>
/// <param name="topic">
/// </param>
/// <exception cref="MqttDeviceSDKException"> </exception>
Task UnsubscribeAsync(string topic);

/// <summary>
/// Disconnects the client from the broker
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,11 @@ public class ConnectionDetails : IConnectionDetails
/// </value>
public TransportType Protocol { get; set; }

/// <summary>
/// The "last will" message that is specified at connection time and that
/// is executed when the client loses the connection.
/// </summary>
public LastWillDetails LastWill { get; set; }

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ public ConnectionDetailsBuilder WithTls()
return this;
}

public ConnectionDetailsBuilder WithLastWill(LastWillDetails lastWill)
{
_options.LastWill = lastWill;
return this;
}

public IConnectionDetails Build()
{
return _options;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public interface IConnectionDetails
TransportType Protocol { get; }

bool UseTls { get; }

LastWillDetails LastWill { get; }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cumulocity.SDK.MQTT.Model
{
public interface ILastWillDetails
{
/// <summary>
/// The topic to publish to
/// </summary>

string Topic { get; }

/// <summary>
/// The quality of service to publish the message at (0, 1 or 2)
/// </summary>
QoS qoS { get; }

/// <summary>
/// Message content
/// </summary>
string Message { get; }

/// <summary>
/// Whether or not the message should be retained
/// </summary>
bool Retained { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cumulocity.SDK.MQTT.Model.ConnectionOptions
{
public class LastWillDetails : ILastWillDetails
{
private string _topic;
/// <summary>
/// The topic to publish to
/// </summary>
public string Topic { get => _topic; }

internal void SetTopic(string topic) { _topic = topic; }

private QoS _qos;
/// <summary>
/// The quality of service to publish the message at (0, 1 or 2)
/// </summary>
public QoS qoS { get => _qos; }

internal void SetQoS(QoS qos) { _qos = qos; }

private string _message;
/// <summary>
/// Message content
/// </summary>
public string Message { get => _message; }

internal void SetMessage(string message) { _message = message; }

private bool _retained;
/// <summary>
/// Whether or not the message should be retained
/// </summary>
public bool Retained { get => _retained; }

internal void SetRetained(bool retained) { _retained = retained; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace Cumulocity.SDK.MQTT.Model.ConnectionOptions
{
public class LastWillDetailsBuilder
{
private readonly LastWillDetails _lastWillDetails = new LastWillDetails();

public LastWillDetailsBuilder WithTopic(string topic)
{
_lastWillDetails.SetTopic(topic);
return this;
}

public LastWillDetailsBuilder WithQoS(QoS qos)
{
_lastWillDetails.SetQoS(qos);
return this;
}

public LastWillDetailsBuilder WithMessage(string message)
{
_lastWillDetails.SetMessage(message);
return this;
}

public LastWillDetailsBuilder WithRetained(bool retained)
{
_lastWillDetails.SetRetained(retained);
return this;
}

public ILastWillDetails Build()
{
return _lastWillDetails;
}
}
}
25 changes: 23 additions & 2 deletions MQTT-SDK/src/Cumulocity.SDK.MQTT/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public async Task PublishAsync(IMqttMessageRequest message)

if (!MqttTopicValidator.IsTopicValidForPublish(message.TopicName))
{
throw new MqttDeviceSDKException("Invalid topic to publish.");
throw new MqttDeviceSDKException($"Invalid topic to publish. Topic name: {message.TopicName}, Client Id: {ConnectionDetails.ClientId}");
}

try
Expand All @@ -97,7 +97,7 @@ public async Task SubscribeAsync(IMqttMessageRequest message)

if (!MqttTopicValidator.IsTopicValidForSubscribe(message.TopicName))
{
throw new MqttDeviceSDKException("Invalid topic to subscribe.");
throw new MqttDeviceSDKException($"Invalid topic to subscribe. Topic name: {message.TopicName}, Client Id: {ConnectionDetails.ClientId}");
}

try
Expand All @@ -110,5 +110,26 @@ public async Task SubscribeAsync(IMqttMessageRequest message)
}
}

public async Task UnsubscribeAsync(string topic)
{
if (!OperationsProvider.ConnectionEstablished)
{
throw new MqttDeviceSDKException("Unsubscribe can happen only when client is initialized, " +
$"and connection to server established. Topic name: {topic}, Client Id: {ConnectionDetails.ClientId}");
}

if (!MqttTopicValidator.IsTopicValidForSubscribe(topic))
{
throw new MqttDeviceSDKException($"The topic cannot be subscribed to. Topic name: {topic}, Client Id: {ConnectionDetails.ClientId}");
}
try
{
await OperationsProvider.UnsubscribeAsync(topic);
}
catch (System.Exception ex)
{
throw new MqttDeviceSDKException($"Unable to unsubscribe from topic {topic} for clientId {ConnectionDetails.ClientId} : ", ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface IOperationsProvider

Task SubscribeAsync(IMqttMessageRequest message);

Task UnsubscribeAsync(string topic);

Task Disconnect();

bool ConnectionEstablished { get; }
Expand Down
Loading

0 comments on commit 71e6020

Please sign in to comment.