Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge changes that are part of version 2.0.6 #65

Merged
merged 31 commits into from
Jun 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4ce8f5b
Missing one parameter in getting resource string AmqpConnectionInactive
xinchen10 Feb 7, 2017
dcea2a2
Serializer update: external type compiler, IEnumerable member in cont…
xinchen10 Mar 28, 2017
449700b
SASL EXTERNAL does not establish identity.
xinchen10 Mar 28, 2017
5541496
Receiver link could run into null reference exception.
xinchen10 Mar 28, 2017
5379ed0
Request response link open/close race condition.
xinchen10 Mar 28, 2017
a0e405d
Receiver should use delivery id from sender.
xinchen10 Mar 28, 2017
8de0277
Make SSL protocols settable.
xinchen10 Mar 28, 2017
be527e8
Allow sending delivery if link credit was reduced while the listener …
xinchen10 Mar 28, 2017
461afc5
Fix ci build failure: use explicit sdk version.
xinchen10 Mar 28, 2017
710a3c3
Update global.json
xinchen10 Mar 28, 2017
4099deb
Add ConnectionFactory for easier connection establishment
xinchen10 Apr 10, 2017
1d2fa1a
Remove UnsafeNativeMethods.cs
xinchen10 Apr 12, 2017
23d8cd4
Add folders in projects
xinchen10 Apr 12, 2017
ec6f2e5
Remove some trace events.
xinchen10 Apr 12, 2017
598441e
Remove ExceptionTrace. Cleanup code.
xinchen10 Apr 12, 2017
52faa54
Serializer: make buffer based read/write methods public
xinchen10 Apr 12, 2017
5d5f8f9
Segment buffer pool: improve memory usage on client.
xinchen10 Apr 13, 2017
6c9323f
Move Cbs files to itw own folder
xinchen10 Apr 22, 2017
698323c
AMQP over WebSockets (.net framework only)
xinchen10 Apr 22, 2017
5bb88c1
Adding static flag to include error details
Apr 27, 2017
f1e1de4
Merge pull request #59 from arunkaithayil/develop
xinchen10 Apr 27, 2017
453e940
make TlsTransport and associated classes extensible outside the library
May 16, 2017
3c8703a
Address Xin's comments
May 17, 2017
944dbf1
Merge from develop branch
May 17, 2017
2debd13
Merge pull request #62 from rajeevmv/master
xinchen10 May 17, 2017
12824b1
Update TlsTransportProvider.cs
rajeevmv May 19, 2017
ecf1c70
Merge pull request #63 from Azure/rajeevmv-patch-1
xinchen10 May 20, 2017
2df3eeb
Update solution and projects to support VS2017
May 22, 2017
d9c983e
Merge branch 'develop'
May 22, 2017
6c9dbd1
Merge pull request #64 from rajeevmv/master
xinchen10 May 23, 2017
8b30f90
Merge from develop commit hash 6c9dbd1a8f1ca and bump version number
May 26, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
773 changes: 579 additions & 194 deletions Microsoft.Azure.Amqp.Android/Microsoft.Azure.Amqp.Android.csproj

Large diffs are not rendered by default.

Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
692 changes: 517 additions & 175 deletions Microsoft.Azure.Amqp.PCL/Microsoft.Azure.Amqp.PCL.csproj

Large diffs are not rendered by default.

Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
709 changes: 530 additions & 179 deletions Microsoft.Azure.Amqp.Uwp/Microsoft.Azure.Amqp.Uwp.csproj

Large diffs are not rendered by default.

Empty file.
8 changes: 4 additions & 4 deletions Microsoft.Azure.Amqp/ActionItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected void Schedule()
{
if (isScheduled)
{
throw Fx.Exception.AsError(new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled));
throw new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled);
}

this.isScheduled = true;
Expand Down Expand Up @@ -83,11 +83,11 @@ protected void ScheduleWithContext(SecurityContext contextToSchedule)
{
if (contextToSchedule == null)
{
throw Fx.Exception.ArgumentNull("context");
throw new ArgumentNullException("context");
}
if (isScheduled)
{
throw Fx.Exception.AsError(new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled));
throw new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled);
}

this.isScheduled = true;
Expand All @@ -103,7 +103,7 @@ protected void ScheduleWithoutContext()
{
if (isScheduled)
{
throw Fx.Exception.AsError(new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled));
throw new InvalidOperationException(CommonResources.ActionItemIsAlreadyScheduled);
}

this.isScheduled = true;
Expand Down
14 changes: 14 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace Microsoft.Azure.Amqp
public class AmqpConnection : AmqpConnectionBase, ISessionFactory
{
static readonly EventHandler onSessionClosed = OnSessionClosed;
static AmqpConnectionFactory defaultFactory;
readonly bool isInitiator;
readonly ProtocolHeader initialHeader;
readonly AmqpSettings amqpSettings;
Expand All @@ -25,6 +26,19 @@ public class AmqpConnection : AmqpConnectionBase, ISessionFactory
HeartBeat heartBeat;
KeyedByTypeCollection<object> extensions;

public static AmqpConnectionFactory Factory
{
get
{
if (defaultFactory == null)
{
Interlocked.CompareExchange(ref defaultFactory, new AmqpConnectionFactory(), null);
}

return defaultFactory;
}
}

public AmqpConnection(TransportBase transport, AmqpSettings amqpSettings, AmqpConnectionSettings connectionSettings) :
this(transport, amqpSettings.GetDefaultHeader(), true, amqpSettings, connectionSettings)
{
Expand Down
122 changes: 122 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpConnectionFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
//----------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
//----------------------------------------------------------------

namespace Microsoft.Azure.Amqp
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp.Sasl;
using Microsoft.Azure.Amqp.Transport;

public class AmqpConnectionFactory
{
public Task<AmqpConnection> OpenConnectionAsync(string address)
{
return this.OpenConnectionAsync(address, AmqpConstants.DefaultTimeout);
}

public Task<AmqpConnection> OpenConnectionAsync(string address, TimeSpan timeout)
{
return this.OpenConnectionAsync(new Uri(address), timeout);
}

public Task<AmqpConnection> OpenConnectionAsync(Uri addressUri, TimeSpan timeout)
{
SaslHandler saslHandler = null;

if (!string.IsNullOrEmpty(addressUri.UserInfo))
{
string[] parts = addressUri.UserInfo.Split(':');
if (parts.Length > 2)
{
throw new ArgumentException("addressUri.UserInfo " + addressUri.UserInfo);
}

string userName = Uri.UnescapeDataString(parts[0]);
string password = parts.Length > 1 ? Uri.UnescapeDataString(parts[1]) : string.Empty;

#if !PCL
saslHandler = new SaslPlainHandler() { AuthenticationIdentity = userName, Password = password };
#endif
}

return OpenConnectionAsync(addressUri, saslHandler, timeout);
}

public async Task<AmqpConnection> OpenConnectionAsync(Uri addressUri, SaslHandler saslHandler, TimeSpan timeout)
{
TransportSettings transportSettings;
if (addressUri.Scheme.Equals(AmqpConstants.SchemeAmqp, StringComparison.OrdinalIgnoreCase))
{
transportSettings = new TcpTransportSettings()
{
Host = addressUri.Host,
Port = addressUri.Port > -1 ? addressUri.Port : AmqpConstants.DefaultPort
};
}
else if (addressUri.Scheme.Equals(AmqpConstants.SchemeAmqps, StringComparison.OrdinalIgnoreCase))
{
TcpTransportSettings tcpSettings = new TcpTransportSettings()
{
Host = addressUri.Host,
Port = addressUri.Port > -1 ? addressUri.Port : AmqpConstants.DefaultSecurePort
};

transportSettings = new TlsTransportSettings(tcpSettings) { TargetHost = addressUri.Host };
}
#if NET45
else if (addressUri.Scheme.Equals(WebSocketTransport.WebSockets, StringComparison.OrdinalIgnoreCase) ||
addressUri.Scheme.Equals(WebSocketTransport.SecureWebSockets, StringComparison.OrdinalIgnoreCase))
{
transportSettings = new WebSocketTransportSettings() { Uri = addressUri };
}
#endif
else
{
throw new NotSupportedException(addressUri.Scheme);
}

AmqpSettings settings = new AmqpSettings();

if (saslHandler != null)
{
// Provider for "AMQP3100"
SaslTransportProvider saslProvider = new SaslTransportProvider();
saslProvider.Versions.Add(new AmqpVersion(1, 0, 0));
saslProvider.AddHandler(saslHandler);
settings.TransportProviders.Add(saslProvider);
}

// Provider for "AMQP0100"
AmqpTransportProvider amqpProvider = new AmqpTransportProvider();
amqpProvider.Versions.Add(new AmqpVersion(new Version(1, 0, 0, 0)));
settings.TransportProviders.Add(amqpProvider);

AmqpTransportInitiator initiator = new AmqpTransportInitiator(settings, transportSettings);
TransportBase transport = await Task.Factory.FromAsync(
(c, s) => initiator.BeginConnect(timeout, c, s),
(r) => initiator.EndConnect(r),
null);

try
{
AmqpConnectionSettings connectionSettings = new AmqpConnectionSettings()
{
ContainerId = Guid.NewGuid().ToString(),
HostName = addressUri.Host
};

AmqpConnection connection = new AmqpConnection(transport, settings, connectionSettings);
await connection.OpenAsync(timeout);

return connection;
}
catch
{
transport.Abort();
throw;
}
}
}
}
1 change: 0 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/AmqpConnectionSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public AmqpConnectionSettings Clone()
newSettings.IdleTimeOut = this.IdleTimeOut;
newSettings.OutgoingLocales = this.OutgoingLocales;
newSettings.IncomingLocales = this.IncomingLocales;
newSettings.Properties = this.Properties;
newSettings.OfferedCapabilities = this.OfferedCapabilities;
newSettings.DesiredCapabilities = this.DesiredCapabilities;
newSettings.Properties = this.Properties;
Expand Down
7 changes: 6 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/AmqpExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ public static Terminus Terminus(this Attach attach)

public static Address Address(this Attach attach)
{
if (attach.IsReceiver())
return Address(attach, attach.IsReceiver());
}

public static Address Address(this Attach attach, bool role)
{
if (role)
{
Fx.Assert(attach.Source != null && attach.Source is Source, "Source is not valid.");
return ((Source)attach.Source).Address;
Expand Down
46 changes: 33 additions & 13 deletions Microsoft.Azure.Amqp/Amqp/AmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Microsoft.Azure.Amqp
using System.Threading;
using Microsoft.Azure.Amqp.Framing;
using Microsoft.Azure.Amqp.Transaction;
using Microsoft.Azure.Amqp.Tracing;

/// <summary>
/// Implements the transport-layer link, including
Expand Down Expand Up @@ -207,7 +206,6 @@ public void OnFlow(Flow flow)
public bool TrySendDelivery(Delivery delivery)
{
Fx.Assert(delivery.DeliveryTag.Array != null, "delivery-tag must be set.");
bool settled = this.settings.SettleType == SettleMode.SettleOnSend;

// check link credit first
bool canSend = false;
Expand All @@ -230,19 +228,23 @@ public bool TrySendDelivery(Delivery delivery)
return false;
}

delivery.PrepareForSend();
delivery.Settled = settled;
if (!delivery.Settled)
this.StartSendDelivery(delivery);
return true;
}

public void ForceSendDelivery(Delivery delivery)
{
// Send the delivery even if there is no link credit
lock (this.syncRoot)
{
lock (this.syncRoot)
this.deliveryCount.Increment();
if (this.linkCredit > 0 && this.linkCredit < uint.MaxValue)
{
this.unsettledMap.Add(delivery.DeliveryTag, delivery);
--this.linkCredit;
}
}

delivery.Link = this;
this.inflightDeliveries.DoWork(delivery);
return true;
this.StartSendDelivery(delivery);
}

// up-down: from application to link to session (to send a disposition)
Expand Down Expand Up @@ -457,7 +459,7 @@ internal virtual void OnIoEvent(IoEvent ioEvent)
}
}

public bool Invoke(Delivery delivery)
bool IWorkDelegate<Delivery>.Invoke(Delivery delivery)
{
return this.DoActionIfNotClosed(
(thisPtr, paramDelivery, p1, p2, p3) => thisPtr.SendDelivery(paramDelivery),
Expand Down Expand Up @@ -682,6 +684,22 @@ protected bool DoActionIfNotClosed<T1, T2, T3, T4>(Func<AmqpLink, T1, T2, T3, T4
}
}

void StartSendDelivery(Delivery delivery)
{
delivery.PrepareForSend();
delivery.Settled = this.settings.SettleType == SettleMode.SettleOnSend;
if (!delivery.Settled)
{
lock (this.syncRoot)
{
this.unsettledMap.Add(delivery.DeliveryTag, delivery);
}
}

delivery.Link = this;
this.inflightDeliveries.DoWork(delivery);
}

void DisposeDeliveryInternal(Delivery delivery, bool settled, DeliveryState state, bool noFlush)
{
AmqpTrace.Provider.AmqpDispose(this, delivery.DeliveryId.Value, settled, state);
Expand Down Expand Up @@ -750,7 +768,8 @@ AmqpObjectState SendDetach()

this.Session.SendCommand(detach);

AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "S:DETACH", detach.Error != null ? detach.Error.Condition.Value : string.Empty);
AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "S:DETACH",
detach.Error != null ? detach.Error.Condition.Value : string.Empty);

return transition.To;
}
Expand Down Expand Up @@ -846,7 +865,8 @@ void OnReceiveAttach(Attach attach)

void OnReceiveDetach(Detach detach)
{
AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "R:DETACH", detach.Error != null ? detach.Error.Condition.Value : string.Empty);
AmqpTrace.Provider.AmqpLinkDetach(this, this.Name, this.LocalHandle ?? 0u, "R:DETACH",
detach.Error != null ? detach.Error.Condition.Value : string.Empty);

this.OnReceiveCloseCommand("R:DETACH", detach.Error);
}
Expand Down
10 changes: 4 additions & 6 deletions Microsoft.Azure.Amqp/Amqp/AmqpMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,7 @@ public AmqpMessage Clone()
{
if (this.BytesTransfered > 0)
{
var exception = new InvalidOperationException(AmqpResources.AmqpCannotCloneSentMessage);
AmqpTrace.Provider.AmqpThrowingExceptionWarning(ExceptionTrace.GetDetailsForThrownException(exception));
throw Fx.Exception.AsWarning(exception);
throw new InvalidOperationException(AmqpResources.AmqpCannotCloneSentMessage);
}

bool more;
Expand Down Expand Up @@ -1281,7 +1279,7 @@ public AmqpStreamMessage(BufferListStream messageStream, bool payloadInitialized
{
if (messageStream == null)
{
throw Fx.Exception.ArgumentNullOrEmpty("bufferStream");
throw new ArgumentNullException("bufferStream");
}

this.messageStream = messageStream;
Expand All @@ -1297,7 +1295,7 @@ public AmqpStreamMessage(Stream nonBodySections, Stream bodySection, bool forceC
// Currently message always has header stream, may change in the future
if (nonBodySections == null)
{
throw Fx.Exception.ArgumentNull("nonBodySections");
throw new ArgumentNullException("nonBodySections");
}

this.messageStream = BufferListStream.Create(nonBodySections, AmqpConstants.SegmentSize, forceCopyStream);
Expand Down Expand Up @@ -1576,7 +1574,7 @@ public AmqpStreamMessageHeader(BufferListStream headerStream)
{
if (headerStream == null)
{
throw Fx.Exception.ArgumentNullOrEmpty("headerStream");
throw new ArgumentNullException("headerStream");
}

this.bufferStream = headerStream;
Expand Down
10 changes: 3 additions & 7 deletions Microsoft.Azure.Amqp/Amqp/AmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ public void Open(TimeSpan timeout)
{
if (this.openCalled)
{
var exception = new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
AmqpTrace.Provider.AmqpThrowingExceptionWarning(ExceptionTrace.GetDetailsForThrownException(exception));
throw Fx.Exception.AsWarning(exception);
throw new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
}

this.openCalled = true;
Expand Down Expand Up @@ -180,9 +178,7 @@ public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object s
{
if (this.openCalled)
{
var exception = new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
AmqpTrace.Provider.AmqpThrowingExceptionWarning(ExceptionTrace.GetDetailsForThrownException(exception));
throw Fx.Exception.AsWarning(exception);
throw new InvalidOperationException(AmqpResources.GetString(AmqpResources.AmqpInvalidReOpenOperation, this, this.State));
}

this.openCalled = true;
Expand Down Expand Up @@ -297,7 +293,7 @@ public void Abort()
catch (Exception exception)
{
// No one is supposed to throw but it someone does, we need to investigate
AmqpTrace.Provider.AmqpThrowingExceptionError(exception.ToStringSlim());
AmqpTrace.Provider.AmqpAbortThrowingException(exception.ToStringSlim());
throw;
}
finally
Expand Down
Loading