Skip to content

Commit

Permalink
Merge pull request #65 from rajeevmv/master
Browse files Browse the repository at this point in the history
Merge changes that are part of version 2.0.6
  • Loading branch information
xinchen10 authored Jun 2, 2017
2 parents c579d73 + 8b30f90 commit f137b5a
Show file tree
Hide file tree
Showing 92 changed files with 2,819 additions and 1,696 deletions.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
773 changes: 579 additions & 194 deletions Microsoft.Azure.Amqp.Android/Microsoft.Azure.Amqp.Android.csproj

Large diffs are not rendered by default.

Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
692 changes: 517 additions & 175 deletions Microsoft.Azure.Amqp.PCL/Microsoft.Azure.Amqp.PCL.csproj

Large diffs are not rendered by default.

Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
709 changes: 530 additions & 179 deletions Microsoft.Azure.Amqp.Uwp/Microsoft.Azure.Amqp.Uwp.csproj

Large diffs are not rendered by default.

Empty file.
8 changes: 4 additions & 4 deletions Microsoft.Azure.Amqp/ActionItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected void Schedule()
{
if (isScheduled)
{
throw Fx.Exception.AsError(new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled));
throw new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled);
}

this.isScheduled = true;
Expand Down Expand Up @@ -83,11 +83,11 @@ protected void ScheduleWithContext(SecurityContext contextToSchedule)
{
if (contextToSchedule == null)
{
throw Fx.Exception.ArgumentNull("context");
throw new ArgumentNullException("context");
}
if (isScheduled)
{
throw Fx.Exception.AsError(new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled));
throw new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled);
}

this.isScheduled = true;
Expand All @@ -103,7 +103,7 @@ protected void ScheduleWithoutContext()
{
if (isScheduled)
{
throw Fx.Exception.AsError(new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled));
throw new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled);
}

this.isScheduled = true;
Expand Down
14 changes: 14 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Microsoft.Azure.Amqp
public class AmqpConnection : AmqpConnectionBase, ISessionFactory
{
static readonly EventHandler onSessionClosed = OnSessionClosed;
static AmqpConnectionFactory defaultFactory;
readonly bool isInitiator;
readonly ProtocolHeader initialHeader;
readonly AmqpSettings amqpSettings;
Expand All @@ -25,6 +26,19 @@ public class AmqpConnection : AmqpConnectionBase, ISessionFactory
HeartBeat heartBeat;
KeyedByTypeCollection<object> extensions;

public static AmqpConnectionFactory Factory
{
get
{
if (defaultFactory == null)
{
Interlocked.CompareExchange(ref defaultFactory, new AmqpConnectionFactory(), null);
}

return defaultFactory;
}
}

public AmqpConnection(TransportBase transport, AmqpSettings amqpSettings, AmqpConnectionSettings connectionSettings) :
this(transport, amqpSettings.GetDefaultHeader(), true, amqpSettings, connectionSettings)
{
Expand Down
122 changes: 122 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpConnectionFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//----------------------------------------------------------------

namespace Microsoft.Azure.Amqp
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp.Sasl;
using Microsoft.Azure.Amqp.Transport;

public class AmqpConnectionFactory
{
public Task<AmqpConnection> OpenConnectionAsync(string address)
{
return this.OpenConnectionAsync(address, AmqpConstants.DefaultTimeout);
}

public Task<AmqpConnection> OpenConnectionAsync(string address, TimeSpan timeout)
{
return this.OpenConnectionAsync(new Uri(address), timeout);
}

public Task<AmqpConnection> OpenConnectionAsync(Uri addressUri, TimeSpan timeout)
{
SaslHandler saslHandler = null;

if (!string.IsNullOrEmpty(addressUri.UserInfo))
{
string[] parts = addressUri.UserInfo.Split(':');
if (parts.Length > 2)
{
throw new ArgumentException("addressUri.UserInfo " + addressUri.UserInfo);
}

string userName = Uri.UnescapeDataString(parts[0]);
string password = parts.Length > 1 ? Uri.UnescapeDataString(parts[1]) : string.Empty;

#if !PCL
saslHandler = new SaslPlainHandler() { AuthenticationIdentity = userName, Password = password };
#endif
}

return OpenConnectionAsync(addressUri, saslHandler, timeout);
}

public async Task<AmqpConnection> OpenConnectionAsync(Uri addressUri, SaslHandler saslHandler, TimeSpan timeout)
{
TransportSettings transportSettings;
if (addressUri.Scheme.Equals(AmqpConstants.SchemeAmqp, StringComparison.OrdinalIgnoreCase))
{
transportSettings = new TcpTransportSettings()
{
Host = addressUri.Host,
Port = addressUri.Port > -1 ? addressUri.Port : AmqpConstants.DefaultPort
};
}
else if (addressUri.Scheme.Equals(AmqpConstants.SchemeAmqps, StringComparison.OrdinalIgnoreCase))
{
TcpTransportSettings tcpSettings = new TcpTransportSettings()
{
Host = addressUri.Host,
Port = addressUri.Port > -1 ? addressUri.Port : AmqpConstants.DefaultSecurePort
};

transportSettings = new TlsTransportSettings(tcpSettings) { TargetHost = addressUri.Host };
}
#if NET45
else if (addressUri.Scheme.Equals(WebSocketTransport.WebSockets, StringComparison.OrdinalIgnoreCase) ||
addressUri.Scheme.Equals(WebSocketTransport.SecureWebSockets, StringComparison.OrdinalIgnoreCase))
{
transportSettings = new WebSocketTransportSettings() { Uri = addressUri };
}
#endif
else
{
throw new NotSupportedException(addressUri.Scheme);
}

AmqpSettings settings = new AmqpSettings();

if (saslHandler != null)
{
// Provider for "AMQP3100"
SaslTransportProvider saslProvider = new SaslTransportProvider();
saslProvider.Versions.Add(new AmqpVersion(1, 0, 0));
saslProvider.AddHandler(saslHandler);
settings.TransportProviders.Add(saslProvider);
}

// Provider for "AMQP0100"
AmqpTransportProvider amqpProvider = new AmqpTransportProvider();
amqpProvider.Versions.Add(new AmqpVersion(new Version(1, 0, 0, 0)));
settings.TransportProviders.Add(amqpProvider);

AmqpTransportInitiator initiator = new AmqpTransportInitiator(settings, transportSettings);
TransportBase transport = await Task.Factory.FromAsync(
(c, s) => initiator.BeginConnect(timeout, c, s),
(r) => initiator.EndConnect(r),
null);

try
{
AmqpConnectionSettings connectionSettings = new AmqpConnectionSettings()
{
ContainerId = Guid.NewGuid().ToString(),
HostName = addressUri.Host
};

AmqpConnection connection = new AmqpConnection(transport, settings, connectionSettings);
await connection.OpenAsync(timeout);

return connection;
}
catch
{
transport.Abort();
throw;
}
}
}
}
1 change: 0 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/AmqpConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public AmqpConnectionSettings Clone()
newSettings.IdleTimeOut = this.IdleTimeOut;
newSettings.OutgoingLocales = this.OutgoingLocales;
newSettings.IncomingLocales = this.IncomingLocales;
newSettings.Properties = this.Properties;
newSettings.OfferedCapabilities = this.OfferedCapabilities;
newSettings.DesiredCapabilities = this.DesiredCapabilities;
newSettings.Properties = this.Properties;
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/AmqpExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ public static Terminus Terminus(this Attach attach)

public static Address Address(this Attach attach)
{
if (attach.IsReceiver())
return Address(attach, attach.IsReceiver());
}

public static Address Address(this Attach attach, bool role)
{
if (role)
{
Fx.Assert(attach.Source != null && attach.Source is Source, "Source is not valid.");
return ((Source)attach.Source).Address;
Expand Down
46 changes: 33 additions & 13 deletions Microsoft.Azure.Amqp/Amqp/AmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Amqp
using System.Threading;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Transaction;
using Microsoft.Azure.Amqp.Tracing;

/// <summary>
/// Implements the transport-layer link, including
Expand Down Expand Up @@ -207,7 +206,6 @@ public void OnFlow(Flow flow)
public bool TrySendDelivery(Delivery delivery)
{
Fx.Assert(delivery.DeliveryTag.Array != null, "delivery-tag must be set.");
bool settled = this.settings.SettleType == SettleMode.SettleOnSend;

// check link credit first
bool canSend = false;
Expand All @@ -230,19 +228,23 @@ public bool TrySendDelivery(Delivery delivery)
return false;
}

delivery.PrepareForSend();
delivery.Settled = settled;
if (!delivery.Settled)
this.StartSendDelivery(delivery);
return true;
}

public void ForceSendDelivery(Delivery delivery)
{
// Send the delivery even if there is no link credit
lock (this.syncRoot)
{
lock (this.syncRoot)
this.deliveryCount.Increment();
if (this.linkCredit > 0 && this.linkCredit < uint.MaxValue)
{
this.unsettledMap.Add(delivery.DeliveryTag, delivery);
--this.linkCredit;
}
}

delivery.Link = this;
this.inflightDeliveries.DoWork(delivery);
return true;
this.StartSendDelivery(delivery);
}

// up-down: from application to link to session (to send a disposition)
Expand Down Expand Up @@ -457,7 +459,7 @@ internal virtual void OnIoEvent(IoEvent ioEvent)
}
}

public bool Invoke(Delivery delivery)
bool IWorkDelegate<Delivery>.Invoke(Delivery delivery)
{
return this.DoActionIfNotClosed(
(thisPtr, paramDelivery, p1, p2, p3) => thisPtr.SendDelivery(paramDelivery),
Expand Down Expand Up @@ -682,6 +684,22 @@ protected bool DoActionIfNotClosed<T1, T2, T3, T4>(Func<AmqpLink, T1, T2, T3, T4
}
}

void StartSendDelivery(Delivery delivery)
{
delivery.PrepareForSend();
delivery.Settled = this.settings.SettleType == SettleMode.SettleOnSend;
if (!delivery.Settled)
{
lock (this.syncRoot)
{
this.unsettledMap.Add(delivery.DeliveryTag, delivery);
}
}

delivery.Link = this;
this.inflightDeliveries.DoWork(delivery);
}

void DisposeDeliveryInternal(Delivery delivery, bool settled, DeliveryState state, bool noFlush)
{
AmqpTrace.Provider.AmqpDispose(this, delivery.DeliveryId.Value, settled, state);
Expand Down Expand Up @@ -750,7 +768,8 @@ AmqpObjectState SendDetach()

this.Session.SendCommand(detach);

AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "S:DETACH", detach.Error != null ? detach.Error.Condition.Value : string.Empty);
AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "S:DETACH",
detach.Error != null ? detach.Error.Condition.Value : string.Empty);

return transition.To;
}
Expand Down Expand Up @@ -846,7 +865,8 @@ void OnReceiveAttach(Attach attach)

void OnReceiveDetach(Detach detach)
{
AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "R:DETACH", detach.Error != null ? detach.Error.Condition.Value : string.Empty);
AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "R:DETACH",
detach.Error != null ? detach.Error.Condition.Value : string.Empty);

this.OnReceiveCloseCommand("R:DETACH", detach.Error);
}
Expand Down
10 changes: 4 additions & 6 deletions Microsoft.Azure.Amqp/Amqp/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,7 @@ public AmqpMessage Clone()
{
if (this.BytesTransfered > 0)
{
var exception = new InvalidOperationException(AmqpResources.AmqpCannotCloneSentMessage);
AmqpTrace.Provider.AmqpThrowingExceptionWarning(ExceptionTrace.GetDetailsForThrownException(exception));
throw Fx.Exception.AsWarning(exception);
throw new InvalidOperationException(AmqpResources.AmqpCannotCloneSentMessage);
}

bool more;
Expand Down Expand Up @@ -1281,7 +1279,7 @@ public AmqpStreamMessage(BufferListStream messageStream, bool payloadInitialized
{
if (messageStream == null)
{
throw Fx.Exception.ArgumentNullOrEmpty("bufferStream");
throw new ArgumentNullException("bufferStream");
}

this.messageStream = messageStream;
Expand All @@ -1297,7 +1295,7 @@ public AmqpStreamMessage(Stream nonBodySections, Stream bodySection, bool forceC
// Currently message always has header stream, may change in the future
if (nonBodySections == null)
{
throw Fx.Exception.ArgumentNull("nonBodySections");
throw new ArgumentNullException("nonBodySections");
}

this.messageStream = BufferListStream.Create(nonBodySections, AmqpConstants.SegmentSize, forceCopyStream);
Expand Down Expand Up @@ -1576,7 +1574,7 @@ public AmqpStreamMessageHeader(BufferListStream headerStream)
{
if (headerStream == null)
{
throw Fx.Exception.ArgumentNullOrEmpty("headerStream");
throw new ArgumentNullException("headerStream");
}

this.bufferStream = headerStream;
Expand Down
10 changes: 3 additions & 7 deletions Microsoft.Azure.Amqp/Amqp/AmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ public void Open(TimeSpan timeout)
{
if (this.openCalled)
{
var exception = new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
AmqpTrace.Provider.AmqpThrowingExceptionWarning(ExceptionTrace.GetDetailsForThrownException(exception));
throw Fx.Exception.AsWarning(exception);
throw new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
}

this.openCalled = true;
Expand Down Expand Up @@ -180,9 +178,7 @@ public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object s
{
if (this.openCalled)
{
var exception = new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
AmqpTrace.Provider.AmqpThrowingExceptionWarning(ExceptionTrace.GetDetailsForThrownException(exception));
throw Fx.Exception.AsWarning(exception);
throw new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
}

this.openCalled = true;
Expand Down Expand Up @@ -297,7 +293,7 @@ public void Abort()
catch (Exception exception)
{
// No one is supposed to throw but it someone does, we need to investigate
AmqpTrace.Provider.AmqpThrowingExceptionError(exception.ToStringSlim());
AmqpTrace.Provider.AmqpAbortThrowingException(exception.ToStringSlim());
throw;
}
finally
Expand Down
Loading

0 comments on commit f137b5a

Please sign in to comment.