Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preview observable with monitor lock #218

Merged
merged 4 commits into from
Apr 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions DynamicData.Tests/Cache/ObservableCachePreviewFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using DynamicData.Tests.Domain;
using Xunit;

namespace DynamicData.Tests.Cache
{
public class ObservableCachePreviewFixture : IDisposable
{
private readonly ISourceCache<Person, string> _source;
private readonly ChangeSetAggregator<Person, string> _results;

public ObservableCachePreviewFixture()
{
_source = new SourceCache<Person, string>(p => p.Name);
_results = _source.Connect().AsAggregator();
}

public void Dispose()
{
_source.Dispose();
_results.Dispose();
}

[Fact]
public void NoChangesAllowedDuringPreview()
{
// On preview, try adding an arbitrary item
var d = _source.Preview().Subscribe(_ =>
{
Assert.Throws<InvalidOperationException>(() => _source.AddOrUpdate(new Person("A", 1)));
});

// Trigger a change
_source.AddOrUpdate(new Person("B", 2));

// Cleanup
d.Dispose();
}

[Fact]
public void RecursiveEditsWork()
{
var person = new Person("A", 1);

_source.Edit(l =>
{
_source.AddOrUpdate(person);
Assert.True(_source.Items.SequenceEqual(new[] { person }));
Assert.True(l.Items.SequenceEqual(new[] { person }));
});

Assert.True(_source.Items.SequenceEqual(new[] { person }));
}

[Fact]
public void RecursiveEditsHavePostponedEvents()
{
var person = new Person("A", 1);

var preview = _source.Preview().AsAggregator();
var connect = _source.Connect().AsAggregator();
_source.Edit(l =>
{
_source.Edit(l2 => l2.AddOrUpdate(person));
Assert.Equal(0, preview.Messages.Count);
Assert.Equal(0, connect.Messages.Count);
});

Assert.Equal(1, preview.Messages.Count);
Assert.Equal(1, connect.Messages.Count);

Assert.True(_source.Items.SequenceEqual(new[] { person }));
}

[Fact]
public void PreviewEventsAreCorrect()
{
var person = new Person("A", 1);

var preview = _source.Preview().AsAggregator();
var connect = _source.Connect().AsAggregator();
_source.Edit(l =>
{
_source.Edit(l2 => l2.AddOrUpdate(person));
l.Remove(person);
l.AddOrUpdate(new[] { new Person("B", 2), new Person("C", 3) });
});

Assert.True(preview.Messages.SequenceEqual(connect.Messages));
Assert.True(_source.KeyValues.OrderBy(t => t.Value.Age).Select(t => t.Value.Age).SequenceEqual(new[] { 2, 3 }));
}

[Fact]
public void ChangesAreNotYetAppliedDuringPreview()
{
_source.Clear();

// On preview, make sure the list is empty
var d = _source.Preview().Subscribe(_ =>
{
Assert.True(_source.Count == 0);
Assert.True(_source.Items.Count() == 0);
});

// Trigger a change
_source.AddOrUpdate(new Person("A", 1));

// Cleanup
d.Dispose();
}

[Fact]
public void ConnectPreviewPredicateIsApplied()
{
_source.Clear();

// Collect preview messages about even numbers only
var aggregator = _source.Preview(i => i.Age == 2).AsAggregator();

// Trigger changes
_source.AddOrUpdate(new Person("A", 1));
_source.AddOrUpdate(new Person("B", 2));
_source.AddOrUpdate(new Person("C", 3));

Assert.True(aggregator.Messages.Count == 1);
Assert.True(aggregator.Messages[0].Count == 1);
Assert.True(aggregator.Messages[0].First().Key == "B");
Assert.True(aggregator.Messages[0].First().Reason == ChangeReason.Add);

// Cleanup
aggregator.Dispose();
}
}
}
130 changes: 130 additions & 0 deletions DynamicData.Tests/List/SourceListPreviewFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using FluentAssertions;
using Xunit;

namespace DynamicData.Tests.List
{
public class SourceListPreviewFixture : IDisposable
{
private readonly ISourceList<int> _source;

public SourceListPreviewFixture()
{
_source = new SourceList<int>();
}

public void Dispose()
{
_source.Dispose();
}

[Fact]
public void NoChangesAllowedDuringPreview()
{
// On preview, try adding an arbitrary item
var d = _source.Preview().Subscribe(_ =>
{
Assert.Throws<InvalidOperationException>(() => _source.Add(1));
});

// Trigger a change
_source.Add(1);

// Cleanup
d.Dispose();
}

[Fact]
public void RecursiveEditsWork()
{
_source.Edit(l =>
{
_source.Edit(l2 => l2.Add(1));
Assert.True(_source.Items.SequenceEqual(new[] { 1 }));
Assert.True(l.SequenceEqual(new[] { 1 }));
});

Assert.True(_source.Items.SequenceEqual(new []{1}));
}

[Fact]
public void RecursiveEditsHavePostponedEvents()
{
var preview = _source.Preview().AsAggregator();
var connect = _source.Connect().AsAggregator();
_source.Edit(l =>
{
_source.Edit(l2 => l2.Add(1));
Assert.Equal(0, preview.Messages.Count);
Assert.Equal(0, connect.Messages.Count);
});

Assert.Equal(1, preview.Messages.Count);
Assert.Equal(1, connect.Messages.Count);

Assert.True(_source.Items.SequenceEqual(new[] { 1 }));
}

[Fact]
public void PreviewEventsAreCorrect()
{
var preview = _source.Preview().AsAggregator();
var connect = _source.Connect().AsAggregator();
_source.Edit(l =>
{
l.Add(1);
_source.Edit(l2 => l2.Add(2));
l.Remove(2);
l.AddRange(new []{3, 4, 5});
l.Move(1, 0);
});

Assert.True(preview.Messages.SequenceEqual(connect.Messages));
Assert.True(_source.Items.SequenceEqual(new[] { 3, 1, 4, 5 }));
}

[Fact]
public void ChangesAreNotYetAppliedDuringPreview()
{
_source.Clear();

// On preview, make sure the list is empty
var d = _source.Preview().Subscribe(_ =>
{
Assert.True(_source.Count == 0);
Assert.True(_source.Items.Count() == 0);
});

// Trigger a change
_source.Add(1);

// Cleanup
d.Dispose();
}

[Fact]
public void ConnectPreviewPredicateIsApplied()
{
_source.Clear();

// Collect preview messages about even numbers only
var aggregator = _source.Preview(i => i % 2 == 0) .AsAggregator();

// Trigger changes
_source.Add(1);
_source.Add(2);

Assert.True(aggregator.Messages.Count == 1);
Assert.True(aggregator.Messages[0].Count == 1);
Assert.True(aggregator.Messages[0].First().Item.Current == 2);
Assert.True(aggregator.Messages[0].First().Reason == ListChangeReason.Add);

// Cleanup
aggregator.Dispose();
}
}
}
7 changes: 7 additions & 0 deletions DynamicData/Cache/IObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ public interface IConnectableCache<TObject, TKey>
/// <param name="predicate">The result will be filtered using the specified predicate.</param>
IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null);

/// <summary>
/// Returns a filtered stream of cache changes.
/// Unlike Connect(), the returned observable is not prepended with the caches initial items.
/// </summary>
/// <param name="predicate">The result will be filtered using the specified predicate.</param>
IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool> predicate = null);

/// <summary>
/// A count changed observable starting with the current count
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion DynamicData/Cache/ISourceCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace DynamicData
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
/// <typeparam name="TKey">The type of the key.</typeparam>
public interface ISourceCache<TObject, TKey> : IObservableCache<TObject, TKey>, ICollectionSubject
public interface ISourceCache<TObject, TKey> : IObservableCache<TObject, TKey>
{
/// <summary>
/// Action to apply a batch update to a cache. Multiple update methods can be invoked within a single batch operation.
Expand Down
14 changes: 10 additions & 4 deletions DynamicData/Cache/IntermediateCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,26 +52,32 @@ public void Edit(Action<ICacheUpdater<TObject, TKey>> updateAction)
public IObservable<int> CountChanged => _innnerCache.CountChanged;

/// <summary>
/// Returns a filtered changeset of cache changes preceeded with the initial state
/// Returns a filtered changeset of cache changes preceded with the initial state
/// </summary>
/// <param name="predicate">The precdicate.</param>
/// <param name="predicate">The predicate.</param>
/// <returns></returns>
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate)
{
return _innnerCache.Connect(predicate);
}

/// <summary>
/// Returns a observable of cache changes preceeded with the initital cache state
/// Returns a observable of cache changes preceded with the initial cache state
/// </summary>
/// <returns></returns>
public IObservable<IChangeSet<TObject, TKey>> Connect()
{
return _innnerCache.Connect();
}

/// <inheritdoc />
public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool> predicate = null)
{
return _innnerCache.Preview(predicate);
}

/// <summary>
/// Returns an observable of any changes which match the specified key. The sequence starts with the inital item in the cache (if there is one).
/// Returns an observable of any changes which match the specified key. The sequence starts with the initial item in the cache (if there is one).
/// </summary>
/// <param name="key">The key.</param>
/// <returns></returns>
Expand Down
5 changes: 5 additions & 0 deletions DynamicData/Cache/Internal/AnonymousObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predic
return _cache.Connect(predicate);
}

public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool> predicate = null)
{
return _cache.Preview(predicate);
}

public IEnumerable<TKey> Keys => _cache.Keys;

public IEnumerable<TObject> Items => _cache.Items;
Expand Down
16 changes: 12 additions & 4 deletions DynamicData/Cache/Internal/LockFreeObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace DynamicData.Cache.Internal
{

/// <summary>
/// An observable cache which exposes an update API. Used at the root
/// An observable cache which exposes an update API. Used at the root
/// of all observable chains
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
Expand All @@ -20,6 +20,7 @@ public class LockFreeObservableCache<TObject, TKey> : IObservableCache<TObject,
private readonly ChangeAwareCache<TObject, TKey> _innerCache = new ChangeAwareCache<TObject, TKey>();
private readonly ICacheUpdater<TObject, TKey> _updater;
private readonly ISubject<IChangeSet<TObject, TKey>> _changes = new Subject<IChangeSet<TObject, TKey>>();
private readonly ISubject<IChangeSet<TObject, TKey>> _changesPreview = new Subject<IChangeSet<TObject, TKey>>();
private readonly ISubject<int> _countChanged = new Subject<int>();
private readonly IDisposable _cleanUp;

Expand All @@ -40,6 +41,7 @@ public LockFreeObservableCache(IObservable<IChangeSet<TObject, TKey>> source)
_cleanUp = Disposable.Create(() =>
{
loader.Dispose();
_changesPreview.OnCompleted();
_changes.OnCompleted();
_countChanged.OnCompleted();
});
Expand All @@ -61,9 +63,9 @@ public LockFreeObservableCache()


/// <summary>
/// Returns a observable of cache changes preceeded with the initital cache state
/// Returns a observable of cache changes preceded with the initial cache state
/// </summary>
/// <param name="predicate">The result will be filtered using the specfied predicate.</param>
/// <param name="predicate">The result will be filtered using the specified predicate.</param>
/// <returns></returns>
public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null)
{
Expand All @@ -77,8 +79,14 @@ public IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predic
});
}

/// <inheritdoc />
public IObservable<IChangeSet<TObject, TKey>> Preview(Func<TObject, bool> predicate = null)
{
return predicate == null ? _changesPreview : _changesPreview.Filter(predicate);
}

/// <summary>
/// Returns an observable of any changes which match the specified key. The sequence starts with the inital item in the cache (if there is one).
/// Returns an observable of any changes which match the specified key. The sequence starts with the initial item in the cache (if there is one).
/// </summary>
/// <param name="key">The key.</param>
/// <returns></returns>
Expand Down
Loading