Skip to content

Commit

Permalink
Code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed Dec 4, 2018
1 parent 8a437d6 commit 6abcb2d
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 1,023 deletions.
316 changes: 77 additions & 239 deletions src/Cbs/AmqpCbsLink.cs

Large diffs are not rendered by default.

560 changes: 0 additions & 560 deletions src/Fx/IteratorAsyncResult.cs

This file was deleted.

2 changes: 1 addition & 1 deletion src/Fx/Singleton.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Task CloseAsync()
{
this.Dispose();

return TaskHelpers.CompletedTask;
return Task.CompletedTask;
}

public void Close()
Expand Down
212 changes: 0 additions & 212 deletions src/Fx/TaskHelpers.cs

This file was deleted.

6 changes: 3 additions & 3 deletions src/ReceivingAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public IAsyncResult BeginReceiveRemoteMessages(int messageCount, TimeSpan batchW

public Task<AmqpMessage> ReceiveMessageAsync(TimeSpan timeout)
{
return TaskHelpers.CreateTask(
return Task.Factory.FromAsync(
(c, s) => ((ReceivingAmqpLink)s).BeginReceiveMessage(timeout, c, s),
(a) =>
{
Expand Down Expand Up @@ -244,8 +244,8 @@ public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, Outcome

public Task<Outcome> DisposeMessageAsync(ArraySegment<byte> deliveryTag, ArraySegment<byte> txnId, Outcome outcome, bool batchable, TimeSpan timeout)
{
return TaskHelpers.CreateTask(
(c, s) => this.BeginDisposeMessage(deliveryTag, txnId, outcome, batchable, timeout, c, s),
return Task.Factory.FromAsync(
(c, s) => ((ReceivingAmqpLink)s).BeginDisposeMessage(deliveryTag, txnId, outcome, batchable, timeout, c, s),
a => ((ReceivingAmqpLink)a.AsyncState).EndDisposeMessage(a),
this);
}
Expand Down
2 changes: 1 addition & 1 deletion src/RequestResponseAmqpLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public AmqpSession Session

public Task<AmqpMessage> RequestAsync(AmqpMessage request, TimeSpan timeout)
{
return TaskHelpers.CreateTask(
return Task.Factory.FromAsync(
(c, s) => ((RequestResponseAmqpLink)s).BeginRequest(request, timeout, c, s),
(r) => ((RequestResponseAmqpLink)r.AsyncState).EndRequest(r),
this);
Expand Down
12 changes: 6 additions & 6 deletions src/Transport/TransportStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ public override void SetLength(long value)

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return TaskHelpers.CreateTask(
(c, s) => this.BeginWrite(buffer, offset, count, c, s),
(a) => this.EndWrite(a),
return Task.Factory.FromAsync(
(c, s) => ((Stream)s).BeginWrite(buffer, offset, count, c, s),
(a) => ((Stream)a.AsyncState).EndWrite(a),
this);
}

Expand Down Expand Up @@ -124,9 +124,9 @@ public override void EndWrite(IAsyncResult asyncResult)

public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
return TaskHelpers.CreateTask(
(c, s) => this.BeginRead(buffer, offset, count, c, s),
(a) => this.EndRead(a),
return Task.Factory.FromAsync(
(c, s) => ((Stream)s).BeginRead(buffer, offset, count, c, s),
(a) => ((Stream)a.AsyncState).EndRead(a),
this);
}

Expand Down
50 changes: 49 additions & 1 deletion src/Transport/WebSocketTransportInitiator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ public override bool ConnectAsync(TimeSpan timeout, TransportAsyncCallbackArgs c
cws.Options.Proxy = this.settings.Proxy;
}

Task task = cws.ConnectAsync(this.settings.Uri, CancellationToken.None).WithTimeout(timeout, () => "timeout");
var task = new TimeoutTaskSource<ClientWebSocket>(
cws,
s => s.ConnectAsync(this.settings.Uri, CancellationToken.None),
s => s.Abort(),
timeout).Task;

if (task.IsCompleted)
{
callbackArgs.Transport = new WebSocketTransport(cws, this.settings.Uri);
Expand All @@ -53,5 +58,48 @@ public override bool ConnectAsync(TimeSpan timeout, TransportAsyncCallbackArgs c
});
return true;
}

sealed class TimeoutTaskSource<T> : TaskCompletionSource<T> where T : class
{
readonly T t;
readonly TimeSpan timeout;
readonly ITimer timer;
readonly Action<T> onTimeout;

public TimeoutTaskSource(T t, Func<T, Task> onStart, Action<T> onTimeout, TimeSpan timeout)
{
this.t = t;
this.onTimeout = onTimeout;
this.timeout = timeout;
this.timer = SystemTimerFactory.Default.Create(OnTimer, this, timeout);

Task task = onStart(t);
task.ContinueWith((_t, _s) => ((TimeoutTaskSource<T>)_s).OnTask(_t), this);
}

static void OnTimer(object state)
{
var thisPtr = (TimeoutTaskSource<T>)state;
thisPtr.onTimeout(thisPtr.t);
thisPtr.TrySetException(new TimeoutException(AmqpResources.GetString(AmqpResources.AmqpTimeout, thisPtr.timeout, typeof(T).Name)));
}

void OnTask(Task inner)
{
this.timer.Cancel();
if (inner.IsFaulted)
{
this.TrySetException(inner.Exception.InnerException);
}
else if (inner.IsCanceled)
{
this.TrySetCanceled();
}
else
{
this.TrySetResult(this.t);
}
}
}
}
}

0 comments on commit 6abcb2d

Please sign in to comment.