Skip to content

Commit

Permalink
cbs fix: abort pending calls and reject new calls when connection is …
Browse files Browse the repository at this point in the history
…closing
  • Loading branch information
xinchen10 committed Jun 29, 2018
1 parent c10fa90 commit bb45646
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,6 @@
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\Cbs\ICbsTokenProvider.cs">
<Link>Amqp\Cbs\ICbsTokenProvider.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\ICloseable.cs">
<Link>Amqp\ICloseable.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\IIoHandler.cs">
<Link>Amqp\IIoHandler.cs</Link>
</Compile>
Expand Down
3 changes: 0 additions & 3 deletions Microsoft.Azure.Amqp.PCL/Microsoft.Azure.Amqp.PCL.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\Cbs\ICbsTokenProvider.cs">
<Link>Amqp\Cbs\ICbsTokenProvider.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\ICloseable.cs">
<Link>Amqp\ICloseable.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\IIoHandler.cs">
<Link>Amqp\IIoHandler.cs</Link>
</Compile>
Expand Down
3 changes: 0 additions & 3 deletions Microsoft.Azure.Amqp.Uwp/Microsoft.Azure.Amqp.Uwp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -480,9 +480,6 @@
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\Cbs\ICbsTokenProvider.cs">
<Link>Amqp\Cbs\ICbsTokenProvider.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\ICloseable.cs">
<Link>Amqp\ICloseable.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\IIoHandler.cs">
<Link>Amqp\IIoHandler.cs</Link>
</Compile>
Expand Down
44 changes: 21 additions & 23 deletions Microsoft.Azure.Amqp/Amqp/Cbs/AmqpCbsLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,29 @@ namespace Microsoft.Azure.Amqp
/// <summary>
/// Encapsulates a pair of links to '$cbs' for managing CBS tokens
/// </summary>
public sealed class AmqpCbsLink : ICloseable
public sealed class AmqpCbsLink
{
readonly AmqpConnection connection;
readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> linkFactory;

/// <summary>
/// Constructs a new instance
/// </summary>
public AmqpCbsLink(AmqpConnection connection)
{
if (connection == null)
{
throw new ArgumentNullException(nameof(connection));
}

this.connection = connection;
this.FaultTolerantLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
this.connection = connection ?? throw new ArgumentNullException(nameof(connection));
this.linkFactory = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
t => TaskHelpers.CreateTask<RequestResponseAmqpLink>((c, s) => this.BeginCreateCbsLink(t, c, s), this.EndCreateCbsLink),
this.CloseLink);
link => CloseLink(link));

this.connection.Extensions.Add(this);
}

bool ICloseable.IsClosedOrClosing
public void Close()
{
get
{
return this.connection.IsClosing();
}
this.linkFactory.Close();
}

FaultTolerantAmqpObject<RequestResponseAmqpLink> FaultTolerantLink { get; set; }

public Task<DateTime> SendTokenAsync(ICbsTokenProvider tokenProvider, Uri namespaceAddress, string audience, string resource, string[] requiredClaims, TimeSpan timeout)
{
return TaskHelpers.CreateTask(
Expand All @@ -59,6 +50,11 @@ public IAsyncResult BeginSendToken(ICbsTokenProvider tokenProvider, Uri namespac
tokenProvider == null ? "tokenProvider" : namespaceAddress == null ? "namespaceAddress" : audience == null ? "audience" : resource == null ? "resource" : "requiredClaims");
}

if (this.connection.IsClosing())
{
throw new ObjectDisposedException(CbsConstants.CbsAddress);
}

return new SendTokenAsyncResult(this, tokenProvider, namespaceAddress, audience, resource, requiredClaims, timeout, callback, state);
}

Expand All @@ -67,6 +63,13 @@ public DateTime EndSendToken(IAsyncResult result)
return SendTokenAsyncResult.End(result).ExpiresAtUtc;
}

static void CloseLink(RequestResponseAmqpLink link)
{
AmqpSession session = link.SendingLink?.Session;
link.Abort();
session?.SafeClose();
}

IAsyncResult BeginCreateCbsLink(TimeSpan timeout, AsyncCallback callback, object state)
{
return new OpenCbsRequestResponseLinkAsyncResult(this.connection, timeout, callback, state);
Expand All @@ -78,11 +81,6 @@ RequestResponseAmqpLink EndCreateCbsLink(IAsyncResult result)
return link;
}

void CloseLink(RequestResponseAmqpLink link)
{
link.Session.SafeClose();
}

sealed class OpenCbsRequestResponseLinkAsyncResult : IteratorAsyncResult<OpenCbsRequestResponseLinkAsyncResult>, ILinkFactory
{
readonly AmqpConnection connection;
Expand Down Expand Up @@ -237,14 +235,14 @@ protected override IEnumerator<AsyncStep> GetAsyncSteps()
}

RequestResponseAmqpLink requestResponseLink;
if (this.cbsLink.FaultTolerantLink.TryGetOpenedObject(out requestResponseLink))
if (this.cbsLink.linkFactory.TryGetOpenedObject(out requestResponseLink))
{
this.requestResponseLinkTask = Task.FromResult(requestResponseLink);
}
else
{
yield return this.CallTask(
(thisPtr, t) => thisPtr.requestResponseLinkTask = thisPtr.cbsLink.FaultTolerantLink.GetOrCreateAsync(t),
(thisPtr, t) => thisPtr.requestResponseLinkTask = thisPtr.cbsLink.linkFactory.GetOrCreateAsync(t),
ExceptionPolicy.Transfer);
}

Expand Down
10 changes: 0 additions & 10 deletions Microsoft.Azure.Amqp/Amqp/ICloseable.cs

This file was deleted.

0 comments on commit bb45646

Please sign in to comment.