Skip to content

Commit

Permalink
Close connection when all links are closed (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
varunpuranik authored Oct 6, 2018
1 parent add3efd commit 52c631f
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,31 @@ public async Task RegisterLinkHandler(ILinkHandler linkHandler)
}
}

public async Task RemoveLinkHandler(ILinkHandler linkHandler)
{
Preconditions.CheckNotNull(linkHandler);
using (await this.registryUpdateLock.LockAsync())
{
if (this.registry.ContainsKey(linkHandler.Type))
{
this.registry.Remove(linkHandler.Type);
if (this.registry.Count == 0)
{
await this.CloseConnection();
}
}
}
}

async Task CloseConnection()
{
using (await this.initializationLock.LockAsync())
{
this.isInitialized = false;
await (this.deviceListener?.CloseAsync() ?? Task.CompletedTask);
}
}

public class DeviceProxy : IDeviceProxy
{
readonly ConnectionHandler connectionHandler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ public interface IConnectionHandler
Task<AmqpAuthentication> GetAmqpAuthentication();

Task RegisterLinkHandler(ILinkHandler linkHandler);

Task RemoveLinkHandler(ILinkHandler linkHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public async Task OpenAsync(TimeSpan timeout)
protected virtual void OnLinkClosed(object sender, EventArgs args)
{
Events.Closed(this);
this.ConnectionHandler.RemoveLinkHandler(this);
}

public async Task CloseAsync(TimeSpan timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,52 @@ public async Task RegisterDesiredPropertiesUpdateSenderTest()
Assert.NotNull(receivedMessage);
Assert.Equal(messageToSend, receivedMessage);
}

[Fact]
public async Task CloseOnRemovingAllLinksTest()
{
// Arrange
var deviceListener = new Mock<IDeviceListener>();
deviceListener.Setup(d => d.CloseAsync()).Returns(Task.CompletedTask);
var identity = Mock.Of<IIdentity>(i => i.Id == "d1/m1");
var clientCredentials = Mock.Of<IClientCredentials>(c => c.Identity == identity);
var connectionProvider = Mock.Of<IConnectionProvider>(c => c.GetDeviceListenerAsync(clientCredentials) == Task.FromResult(deviceListener.Object));
deviceListener.Setup(d => d.BindDeviceProxy(It.IsAny<IDeviceProxy>()));

var amqpAuthentication = new AmqpAuthentication(true, Option.Some(clientCredentials));
var cbsNode = Mock.Of<ICbsNode>(c => c.GetAmqpAuthentication() == Task.FromResult(amqpAuthentication));
var amqpConnection = Mock.Of<IAmqpConnection>(c => c.FindExtension<ICbsNode>() == cbsNode);
var connectionHandler = new ConnectionHandler(amqpConnection, connectionProvider);

var eventsLinkHandler = Mock.Of<ILinkHandler>(l => l.Type == LinkType.Events);
string twinCorrelationId = Guid.NewGuid().ToString();
var twinReceivingLinkHander = Mock.Of<ILinkHandler>(l => l.Type == LinkType.TwinReceiving && l.CorrelationId == twinCorrelationId);
var twinSendingLinkHandler = Mock.Of<ILinkHandler>(l => l.Type == LinkType.TwinSending && l.CorrelationId == twinCorrelationId);
string methodCorrelationId = Guid.NewGuid().ToString();
var methodReceivingLinkHander = Mock.Of<ILinkHandler>(l => l.Type == LinkType.MethodReceiving && l.CorrelationId == methodCorrelationId);
var methodSendingLinkHandler = Mock.Of<ILinkHandler>(l => l.Type == LinkType.MethodSending && l.CorrelationId == methodCorrelationId);

// Act
await connectionHandler.GetDeviceListener();
await connectionHandler.RegisterLinkHandler(eventsLinkHandler);
await connectionHandler.RegisterLinkHandler(twinReceivingLinkHander);
await connectionHandler.RegisterLinkHandler(twinSendingLinkHandler);
await connectionHandler.RegisterLinkHandler(methodSendingLinkHandler);
await connectionHandler.RegisterLinkHandler(methodReceivingLinkHander);

await connectionHandler.RemoveLinkHandler(eventsLinkHandler);
await connectionHandler.RemoveLinkHandler(twinReceivingLinkHander);
await connectionHandler.RemoveLinkHandler(twinSendingLinkHandler);
await connectionHandler.RemoveLinkHandler(methodSendingLinkHandler);

// Assert
deviceListener.Verify(d => d.CloseAsync(), Times.Never);

// Act
await connectionHandler.RemoveLinkHandler(methodReceivingLinkHander);

// Assert
deviceListener.Verify(d => d.CloseAsync(), Times.Once);
}
}
}

0 comments on commit 52c631f

Please sign in to comment.