From d6e42f8ff3a76c8db5dceccb9521abb148b234d9 Mon Sep 17 00:00:00 2001 From: xinchen Date: Thu, 8 Feb 2018 17:51:09 -0800 Subject: [PATCH] [#77] throw exception in send/receive if the object is closed. --- Microsoft.Azure.Amqp/Amqp/AmqpObject.cs | 8 ++++++++ Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs | 5 ++++- Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs | 3 ++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/Microsoft.Azure.Amqp/Amqp/AmqpObject.cs b/Microsoft.Azure.Amqp/Amqp/AmqpObject.cs index de0886ec..dfeae7e2 100644 --- a/Microsoft.Azure.Amqp/Amqp/AmqpObject.cs +++ b/Microsoft.Azure.Amqp/Amqp/AmqpObject.cs @@ -482,6 +482,14 @@ protected void OnReceiveCloseCommand(string command, Error error) } } + protected void ThrowIfClosed() + { + if (this.closeCalled) + { + throw new AmqpException(AmqpErrorCode.IllegalState, $"'{this.name}' is closed"); + } + } + static void OnSafeCloseComplete(IAsyncResult result) { AmqpObject thisPtr = (AmqpObject)result.AsyncState; diff --git a/Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs b/Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs index 7b5e2e61..7f112fe7 100644 --- a/Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs +++ b/Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs @@ -36,7 +36,7 @@ public ReceivingAmqpLink(AmqpLinkSettings settings) } public ReceivingAmqpLink(AmqpSession session, AmqpLinkSettings settings) : - base(session, settings) + base("receiver", session, settings) { } @@ -157,6 +157,7 @@ public IAsyncResult BeginReceiveMessages(int messageCount, TimeSpan timeout, Asy IAsyncResult BeginReceiveMessages(int messageCount, TimeSpan batchWaitTimeout, TimeSpan timeout, AsyncCallback callback, object state) { + this.ThrowIfClosed(); List messages = new List(); lock (this.SyncRoot) { @@ -249,6 +250,7 @@ public Task DisposeMessageAsync(ArraySegment deliveryTag, Outcome public IAsyncResult BeginDisposeMessage(ArraySegment deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout, AsyncCallback callback, object state) { + this.ThrowIfClosed(); return new DisposeAsyncResult(this, deliveryTag, outcome, batchable, timeout, callback, state); } @@ -293,6 +295,7 @@ public void ModifyMessage(AmqpMessage message, bool deliveryFailed, bool deliver public void DisposeMessage(AmqpMessage message, DeliveryState state, bool settled, bool batchable) { + this.ThrowIfClosed(); message.Batchable = batchable; this.DisposeDelivery(message, settled, state); } diff --git a/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs b/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs index d66233c3..c20b7d1b 100644 --- a/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs +++ b/Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs @@ -24,7 +24,7 @@ public SendingAmqpLink(AmqpLinkSettings settings) } public SendingAmqpLink(AmqpSession session, AmqpLinkSettings settings) - : base(session, settings) + : base("sender", session, settings) { // TODO: Add capability negotiation logic for BatchedMessageFormat to this.Settings this.pendingDeliveries = new SerializedWorker(this); @@ -76,6 +76,7 @@ public IAsyncResult BeginSendMessage( AsyncCallback callback, object state) { + this.ThrowIfClosed(); if (this.dispositionListener != null) { throw new InvalidOperationException(CommonResources.DispositionListenerSetNotSupported);