Skip to content

Commit

Permalink
Minor async changes #15 looking to fix change feed bug in cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
bchavez committed Nov 22, 2015
1 parent 1dc1699 commit 34c25b4
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public void basic_change_feed_with_reactive_extensions()
result.ChangesAs<JObject>().Dump();

var changes = r.db(DbName).table(TableName)
.changes()[new {include_states = true, include_initial = true}]
//.changes()[new {include_states = true, include_initial = true}]
.changes()[new { include_states = true }]
//.runCursor<Change<JObject>>(conn);
.runChanges<JObject>(conn);

Expand Down
8 changes: 8 additions & 0 deletions Source/RethinkDb.Driver.Tests/TestingCommon.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,10 @@ public Err(string errorType, string errorMessage, object obj)

public override bool Equals(object obj)
{
if( obj is AggregateException )
{
obj = (obj as AggregateException).InnerException;
}
if( obj.GetType() != this.clazz )
{
obj.GetType().Should().Be(this.clazz);
Expand Down Expand Up @@ -652,6 +656,10 @@ public ErrRegex(String classname, String message_rgx)

public override bool Equals(Object other)
{
if( other is AggregateException )
{
other = (other as AggregateException).InnerException;
}
var otherClass = other.GetType().Name;

if( !otherClass.EndsWith(this.clazz) )
Expand Down
1 change: 0 additions & 1 deletion Source/RethinkDb.Driver/Net/ConnectionInstance.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using RethinkDb.Driver.Ast;
Expand Down
6 changes: 6 additions & 0 deletions Source/RethinkDb.Driver/Net/Cursor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Newtonsoft.Json.Linq;
using RethinkDb.Driver.Ast;
using RethinkDb.Driver.Proto;
Expand Down Expand Up @@ -32,6 +33,7 @@ public abstract class Cursor<T> : IEnumerable<T>, IEnumerator<T>, ICursor
protected internal Exception error = null;
public bool IsFeed { get; }


protected Cursor(Connection connection, Query query, Response firstResponse)
{
this.connection = connection;
Expand Down Expand Up @@ -158,6 +160,10 @@ public override bool MoveNext()
//finished awaiting.
var result = connection.AwaitResponseAsync(query, NetUtil.Deadline(timeout))
.RunSync();

//there's a problem here....... with change feed types,
//of cursors, data can arrive without having to "send";

this.Extend(result);
}

Expand Down
62 changes: 33 additions & 29 deletions Source/RethinkDb.Driver/Net/SocketWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public virtual void Connect(byte[] handshake)
throw new ReqlDriverError($"Server dropped connection with message: '{msg}'");
}

Task.Factory.StartNew(ResponseLoop, TaskCreationOptions.LongRunning);
Task.Factory.StartNew(ResponsePump, TaskCreationOptions.LongRunning);
}
catch when( !taskComplete )
{
Expand Down Expand Up @@ -101,44 +101,48 @@ private string ReadNullTerminatedString(TimeSpan? deadline)
/// <summary>
/// Started just after connect.
/// </summary>
private void ResponseLoop()
private void ResponsePump()
{
pump = new CancellationTokenSource();

while( true )
using( pump )
{
if( pump.Token.IsCancellationRequested )
while( true )
{
Log.Trace("Response Loop: shutting down. Cancel is requested.");
break;
}
if( this.Closed )
{
Log.Trace("Response Loop: The connected socket is not open. Response Loop exiting.");
break;
}
if( pump.Token.IsCancellationRequested )
{
Log.Trace("Response Loop: shutting down. Cancel is requested.");
break;
}
if( this.Closed )
{
Log.Trace("Response Loop: The connected socket is not open. Response Loop exiting.");
break;
}

try
{
var response = this.Read();
TaskCompletionSource<Response> awaitingTask;
if( awaiters.TryRemove(response.Token, out awaitingTask) )
try
{
//Push, don't block.
Task.Run(() => awaitingTask.SetResult(response));
//See ya...
var response = this.Read();
Log.Trace($"Message Pump: Read {response.Token}");
TaskCompletionSource<Response> awaitingTask;
if( awaiters.TryRemove(response.Token, out awaitingTask) )
{
//Push, don't block.
Task.Run(() => awaitingTask.SetResult(response));
//See ya later alligator
}
else
{
//Wow, there's nobody waiting for this response.
Log.Debug($"Response Loop: There are no awaiters waiting for {response.Token} token.");
//I guess we'll ignore for now, perhaps a cursor was killed
}
}
else
catch( Exception e ) when( !pump.Token.IsCancellationRequested )
{
//Wow, there's nobody waiting for this response.
Log.Debug($"Response Loop: There are no awaiters waiting for {response.Token} token.");
//I guess we'll ignore for now, perhaps a cursor was killed
Log.Debug($"Response Loop: Exception - {e.Message}");
}
}
catch( Exception e ) when( !pump.Token.IsCancellationRequested )
{
Log.Debug($"Response Loop: Exception - {e.Message}");
}
}

//clean up.
Expand Down Expand Up @@ -191,7 +195,7 @@ public virtual void WriteQuery(long token, string json, bool assignAwaiter = tru

public virtual void Close()
{
this.pump.Cancel();
this.pump?.Cancel();
try
{
#if DNXCORE50
Expand Down

0 comments on commit 34c25b4

Please sign in to comment.