Skip to content

Commit

Permalink
Merge pull request #2 from Azure/develop
Browse files Browse the repository at this point in the history
Getting latest develop from base fork
  • Loading branch information
nemakam authored Aug 10, 2018
2 parents af724f7 + c50c7fd commit 1a33d22
Show file tree
Hide file tree
Showing 30 changed files with 4,311 additions and 3,033 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,6 @@
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\Cbs\ICbsTokenProvider.cs">
<Link>Amqp\Cbs\ICbsTokenProvider.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\ICloseable.cs">
<Link>Amqp\ICloseable.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\IIoHandler.cs">
<Link>Amqp\IIoHandler.cs</Link>
</Compile>
Expand Down
3 changes: 0 additions & 3 deletions Microsoft.Azure.Amqp.PCL/Microsoft.Azure.Amqp.PCL.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\Cbs\ICbsTokenProvider.cs">
<Link>Amqp\Cbs\ICbsTokenProvider.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\ICloseable.cs">
<Link>Amqp\ICloseable.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\IIoHandler.cs">
<Link>Amqp\IIoHandler.cs</Link>
</Compile>
Expand Down
5 changes: 1 addition & 4 deletions Microsoft.Azure.Amqp.Uwp/Microsoft.Azure.Amqp.Uwp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<AssemblyName>Microsoft.Azure.Amqp</AssemblyName>
<DefaultLanguage>en-US</DefaultLanguage>
<TargetPlatformIdentifier>UAP</TargetPlatformIdentifier>
<TargetPlatformVersion>10.0.10586.0</TargetPlatformVersion>
<TargetPlatformVersion>10.0.14393.0</TargetPlatformVersion>
<TargetPlatformMinVersion>10.0.10240.0</TargetPlatformMinVersion>
<MinimumVisualStudioVersion>14</MinimumVisualStudioVersion>
<FileAlignment>512</FileAlignment>
Expand Down Expand Up @@ -480,9 +480,6 @@
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\Cbs\ICbsTokenProvider.cs">
<Link>Amqp\Cbs\ICbsTokenProvider.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\ICloseable.cs">
<Link>Amqp\ICloseable.cs</Link>
</Compile>
<Compile Include="..\Microsoft.Azure.Amqp\Amqp\IIoHandler.cs">
<Link>Amqp\IIoHandler.cs</Link>
</Compile>
Expand Down
2 changes: 2 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ protected bool SendDelivery(Delivery delivery)
transfer.DeliveryTag = delivery.DeliveryTag;
transfer.MessageFormat = delivery.MessageFormat ?? AmqpConstants.AmqpMessageFormat;
transfer.Batchable = delivery.Batchable;
transfer.State = delivery.State;
if (delivery.Settled)
{
transfer.Settled = true;
Expand Down Expand Up @@ -879,6 +880,7 @@ void OnReceiveTransfer(Transfer transfer, Frame rawFrame)
delivery.Settled = transfer.Settled();
delivery.Batchable = transfer.Batchable();
delivery.MessageFormat = transfer.MessageFormat;
delivery.State = transfer.State;
TransactionalState txnState = transfer.State as TransactionalState;
if (txnState != null)
{
Expand Down
25 changes: 17 additions & 8 deletions Microsoft.Azure.Amqp/Amqp/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,14 @@ public override long SerializedMessageSize
{
get
{
this.EnsureInitialized();
return this.bufferStream.Length;
if (this.bufferStream != null)
{
return this.bufferStream.Length;
}

// Do not cache the result stream
// as the message could be updated again
return this.Initialize().Length;
}
}

Expand All @@ -414,7 +420,7 @@ protected void EnsureInitialized()
{
if (!this.initialized)
{
this.Initialize();
this.bufferStream = this.Initialize();
this.initialized = true;
}
}
Expand All @@ -431,9 +437,12 @@ protected virtual void OnInitialize()

public override Stream ToStream()
{
bool more;
ArraySegment<byte>[] payload = this.GetPayload(int.MaxValue, out more);
return new BufferListStream(payload);
if (this.bufferStream != null)
{
return (Stream)this.bufferStream.Clone();
}

return this.Initialize();
}

protected abstract int GetBodySize();
Expand All @@ -444,7 +453,7 @@ protected virtual void AddCustomSegments(List<ArraySegment<byte>> segmentList)
{
}

void Initialize()
BufferListStream Initialize()
{
this.OnInitialize();

Expand Down Expand Up @@ -494,7 +503,7 @@ void Initialize()
}
}

this.bufferStream = new BufferListStream(segmentList.ToArray());
return new BufferListStream(segmentList.ToArray());
}
}

Expand Down
44 changes: 21 additions & 23 deletions Microsoft.Azure.Amqp/Amqp/Cbs/AmqpCbsLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,29 @@ namespace Microsoft.Azure.Amqp
/// <summary>
/// Encapsulates a pair of links to '$cbs' for managing CBS tokens
/// </summary>
public sealed class AmqpCbsLink : ICloseable
public sealed class AmqpCbsLink
{
readonly AmqpConnection connection;
readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> linkFactory;

/// <summary>
/// Constructs a new instance
/// </summary>
public AmqpCbsLink(AmqpConnection connection)
{
if (connection == null)
{
throw new ArgumentNullException(nameof(connection));
}

this.connection = connection;
this.FaultTolerantLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
this.connection = connection ?? throw new ArgumentNullException(nameof(connection));
this.linkFactory = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
t => TaskHelpers.CreateTask<RequestResponseAmqpLink>((c, s) => this.BeginCreateCbsLink(t, c, s), this.EndCreateCbsLink),
this.CloseLink);
link => CloseLink(link));

this.connection.Extensions.Add(this);
}

bool ICloseable.IsClosedOrClosing
public void Close()
{
get
{
return this.connection.IsClosing();
}
this.linkFactory.Close();
}

FaultTolerantAmqpObject<RequestResponseAmqpLink> FaultTolerantLink { get; set; }

public Task<DateTime> SendTokenAsync(ICbsTokenProvider tokenProvider, Uri namespaceAddress, string audience, string resource, string[] requiredClaims, TimeSpan timeout)
{
return TaskHelpers.CreateTask(
Expand All @@ -59,6 +50,11 @@ public IAsyncResult BeginSendToken(ICbsTokenProvider tokenProvider, Uri namespac
tokenProvider == null ? "tokenProvider" : namespaceAddress == null ? "namespaceAddress" : audience == null ? "audience" : resource == null ? "resource" : "requiredClaims");
}

if (this.connection.IsClosing())
{
throw new ObjectDisposedException(CbsConstants.CbsAddress);
}

return new SendTokenAsyncResult(this, tokenProvider, namespaceAddress, audience, resource, requiredClaims, timeout, callback, state);
}

Expand All @@ -67,6 +63,13 @@ public DateTime EndSendToken(IAsyncResult result)
return SendTokenAsyncResult.End(result).ExpiresAtUtc;
}

static void CloseLink(RequestResponseAmqpLink link)
{
AmqpSession session = link.SendingLink?.Session;
link.Abort();
session?.SafeClose();
}

IAsyncResult BeginCreateCbsLink(TimeSpan timeout, AsyncCallback callback, object state)
{
return new OpenCbsRequestResponseLinkAsyncResult(this.connection, timeout, callback, state);
Expand All @@ -78,11 +81,6 @@ RequestResponseAmqpLink EndCreateCbsLink(IAsyncResult result)
return link;
}

void CloseLink(RequestResponseAmqpLink link)
{
link.Session.SafeClose();
}

sealed class OpenCbsRequestResponseLinkAsyncResult : IteratorAsyncResult<OpenCbsRequestResponseLinkAsyncResult>, ILinkFactory
{
readonly AmqpConnection connection;
Expand Down Expand Up @@ -237,14 +235,14 @@ protected override IEnumerator<AsyncStep> GetAsyncSteps()
}

RequestResponseAmqpLink requestResponseLink;
if (this.cbsLink.FaultTolerantLink.TryGetOpenedObject(out requestResponseLink))
if (this.cbsLink.linkFactory.TryGetOpenedObject(out requestResponseLink))
{
this.requestResponseLinkTask = Task.FromResult(requestResponseLink);
}
else
{
yield return this.CallTask(
(thisPtr, t) => thisPtr.requestResponseLinkTask = thisPtr.cbsLink.FaultTolerantLink.GetOrCreateAsync(t),
(thisPtr, t) => thisPtr.requestResponseLinkTask = thisPtr.cbsLink.linkFactory.GetOrCreateAsync(t),
ExceptionPolicy.Transfer);
}

Expand Down
5 changes: 5 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/FaultTolerantAmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public bool TryGetOpenedObject(out T openedAmqpObject)
return openedAmqpObject != null;
}

protected override bool IsValid(T value)
{
return value.State == AmqpObjectState.Opened;
}

protected override async Task<T> OnCreateAsync(TimeSpan timeout)
{
T amqpObject = await this.createObjectAsync(timeout).ConfigureAwait(false);
Expand Down
10 changes: 0 additions & 10 deletions Microsoft.Azure.Amqp/Amqp/ICloseable.cs

This file was deleted.

66 changes: 52 additions & 14 deletions Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public sealed class ReceivingAmqpLink : AmqpLink

Action<AmqpMessage> messageListener;
SizeBasedFlowQueue messageQueue;
WorkCollection<ArraySegment<byte>, DisposeAsyncResult, Outcome> pendingDispositions;
WorkCollection<ArraySegment<byte>, DisposeAsyncResult, DeliveryState> pendingDispositions;
AmqpMessage currentMessage;
LinkedList<ReceiveAsyncResult> waiterList;

Expand Down Expand Up @@ -241,17 +241,27 @@ public bool EndReceiveMessages(IAsyncResult result, out IEnumerable<AmqpMessage>
}

public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout)
{
return this.DisposeMessageAsync(deliveryTag, AmqpConstants.NullBinary, outcome, batchable, timeout);
}

public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, TimeSpan timeout)
{
return TaskHelpers.CreateTask(
(c, s) => this.BeginDisposeMessage(deliveryTag, outcome, batchable, timeout, c, s),
(c, s) => this.BeginDisposeMessage(deliveryTag, txnId, outcome, batchable, timeout, c, s),
a => ((ReceivingAmqpLink)a.AsyncState).EndDisposeMessage(a),
this);
}

public IAsyncResult BeginDisposeMessage(ArraySegment<byte> deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout, AsyncCallback callback, object state)
{
return this.BeginDisposeMessage(deliveryTag, AmqpConstants.NullBinary, outcome, batchable, timeout, callback, state);
}

public IAsyncResult BeginDisposeMessage(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, TimeSpan timeout, AsyncCallback callback, object state)
{
this.ThrowIfClosed();
return new DisposeAsyncResult(this, deliveryTag, outcome, batchable, timeout, callback, state);
return new DisposeAsyncResult(this, deliveryTag, txnId, outcome, batchable, timeout, callback, state);
}

public Outcome EndDisposeMessage(IAsyncResult result)
Expand Down Expand Up @@ -318,7 +328,7 @@ protected override bool OpenInternal()
{
this.messageQueue = new SizeBasedFlowQueue(this);
this.waiterList = new LinkedList<ReceiveAsyncResult>();
this.pendingDispositions = new WorkCollection<ArraySegment<byte>, DisposeAsyncResult, Outcome>(ByteArrayComparer.Instance);
this.pendingDispositions = new WorkCollection<ArraySegment<byte>, DisposeAsyncResult, DeliveryState>(ByteArrayComparer.Instance);
bool syncComplete = base.OpenInternal();
if (this.LinkCredit > 0)
{
Expand All @@ -334,14 +344,10 @@ protected override void OnDisposeDeliveryInternal(Delivery delivery)
// in the EO delivery scenario, and also in transaction case.
AmqpTrace.Provider.AmqpDispose(this, delivery.DeliveryId.Value, delivery.Settled, delivery.State);
DeliveryState deliveryState = delivery.State;
if (delivery.Transactional())
{
deliveryState = ((TransactionalState)delivery.State).Outcome;
}

if (deliveryState != null)
{
this.pendingDispositions.CompleteWork(delivery.DeliveryTag, false, (Outcome)deliveryState);
this.pendingDispositions.CompleteWork(delivery.DeliveryTag, false, deliveryState);
}
}

Expand Down Expand Up @@ -735,16 +741,18 @@ static void OnTimer(object state)
}
}

sealed class DisposeAsyncResult : AsyncResult, IWork<Outcome>
sealed class DisposeAsyncResult : AsyncResult, IWork<DeliveryState>
{
readonly ReceivingAmqpLink link;
readonly ArraySegment<byte> deliveryTag;
readonly bool batchable;
Outcome outcome;
ArraySegment<byte> txnId;

public DisposeAsyncResult(
ReceivingAmqpLink link,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> deliveryTag,
ArraySegment<byte> txnId,
Outcome outcome,
bool batchable,
TimeSpan timeout,
Expand All @@ -756,6 +764,7 @@ public DisposeAsyncResult(
this.deliveryTag = deliveryTag;
this.batchable = batchable;
this.outcome = outcome;
this.txnId = txnId;
this.link.pendingDispositions.StartWork(deliveryTag, this);
}

Expand All @@ -766,16 +775,45 @@ public static Outcome End(IAsyncResult result)

public void Start()
{
if (!link.DisposeDelivery(deliveryTag, false, outcome, batchable))
DeliveryState deliveryState;
if (txnId.Array != null)
{
deliveryState = new TransactionalState()
{
Outcome = this.outcome,
TxnId = this.txnId
};
}
else
{
deliveryState = this.outcome;
}

if (!link.DisposeDelivery(deliveryTag, false, deliveryState, batchable))
{
// Delivery tag not found
link.pendingDispositions.CompleteWork(deliveryTag, true, AmqpConstants.RejectedNotFoundOutcome);
}
}

public void Done(bool completedSynchronously, Outcome outcome)
public void Done(bool completedSynchronously, DeliveryState state)
{
this.outcome = outcome;
if (state is Outcome outcome)
{
this.outcome = outcome;
}
else
{
if (state is TransactionalState transactionalState)
{
this.outcome = transactionalState.Outcome;
}
else
{
this.Complete(completedSynchronously, new AmqpException(AmqpErrorCode.IllegalState, $"DeliveryState '{state.GetType()}' is not valid for disposition."));
}
}

this.Complete(completedSynchronously);
}

Expand Down
Loading

0 comments on commit 1a33d22

Please sign in to comment.