Skip to content

Commit

Permalink
Eliminated count changed regression issue. Fixes #188
Browse files Browse the repository at this point in the history
  • Loading branch information
RolandPheasant committed Dec 16, 2018
1 parent 4833449 commit 44cc416
Show file tree
Hide file tree
Showing 13 changed files with 212 additions and 61 deletions.
2 changes: 1 addition & 1 deletion DynamicData.ReactiveUI/DynamicData.ReactiveUI.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</ItemGroup>

<PropertyGroup>
<Title>Dynamic Data / Reactive UI intergration</Title>
<Title>Dynamic Data / Reactive UI inte gration</Title>
<Description>
Make reactive ui even more powerful by integrating with dynamic data and making use of it's numerious operators.
Dynamic data provides an observable cache and an observable list with at least 50 collection specific operators for each.
Expand Down
108 changes: 104 additions & 4 deletions DynamicData.Tests/Cache/ObservableToObservableChangeSetFixture.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using DynamicData.Kernel;
using DynamicData.Tests.Domain;
Expand All @@ -9,9 +10,39 @@

namespace DynamicData.Tests.Cache
{

public class QuickAndDirtyPerformanceMeasure
{
private static readonly Person[] _people = Enumerable.Range(1, 56_000).Select(i => new Person($"Name {i}", i)).ToArray();
private readonly SourceCache<Person, string> _peopleCache = new SourceCache<Person, string>(p=> p.Name);

[Fact]
public void AddLotsOfItems()
{
_peopleCache.AddOrUpdate(_people);
}

[Fact]
public void DoSomeStuffWithAnExtraOrdinarilySimplisticMeansOfMeasuringPerformance()
{
var mySubscriptions = _peopleCache
.Connect()
.Do(_ => { })
.Transform(x => x) //
.Do(_ => { })
.Subscribe();

_peopleCache.AddOrUpdate(_people);
}
}



public class ObservableToObservableChangeSetFixture
{




[Fact]
public void OnNextFiresAdd()
{
Expand Down Expand Up @@ -95,10 +126,79 @@ public void ExpireAfterTime()

scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks);

results.Messages.Count.Should().Be(201, "Should be 300 messages");
results.Messages.Count.Should().Be(201, "Should be 201 messages");
results.Messages.Sum(x => x.Adds).Should().Be(200, "Should be 200 adds");
results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 100 removes");
results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 200 removes");
results.Data.Count.Should().Be(0, "Should be no data in the cache");
}

[Fact]
public void ExpireAfterTimeWithKey()
{
var subject = new Subject<Person>();
var scheduler = new TestScheduler();
var results = subject.ToObservableChangeSet(p => p.Key, expireAfter: t => TimeSpan.FromMinutes(1), scheduler: scheduler).AsAggregator();

var items = Enumerable.Range(1, 200).Select(i => new Person("p" + i.ToString("000"), i)).ToArray();
foreach (var person in items)
{
subject.OnNext(person);
}

scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks);

results.Messages.Count.Should().Be(201, "Should be 201 messages");
results.Messages.Sum(x => x.Adds).Should().Be(200, "Should be 200 adds");
results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 200 removes");
results.Data.Count.Should().Be(0, "Should be no data in the cache");
}

[Fact]
public void ExpireAfterTimeDynamic()
{
var scheduler = new TestScheduler();
var source =
Observable.Interval(TimeSpan.FromSeconds(1), scheduler: scheduler)
.Take(30)
.Select(i => (int)i)
.Select(i => new Person("p" + i.ToString("000"), i));

var results = source.ToObservableChangeSet(expireAfter: t => TimeSpan.FromSeconds(10), scheduler: scheduler).AsAggregator();

scheduler.AdvanceBy(TimeSpan.FromSeconds(30).Ticks);

Console.WriteLine(results.Messages.Count);
Console.WriteLine(results.Messages.Sum(x => x.Adds));
Console.WriteLine(results.Messages.Sum(x => x.Removes));

results.Messages.Count.Should().Be(50, "Should be 50 messages");
results.Messages.Sum(x => x.Adds).Should().Be(30, "Should be 30 adds");
results.Messages.Sum(x => x.Removes).Should().Be(20, "Should be 20 removes");
results.Data.Count.Should().Be(10, "Should be 10 items in the cache");
}

[Fact]
public void ExpireAfterTimeDynamicWithKey()
{
var scheduler = new TestScheduler();
var source =
Observable.Interval(TimeSpan.FromSeconds(1), scheduler: scheduler)
.Take(30)
.Select(i => (int)i)
.Select(i => new Person("p" + i.ToString("000"), i));

var results = source.ToObservableChangeSet(p => p.Key, expireAfter: t => TimeSpan.FromSeconds(10), scheduler: scheduler).AsAggregator();

scheduler.AdvanceBy(TimeSpan.FromSeconds(30).Ticks);

Console.WriteLine(results.Messages.Count);
Console.WriteLine(results.Messages.Sum(x => x.Adds));
Console.WriteLine(results.Messages.Sum(x => x.Removes));

results.Messages.Count.Should().Be(50, "Should be 50 messages");
results.Messages.Sum(x => x.Adds).Should().Be(30, "Should be 30 adds");
results.Messages.Sum(x => x.Removes).Should().Be(20, "Should be 20 removes");
results.Data.Count.Should().Be(10, "Should be 10 items in the cache");
}
}
}
}
34 changes: 29 additions & 5 deletions DynamicData.Tests/Cache/SourceCacheFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

namespace DynamicData.Tests.Cache
{
public class SourceCacheFixture: IDisposable

public class SourceCacheFixture : IDisposable
{
private readonly ChangeSetAggregator<Person, string> _results;
private readonly ISourceCache<Person, string> _source;

public SourceCacheFixture()
public SourceCacheFixture()
{
_source = new SourceCache<Person, string>(p => p.Key);
_results = _source.Connect().AsAggregator();
Expand Down Expand Up @@ -63,7 +63,7 @@ public void CountChangedShouldAlwaysInvokeUponeSubscription()
}

[Fact]
public void CountChangedShouldReflectContentsOfCacheInvokeUponeSubscription()
public void CountChangedShouldReflectContentsOfCacheInvokeUponSubscription()
{
var generator = new RandomPersonGenerator();
int? result = null;
Expand All @@ -82,7 +82,7 @@ public void SubscribesDisposesCorrectly()
bool called = false;
bool errored = false;
bool completed = false;
IDisposable subscription = _source.Connect()
var subscription = _source.Connect()
.Finally(() => completed = true)
.Subscribe(updates => { called = true; }, ex => errored = true, () => completed = true);
_source.AddOrUpdate(new Person("Adult1", 40));
Expand All @@ -94,5 +94,29 @@ public void SubscribesDisposesCorrectly()
called.Should().BeTrue();
completed.Should().BeTrue();
}

[Fact]
public void CountChanged()
{
int count = 0;
int invoked = 0;
using (_source.CountChanged.Subscribe(c =>
{
count = c;
invoked++;
}))
{
invoked.Should().Be(1);
count.Should().Be(0);

_source.AddOrUpdate(new RandomPersonGenerator().Take(100));
invoked.Should().Be(2);
count.Should().Be(100);

_source.Clear();
invoked.Should().Be(3);
count.Should().Be(0);
}
}
}
}
2 changes: 1 addition & 1 deletion DynamicData/Aggregation/AggregateItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
namespace DynamicData.Aggregation
{
/// <summary>
/// An object representing added and removed itemsin a continous aggregation stream
/// An object representing added and removed items in a continuous aggregation stream
/// </summary>
/// <typeparam name="TObject">The type of the object.</typeparam>
public readonly struct AggregateItem<TObject>
Expand Down
4 changes: 2 additions & 2 deletions DynamicData/Cache/ChangeSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ internal static class CacheChangeSetEx
/// <summary>
/// IChangeSet is flawed because it automatically means allocations when enumerating.
/// This extension is a crazy hack to cast to the concrete changeset which means we no longer allocate
/// as changset now inherits from List which has allocation free enumerations.
/// as changset now inherits from List which has allocation free enumerations.
///
/// IChangeSet will be removed in V7 and instead Changesets will be used directly
/// IChangeSet will be removed in V7 and instead Change sets will be used directly
///
/// In the mean time I am banking that no-one has implemented a custom change set - personally I think it is very unlikely
/// </summary>
Expand Down
6 changes: 2 additions & 4 deletions DynamicData/Cache/IObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ public interface IConnectableCache<TObject, TKey>
/// Returns an observable of any changes which match the specified key. The sequence starts with the inital item in the cache (if there is one).
/// </summary>
/// <param name="key">The key.</param>
/// <returns></returns>
IObservable<Change<TObject, TKey>> Watch(TKey key);

/// <summary>
/// Returns a observable of cache changes preceeded with the initital cache state
/// Returns a filtered stream of cache changes preceded with the initial filtered state
/// </summary>
/// <param name="predicate">The result will be filtered using the specfied predicate.</param>
/// <returns></returns>
/// <param name="predicate">The result will be filtered using the specified predicate.</param>
IObservable<IChangeSet<TObject, TKey>> Connect(Func<TObject, bool> predicate = null);

/// <summary>
Expand Down
45 changes: 17 additions & 28 deletions DynamicData/Cache/Internal/ChangesReducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,30 @@

namespace DynamicData.Cache.Internal
{
internal class ChangesReducer
internal static class ChangesReducer
{
[Pure]
public static Optional<Change<TObject, TKey>> Reduce<TObject, TKey>(
Optional<Change<TObject, TKey>> previous,
Change<TObject, TKey> next)
public static Optional<Change<TObject, TKey>> Reduce<TObject, TKey>(Optional<Change<TObject, TKey>> previous, Change<TObject, TKey> next)
{
if (!previous.HasValue) return next;
var previousValue = previous.Value;

if (previousValue.Reason == ChangeReason.Add && next.Reason == ChangeReason.Remove)
switch (previousValue.Reason)
{
return Optional<Change<TObject, TKey>>.None;
}
else if (previousValue.Reason == ChangeReason.Remove && next.Reason == ChangeReason.Add)
{
return Optional.Some(
new Change<TObject, TKey>(ChangeReason.Update, next.Key, next.Current, previousValue.Current,
next.CurrentIndex, previousValue.CurrentIndex)
);
}
else if (previousValue.Reason == ChangeReason.Add && next.Reason == ChangeReason.Update)
{
return Optional.Some(new Change<TObject, TKey>(ChangeReason.Add, next.Key, next.Current, next.CurrentIndex));
}
else if (previousValue.Reason == ChangeReason.Update && next.Reason == ChangeReason.Update)
{
return Optional.Some(
new Change<TObject, TKey>(ChangeReason.Update, previousValue.Key, next.Current, previousValue.Previous,
next.CurrentIndex, previousValue.PreviousIndex)
);
}
else
{
return next;
case ChangeReason.Add when next.Reason == ChangeReason.Remove:
return Optional<Change<TObject, TKey>>.None;

case ChangeReason.Remove when next.Reason == ChangeReason.Add:
return new Change<TObject, TKey>(ChangeReason.Update, next.Key, next.Current, previousValue.Current, next.CurrentIndex, previousValue.CurrentIndex);

case ChangeReason.Add when next.Reason == ChangeReason.Update:
return new Change<TObject, TKey>(ChangeReason.Add, next.Key, next.Current, next.CurrentIndex);

case ChangeReason.Update when next.Reason == ChangeReason.Update:
return new Change<TObject, TKey>(ChangeReason.Update, previousValue.Key, next.Current, previousValue.Previous, next.CurrentIndex, previousValue.PreviousIndex);

default:
return next;
}
}
}
Expand Down
26 changes: 21 additions & 5 deletions DynamicData/Cache/Internal/DynamicCombiner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ namespace DynamicData.Cache.Internal
internal sealed class DynamicCombiner<TObject, TKey>
{
private readonly IObservableList<IObservable<IChangeSet<TObject, TKey>>> _source;




private readonly CombineOperator _type;

public DynamicCombiner([NotNull] IObservableList<IObservable<IChangeSet<TObject, TKey>>> source, CombineOperator type)
Expand All @@ -19,6 +23,16 @@ public DynamicCombiner([NotNull] IObservableList<IObservable<IChangeSet<TObject,
_type = type;
}


public DynamicCombiner([NotNull] IObservable<IObservable<IChangeSet<TObject, TKey>>> source, CombineOperator type)
{
// _source = source ?? throw new ArgumentNullException(nameof(source));

var xxx =

_type = type;
}

public IObservable<IChangeSet<TObject, TKey>> Run()
{
return Observable.Create<IChangeSet<TObject, TKey>>(observer =>
Expand All @@ -36,13 +50,15 @@ public IObservable<IChangeSet<TObject, TKey>> Run()
.Transform(changeset => new MergeContainer(changeset))
.AsObservableList();

var sharedLists = sourceLists.Connect().Publish();

//merge the items back together
var allChanges = sourceLists.Connect()
var allChanges = sharedLists
.MergeMany(mc => mc.Source)
.Synchronize(locker)
.Subscribe(changes =>
{
//Populate result list and chck for changes
//Populate result list and check for changes
UpdateResultList(resultCache, sourceLists.Items.AsArray(), changes);

var notifications = resultCache.CaptureChanges();
Expand All @@ -51,7 +67,7 @@ public IObservable<IChangeSet<TObject, TKey>> Run()
});

//when an list is removed, need to
var removedItem = sourceLists.Connect()
var removedItem = sharedLists
.OnItemRemoved(mc =>
{
//Remove items if required
Expand All @@ -70,7 +86,7 @@ public IObservable<IChangeSet<TObject, TKey>> Run()
.Subscribe();

//when an list is added or removed, need to
var sourceChanged = sourceLists.Connect()
var sourceChanged = sharedLists
.WhereReasonsAre(ListChangeReason.Add, ListChangeReason.AddRange)
.ForEachItemChange(mc =>
{
Expand All @@ -85,7 +101,7 @@ public IObservable<IChangeSet<TObject, TKey>> Run()
})
.Subscribe();

return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged);
return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged, sharedLists.Connect());
});
}

Expand Down
4 changes: 2 additions & 2 deletions DynamicData/Cache/ObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ private void InvokeNext(ChangeSet<TObject, TKey> changes)
{
lock (_locker)
{
if (changes.Count == 0) return;
_changes.OnNext(changes);
if (changes.Count != 0)
_changes.OnNext(changes);

if (_countChanged.IsValueCreated)
_countChanged.Value.OnNext(_readerWriter.Count);
Expand Down
Loading

0 comments on commit 44cc416

Please sign in to comment.