Skip to content

Commit

Permalink
[#77] throw exception in send/receive if the object is closed.
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Feb 12, 2018
1 parent c042f90 commit d6e42f8
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 2 deletions.
8 changes: 8 additions & 0 deletions Microsoft.Azure.Amqp/Amqp/AmqpObject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,14 @@ protected void OnReceiveCloseCommand(string command, Error error)
}
}

protected void ThrowIfClosed()
{
if (this.closeCalled)
{
throw new AmqpException(AmqpErrorCode.IllegalState, $"'{this.name}' is closed");
}
}

static void OnSafeCloseComplete(IAsyncResult result)
{
AmqpObject thisPtr = (AmqpObject)result.AsyncState;
Expand Down
5 changes: 4 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ReceivingAmqpLink(AmqpLinkSettings settings)
}

public ReceivingAmqpLink(AmqpSession session, AmqpLinkSettings settings) :
base(session, settings)
base("receiver", session, settings)
{
}

Expand Down Expand Up @@ -157,6 +157,7 @@ public IAsyncResult BeginReceiveMessages(int messageCount, TimeSpan timeout, Asy

IAsyncResult BeginReceiveMessages(int messageCount, TimeSpan batchWaitTimeout, TimeSpan timeout, AsyncCallback callback, object state)
{
this.ThrowIfClosed();
List<AmqpMessage> messages = new List<AmqpMessage>();
lock (this.SyncRoot)
{
Expand Down Expand Up @@ -249,6 +250,7 @@ public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome

public IAsyncResult BeginDisposeMessage(ArraySegment<byte> deliveryTag, Outcome outcome, bool batchable, TimeSpan timeout, AsyncCallback callback, object state)
{
this.ThrowIfClosed();
return new DisposeAsyncResult(this, deliveryTag, outcome, batchable, timeout, callback, state);
}

Expand Down Expand Up @@ -293,6 +295,7 @@ public void ModifyMessage(AmqpMessage message, bool deliveryFailed, bool deliver

public void DisposeMessage(AmqpMessage message, DeliveryState state, bool settled, bool batchable)
{
this.ThrowIfClosed();
message.Batchable = batchable;
this.DisposeDelivery(message, settled, state);
}
Expand Down
3 changes: 2 additions & 1 deletion Microsoft.Azure.Amqp/Amqp/SendingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public SendingAmqpLink(AmqpLinkSettings settings)
}

public SendingAmqpLink(AmqpSession session, AmqpLinkSettings settings)
: base(session, settings)
: base("sender", session, settings)
{
// TODO: Add capability negotiation logic for BatchedMessageFormat to this.Settings
this.pendingDeliveries = new SerializedWorker<AmqpMessage>(this);
Expand Down Expand Up @@ -76,6 +76,7 @@ public IAsyncResult BeginSendMessage(
AsyncCallback callback,
object state)
{
this.ThrowIfClosed();
if (this.dispositionListener != null)
{
throw new InvalidOperationException(CommonResources.DispositionListenerSetNotSupported);
Expand Down

0 comments on commit d6e42f8

Please sign in to comment.