Skip to content

Commit

Permalink
More threading improvments #15, use SendQuery instead.
Browse files Browse the repository at this point in the history
  • Loading branch information
bchavez committed Nov 24, 2015
1 parent 381f447 commit 82626b8
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 116 deletions.
3 changes: 2 additions & 1 deletion HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
* Database connection thread-safety.
* EnsureSuccess() renamed to AssertNoErrors()
* Assert: Deleted(), Inserted(), skipped(), replaced, etc.. helpers.
*
* Better Reactive Extension (Rx) compatibility.
* Added Cursor.MoveNext(Timeout) for manual cursor movement.

## v0.0.7-alpha6
* Added run helpers: runResult(), runChanges<T>()
Expand Down
41 changes: 41 additions & 0 deletions Source/RethinkDb.Driver.Tests/ReQL/AsyncAwaitTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
using FluentAssertions;
using NUnit.Framework;

namespace RethinkDb.Driver.Tests.ReQL
{
[TestFixture]
public class AsyncAwaitTests : QueryTestFixture
{
[Test]
public async void basic_test()
{
bool b = await r.expr(true).runAsync<bool>(conn);

b.Should().Be(true);
}

[Test]
public async void async_insert()
{

}

[Test]
public void asnync_()
{

}
}


[TestFixture]
public class ChangeFeedTests : QueryTestFixture
{
[Test]
public void Test()
{

}
}

}
91 changes: 77 additions & 14 deletions Source/RethinkDb.Driver.Tests/ReQL/RxReactiveExtensionTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using System;
using System.CodeDom.Compiler;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Newtonsoft.Json;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Newtonsoft.Json.Linq;
using NUnit.Framework;
using RethinkDb.Driver.Model;
Expand All @@ -10,47 +12,108 @@
namespace RethinkDb.Driver.Tests.ReQL
{
[TestFixture]
[Explicit]
public class RxReactiveExtensionTests : QueryTestFixture
{
[Test]
public void basic_change_feed_with_reactive_extensions()
{
var result = r.db(DbName).table(TableName)
.delete()[new {return_changes = true}]
.runResult(conn)
.AssertNoErrors();
var onCompleted = 0;
var onError = 0;
var onNext = 0;

result.Dump();
var result = r.db(DbName).table(TableName)
.delete()[new { return_changes = true }]
.runResult(conn)
.AssertNoErrors();

result.ChangesAs<JObject>().Dump();

var changes = r.db(DbName).table(TableName)
//.changes()[new {include_states = true, include_initial = true}]
.changes()[new { include_states = true }]
//.runCursor<Change<JObject>>(conn);
.changes()
.runChanges<JObject>(conn);

var observable = changes.ToObservable();

observable.Subscribe(OnNext, OnError, OnCompleted);
//use a new thread if you want to continue,
//otherwise, subscription will block.
observable.SubscribeOn(NewThreadScheduler.Default)
.Subscribe(
x => OnNext(x, ref onNext),
e => OnError(e, ref onError),
() => OnCompleted(ref onCompleted)
);

Thread.Sleep(3000);

Task.Run(() =>
{
r.db(DbName).table(TableName)
.insert(new { foo = "bar" })
.run(conn);
});

Thread.Sleep(3000);

Task.Run(() =>
{
r.db(DbName).table(TableName)
.insert(new { foo = "bar" })
.run(conn);
});

Thread.Sleep(3000);

Task.Run(() =>
{
r.db(DbName).table(TableName)
.insert(new { foo = "bar" })
.run(conn);
});

Thread.Sleep(3000);

changes.close();

onCompleted.Should().Be(1);
onNext.Should().Be(3);
onError.Should().Be(0);
}

private void OnCompleted()
private void OnCompleted(ref int onCompleted)
{
Console.WriteLine("On Completed.");
onCompleted++;
}

private void OnError(Exception obj)
private void OnError(Exception obj, ref int onError)
{
Console.WriteLine("On Error");
Console.WriteLine(obj.Message);
onError++;
}

private void OnNext(Change<JObject> obj)
private void OnNext(Change<JObject> obj, ref int onNext)
{
Console.WriteLine("On Next");
obj.Dump();
onNext++;
}

[Test]
[Explicit]
public void change_feeds_without_rx()
{
var result = r.db(DbName).table(TableName)
.delete()[new {return_changes = true}]
.runResult(conn)
.AssertNoErrors();

var changes = r.db(DbName).table(TableName)
.changes()[new {include_states = true}]
.runChanges<JObject>(conn);

changes.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
<Compile Include="AppSettings.cs" />
<Compile Include="Network\ConnectionTest.cs" />
<Compile Include="QueryTestFixture.cs" />
<Compile Include="ReQL\AsyncAwaitTests.cs" />
<Compile Include="ReQL\DateAndTimeTests.cs" />
<Compile Include="ReQL\Examples.cs" />
<Compile Include="ReQL\ExperimentalTests.cs" />
Expand Down
52 changes: 26 additions & 26 deletions Source/RethinkDb.Driver/Net/Connection.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using RethinkDb.Driver.Ast;
using RethinkDb.Driver.Model;

Expand Down Expand Up @@ -35,10 +32,10 @@ internal Connection(Builder builder)
using( var ms = new MemoryStream() )
using( var bw = new BinaryWriter(ms) )
{
bw.Write((int)RethinkDb.Driver.Proto.Version.V0_4);
bw.Write((int)Proto.Version.V0_4);
bw.Write(authKeyBytes.Length);
bw.Write(authKeyBytes);
bw.Write((int)RethinkDb.Driver.Proto.Protocol.JSON);
bw.Write((int)Proto.Protocol.JSON);
bw.Flush();
handshake = ms.ToArray();
}
Expand Down Expand Up @@ -145,39 +142,36 @@ private long NewToken()
return Interlocked.Increment(ref nextToken);
}

internal virtual async Task<Response> AwaitResponseAsync(Query query, long? deadline = null)
{
return await checkOpen().AwaitResponseAsync(query, deadline);
}

internal async virtual Task<Cursor<T>> RunQueryCursorAsync<T>(Query query)
{
var inst = checkOpen();
if( inst.Socket == null ) throw new ReqlDriverError("No socket open.");
inst.Socket.WriteQuery(query.Token, query.Serialize());
Response res = await inst.AwaitResponseAsync(query);
var res = await RunQuery(query, awaitResponse: true);
if( res.IsPartial || res.IsSequence )
{
return Cursor<T>.create(this, query, res);
}
throw new ReqlDriverError("The query response can't be converted to a Cursor<T>. The response is not a sequence or partial. Use `.run` instead.");
}

internal virtual void RunQueryNoreply(Query query, bool assignAwaiter)
private Task<Response> RunQuery(Query query, bool awaitResponse)
{
var inst = checkOpen();
if( inst.Socket == null ) throw new ReqlDriverError("No socket open.");
inst.Socket.WriteQuery(query.Token, query.Serialize(), assignAwaiter);
return inst.Socket.SendQuery(query.Token, query.Serialize(), awaitResponse);
}

internal async virtual Task<dynamic> RunQueryAsync<T>(Query query)
internal virtual Task<Response> RunQueryReply(Query query)
{
var inst = checkOpen();
if( inst.Socket == null ) throw new ReqlDriverError("No socket open.");
return RunQuery(query, awaitResponse: true);
}

inst.Socket.WriteQuery(query.Token, query.Serialize());
internal virtual void RunQueryNoReply(Query query)
{
RunQuery(query, awaitResponse: false);
}

Response res = await inst.AwaitResponseAsync(query);
internal async virtual Task<dynamic> RunQueryAsync<T>(Query query)
{
var res = await RunQuery(query, awaitResponse: true);

if( res.IsAtom )
{
Expand Down Expand Up @@ -253,17 +247,23 @@ public void runNoReply(ReqlAst term, object globalOpts)
var opts = OptArgs.fromAnonType(globalOpts);
SetDefaultDb(opts);
opts.with("noreply", true);
RunQueryNoreply(Query.Start(NewToken(), term, opts), assignAwaiter: false);
RunQueryNoReply(Query.Start(NewToken(), term, opts));
}

internal virtual void Continue(ICursor cursor)
internal virtual Task<Response> Continue(ICursor cursor)
{
RunQueryNoreply(Query.Continue(cursor.Token), assignAwaiter: true);
return RunQueryReply(Query.Continue(cursor.Token));
}

internal virtual void Stop(ICursor cursor)
internal virtual Task<Response> Stop(ICursor cursor)
{
RunQueryNoreply(Query.Stop(cursor.Token), assignAwaiter: false);
/*
neumino: The END query itself doesn't come back with a response
cowboy: ..... a Query[token,STOP], is like sending a very last CONTINUE, r:[] would contain the last bits of the finished seq
neumino: Yes a STOP is like a very last CONTINUE
neumino: If you have a pending CONTINUE, and send a STOP, you should get back two SUCCESS_SEQUENCE
*/
return RunQueryReply(Query.Stop(cursor.Token));
}


Expand Down
16 changes: 1 addition & 15 deletions Source/RethinkDb.Driver/Net/ConnectionInstance.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using RethinkDb.Driver.Ast;

namespace RethinkDb.Driver.Net
{
Expand All @@ -20,10 +18,7 @@ public virtual void Connect(string hostname, int port, byte[] handshake, TimeSpa
Socket = sock;
}

public virtual bool Open
{
get { return Socket?.Open ?? false; }
}
public virtual bool Open => this.Socket?.Open ?? false;

public virtual void Close()
{
Expand All @@ -50,14 +45,5 @@ internal virtual void RemoveFromCache(long token)
}
}

internal virtual async Task<Response> AwaitResponseAsync(Query query, long? deadline = null)
{
if( Socket == null )
throw new ReqlError("Socket not open");

long token = query.Token;
//we always get the response we're looking for. :)
return await this.Socket.AwaitResponseAsync(token);
}
}
}
Loading

0 comments on commit 82626b8

Please sign in to comment.