From 8b1fb6752a79591391b65c81a20e7bb65431b948 Mon Sep 17 00:00:00 2001 From: Varun Puranik Date: Wed, 19 Sep 2018 15:08:20 -0700 Subject: [PATCH] Twin DP sub fix (#323) --- .../linkhandlers/TwinReceivingLinkHandler.cs | 41 ++++++++++++++++++- .../device/DeviceMessageHandler.cs | 32 +++++++++++++++ .../device/IDeviceListener.cs | 4 ++ 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Amqp/linkhandlers/TwinReceivingLinkHandler.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Amqp/linkhandlers/TwinReceivingLinkHandler.cs index c83852dbf5c..3b56e1dc55e 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Amqp/linkhandlers/TwinReceivingLinkHandler.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Amqp/linkhandlers/TwinReceivingLinkHandler.cs @@ -19,6 +19,8 @@ public class TwinReceivingLinkHandler : ReceivingLinkHandler { public const string TwinPatch = "PATCH"; public const string TwinGet = "GET"; + public const string TwinPut = "PUT"; + public const string TwinDelete = "DELETE"; public TwinReceivingLinkHandler(IReceivingAmqpLink link, Uri requestUri, IDictionary boundVariables, IMessageConverter messageConverter) @@ -51,14 +53,39 @@ protected override async Task OnMessageReceived(AmqpMessage amqpMessage) Events.InvalidCorrelationId(this); return; } + await this.DeviceListener.SendGetTwinRequest(correlationId); Events.ProcessedTwinGetRequest(this); break; + case TwinPatch: EdgeMessage reportedPropertiesMessage = new EdgeMessage.Builder(amqpMessage.GetPayloadBytes()).Build(); await this.DeviceListener.UpdateReportedPropertiesAsync(reportedPropertiesMessage, correlationId); Events.ProcessedTwinReportedPropertiesUpdate(this); break; + + case TwinPut: + if (string.IsNullOrWhiteSpace(correlationId)) + { + Events.InvalidCorrelationId(this); + return; + } + + await this.DeviceListener.AddDesiredPropertyUpdatesSubscription(correlationId); + Events.ProcessedDesiredPropertyUpdatesSubscriptionRequest(this, correlationId); + break; + + case TwinDelete: + if (string.IsNullOrWhiteSpace(correlationId)) + { + Events.InvalidCorrelationId(this); + return; + } + + await this.DeviceListener.RemoveDesiredPropertyUpdatesSubscription(correlationId); + Events.ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest(this, correlationId); + break; + default: Events.InvalidOperation(this, operation); break; @@ -74,7 +101,9 @@ enum EventIds { InvalidOperation = IdStart, ProcessedTwinGetRequest, - ProcessedTwinReportedPropertiesUpdate + ProcessedTwinReportedPropertiesUpdate, + ProcessedDesiredPropertyUpdatesSubscriptionRequest, + ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest } public static void InvalidOperation(TwinReceivingLinkHandler handler) @@ -101,6 +130,16 @@ public static void InvalidCorrelationId(TwinReceivingLinkHandler handler) { Log.LogWarning((int)EventIds.InvalidOperation, $"Cannot process message on link {handler.LinkUri} because no correlation ID was specified"); } + + public static void ProcessedDesiredPropertyUpdatesSubscriptionRequest(TwinReceivingLinkHandler handler, string correlationId) + { + Log.LogDebug((int)EventIds.ProcessedDesiredPropertyUpdatesSubscriptionRequest, $"Processed Twin desired properties subscription for {handler.ClientId} on request {correlationId}"); + } + + public static void ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest(TwinReceivingLinkHandler handler, string correlationId) + { + Log.LogDebug((int)EventIds.ProcessedDesiredPropertyUpdatesSubscriptionRemovalRequest, $"Processed removing Twin desired properties subscription for {handler.ClientId} on request {correlationId}"); + } } } } diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs index d7cb77e5269..8c52129ee13 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/DeviceMessageHandler.cs @@ -117,6 +117,38 @@ public async Task ProcessMessageFeedbackAsync(string messageId, FeedbackStatus f public Task RemoveSubscription(DeviceSubscription subscription) => this.edgeHub.RemoveSubscription(this.Identity.Id, subscription); + public async Task AddDesiredPropertyUpdatesSubscription(string correlationId) + { + await this.edgeHub.AddSubscription(this.Identity.Id, DeviceSubscription.DesiredPropertyUpdates); + if (!string.IsNullOrWhiteSpace(correlationId)) + { + IMessage responseMessage = new EdgeMessage.Builder(new byte[0]) + .SetSystemProperties(new Dictionary + { + [SystemProperties.CorrelationId] = correlationId, + [SystemProperties.StatusCode] = ((int)HttpStatusCode.OK).ToString() + }) + .Build(); + await this.SendTwinUpdate(responseMessage); + } + } + + public async Task RemoveDesiredPropertyUpdatesSubscription(string correlationId) + { + await this.edgeHub.RemoveSubscription(this.Identity.Id, DeviceSubscription.DesiredPropertyUpdates); + if (!string.IsNullOrWhiteSpace(correlationId)) + { + IMessage responseMessage = new EdgeMessage.Builder(new byte[0]) + .SetSystemProperties(new Dictionary + { + [SystemProperties.CorrelationId] = correlationId, + [SystemProperties.StatusCode] = ((int)HttpStatusCode.OK).ToString() + }) + .Build(); + await this.SendTwinUpdate(responseMessage); + } + } + public async Task SendGetTwinRequest(string correlationId) { try diff --git a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/IDeviceListener.cs b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/IDeviceListener.cs index 6f9e61143e9..d4af8a6b33c 100644 --- a/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/IDeviceListener.cs +++ b/edge-hub/src/Microsoft.Azure.Devices.Edge.Hub.Core/device/IDeviceListener.cs @@ -29,5 +29,9 @@ public interface IDeviceListener Task AddSubscription(DeviceSubscription subscription); Task RemoveSubscription(DeviceSubscription subscription); + + Task AddDesiredPropertyUpdatesSubscription(string correlationId); + + Task RemoveDesiredPropertyUpdatesSubscription(string correlationId); } }