diff --git a/DynamicData.ReactiveUI/DynamicData.ReactiveUI.csproj b/DynamicData.ReactiveUI/DynamicData.ReactiveUI.csproj index 68b5254ed..804eadfb0 100644 --- a/DynamicData.ReactiveUI/DynamicData.ReactiveUI.csproj +++ b/DynamicData.ReactiveUI/DynamicData.ReactiveUI.csproj @@ -14,7 +14,7 @@ - Dynamic Data / Reactive UI intergration + Dynamic Data / Reactive UI inte gration Make reactive ui even more powerful by integrating with dynamic data and making use of it's numerious operators. Dynamic data provides an observable cache and an observable list with at least 50 collection specific operators for each. diff --git a/DynamicData.Tests/Cache/ObservableToObservableChangeSetFixture.cs b/DynamicData.Tests/Cache/ObservableToObservableChangeSetFixture.cs index 56645362a..cb399c031 100644 --- a/DynamicData.Tests/Cache/ObservableToObservableChangeSetFixture.cs +++ b/DynamicData.Tests/Cache/ObservableToObservableChangeSetFixture.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Reactive.Linq; using System.Reactive.Subjects; using DynamicData.Kernel; using DynamicData.Tests.Domain; @@ -9,9 +10,39 @@ namespace DynamicData.Tests.Cache { - + public class QuickAndDirtyPerformanceMeasure + { + private static readonly Person[] _people = Enumerable.Range(1, 56_000).Select(i => new Person($"Name {i}", i)).ToArray(); + private readonly SourceCache _peopleCache = new SourceCache(p=> p.Name); + + [Fact] + public void AddLotsOfItems() + { + _peopleCache.AddOrUpdate(_people); + } + + [Fact] + public void DoSomeStuffWithAnExtraOrdinarilySimplisticMeansOfMeasuringPerformance() + { + var mySubscriptions = _peopleCache + .Connect() + .Do(_ => { }) + .Transform(x => x) // + .Do(_ => { }) + .Subscribe(); + + _peopleCache.AddOrUpdate(_people); + } + } + + + public class ObservableToObservableChangeSetFixture { + + + + [Fact] public void OnNextFiresAdd() { @@ -95,10 +126,79 @@ public void ExpireAfterTime() scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks); - results.Messages.Count.Should().Be(201, "Should be 300 messages"); + results.Messages.Count.Should().Be(201, "Should be 201 messages"); results.Messages.Sum(x => x.Adds).Should().Be(200, "Should be 200 adds"); - results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 100 removes"); + results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 200 removes"); results.Data.Count.Should().Be(0, "Should be no data in the cache"); } + + [Fact] + public void ExpireAfterTimeWithKey() + { + var subject = new Subject(); + var scheduler = new TestScheduler(); + var results = subject.ToObservableChangeSet(p => p.Key, expireAfter: t => TimeSpan.FromMinutes(1), scheduler: scheduler).AsAggregator(); + + var items = Enumerable.Range(1, 200).Select(i => new Person("p" + i.ToString("000"), i)).ToArray(); + foreach (var person in items) + { + subject.OnNext(person); + } + + scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks); + + results.Messages.Count.Should().Be(201, "Should be 201 messages"); + results.Messages.Sum(x => x.Adds).Should().Be(200, "Should be 200 adds"); + results.Messages.Sum(x => x.Removes).Should().Be(200, "Should be 200 removes"); + results.Data.Count.Should().Be(0, "Should be no data in the cache"); + } + + [Fact] + public void ExpireAfterTimeDynamic() + { + var scheduler = new TestScheduler(); + var source = + Observable.Interval(TimeSpan.FromSeconds(1), scheduler: scheduler) + .Take(30) + .Select(i => (int)i) + .Select(i => new Person("p" + i.ToString("000"), i)); + + var results = source.ToObservableChangeSet(expireAfter: t => TimeSpan.FromSeconds(10), scheduler: scheduler).AsAggregator(); + + scheduler.AdvanceBy(TimeSpan.FromSeconds(30).Ticks); + + Console.WriteLine(results.Messages.Count); + Console.WriteLine(results.Messages.Sum(x => x.Adds)); + Console.WriteLine(results.Messages.Sum(x => x.Removes)); + + results.Messages.Count.Should().Be(50, "Should be 50 messages"); + results.Messages.Sum(x => x.Adds).Should().Be(30, "Should be 30 adds"); + results.Messages.Sum(x => x.Removes).Should().Be(20, "Should be 20 removes"); + results.Data.Count.Should().Be(10, "Should be 10 items in the cache"); + } + + [Fact] + public void ExpireAfterTimeDynamicWithKey() + { + var scheduler = new TestScheduler(); + var source = + Observable.Interval(TimeSpan.FromSeconds(1), scheduler: scheduler) + .Take(30) + .Select(i => (int)i) + .Select(i => new Person("p" + i.ToString("000"), i)); + + var results = source.ToObservableChangeSet(p => p.Key, expireAfter: t => TimeSpan.FromSeconds(10), scheduler: scheduler).AsAggregator(); + + scheduler.AdvanceBy(TimeSpan.FromSeconds(30).Ticks); + + Console.WriteLine(results.Messages.Count); + Console.WriteLine(results.Messages.Sum(x => x.Adds)); + Console.WriteLine(results.Messages.Sum(x => x.Removes)); + + results.Messages.Count.Should().Be(50, "Should be 50 messages"); + results.Messages.Sum(x => x.Adds).Should().Be(30, "Should be 30 adds"); + results.Messages.Sum(x => x.Removes).Should().Be(20, "Should be 20 removes"); + results.Data.Count.Should().Be(10, "Should be 10 items in the cache"); + } } -} +} \ No newline at end of file diff --git a/DynamicData.Tests/Cache/SourceCacheFixture.cs b/DynamicData.Tests/Cache/SourceCacheFixture.cs index 242083a4f..3fa8c5412 100644 --- a/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -6,13 +6,13 @@ namespace DynamicData.Tests.Cache { - - public class SourceCacheFixture: IDisposable + + public class SourceCacheFixture : IDisposable { private readonly ChangeSetAggregator _results; private readonly ISourceCache _source; - public SourceCacheFixture() + public SourceCacheFixture() { _source = new SourceCache(p => p.Key); _results = _source.Connect().AsAggregator(); @@ -63,7 +63,7 @@ public void CountChangedShouldAlwaysInvokeUponeSubscription() } [Fact] - public void CountChangedShouldReflectContentsOfCacheInvokeUponeSubscription() + public void CountChangedShouldReflectContentsOfCacheInvokeUponSubscription() { var generator = new RandomPersonGenerator(); int? result = null; @@ -82,7 +82,7 @@ public void SubscribesDisposesCorrectly() bool called = false; bool errored = false; bool completed = false; - IDisposable subscription = _source.Connect() + var subscription = _source.Connect() .Finally(() => completed = true) .Subscribe(updates => { called = true; }, ex => errored = true, () => completed = true); _source.AddOrUpdate(new Person("Adult1", 40)); @@ -94,5 +94,29 @@ public void SubscribesDisposesCorrectly() called.Should().BeTrue(); completed.Should().BeTrue(); } + + [Fact] + public void CountChanged() + { + int count = 0; + int invoked = 0; + using (_source.CountChanged.Subscribe(c => + { + count = c; + invoked++; + })) + { + invoked.Should().Be(1); + count.Should().Be(0); + + _source.AddOrUpdate(new RandomPersonGenerator().Take(100)); + invoked.Should().Be(2); + count.Should().Be(100); + + _source.Clear(); + invoked.Should().Be(3); + count.Should().Be(0); + } + } } } diff --git a/DynamicData/Aggregation/AggregateItem.cs b/DynamicData/Aggregation/AggregateItem.cs index ae2a2b258..016a85d41 100644 --- a/DynamicData/Aggregation/AggregateItem.cs +++ b/DynamicData/Aggregation/AggregateItem.cs @@ -2,7 +2,7 @@ namespace DynamicData.Aggregation { /// - /// An object representing added and removed itemsin a continous aggregation stream + /// An object representing added and removed items in a continuous aggregation stream /// /// The type of the object. public readonly struct AggregateItem diff --git a/DynamicData/Cache/ChangeSet.cs b/DynamicData/Cache/ChangeSet.cs index f51218561..ba77c4490 100644 --- a/DynamicData/Cache/ChangeSet.cs +++ b/DynamicData/Cache/ChangeSet.cs @@ -9,9 +9,9 @@ internal static class CacheChangeSetEx /// /// IChangeSet is flawed because it automatically means allocations when enumerating. /// This extension is a crazy hack to cast to the concrete changeset which means we no longer allocate - /// as changset now inherits from List which has allocation free enumerations. + /// as changset now inherits from List which has allocation free enumerations. /// - /// IChangeSet will be removed in V7 and instead Changesets will be used directly + /// IChangeSet will be removed in V7 and instead Change sets will be used directly /// /// In the mean time I am banking that no-one has implemented a custom change set - personally I think it is very unlikely /// diff --git a/DynamicData/Cache/IObservableCache.cs b/DynamicData/Cache/IObservableCache.cs index 1a03e395e..6028d6815 100644 --- a/DynamicData/Cache/IObservableCache.cs +++ b/DynamicData/Cache/IObservableCache.cs @@ -15,14 +15,12 @@ public interface IConnectableCache /// Returns an observable of any changes which match the specified key. The sequence starts with the inital item in the cache (if there is one). /// /// The key. - /// IObservable> Watch(TKey key); /// - /// Returns a observable of cache changes preceeded with the initital cache state + /// Returns a filtered stream of cache changes preceded with the initial filtered state /// - /// The result will be filtered using the specfied predicate. - /// + /// The result will be filtered using the specified predicate. IObservable> Connect(Func predicate = null); /// diff --git a/DynamicData/Cache/Internal/ChangesReducer.cs b/DynamicData/Cache/Internal/ChangesReducer.cs index d0b96be9a..4e54c3e44 100644 --- a/DynamicData/Cache/Internal/ChangesReducer.cs +++ b/DynamicData/Cache/Internal/ChangesReducer.cs @@ -3,41 +3,30 @@ namespace DynamicData.Cache.Internal { - internal class ChangesReducer + internal static class ChangesReducer { [Pure] - public static Optional> Reduce( - Optional> previous, - Change next) + public static Optional> Reduce(Optional> previous, Change next) { if (!previous.HasValue) return next; var previousValue = previous.Value; - if (previousValue.Reason == ChangeReason.Add && next.Reason == ChangeReason.Remove) + switch (previousValue.Reason) { - return Optional>.None; - } - else if (previousValue.Reason == ChangeReason.Remove && next.Reason == ChangeReason.Add) - { - return Optional.Some( - new Change(ChangeReason.Update, next.Key, next.Current, previousValue.Current, - next.CurrentIndex, previousValue.CurrentIndex) - ); - } - else if (previousValue.Reason == ChangeReason.Add && next.Reason == ChangeReason.Update) - { - return Optional.Some(new Change(ChangeReason.Add, next.Key, next.Current, next.CurrentIndex)); - } - else if (previousValue.Reason == ChangeReason.Update && next.Reason == ChangeReason.Update) - { - return Optional.Some( - new Change(ChangeReason.Update, previousValue.Key, next.Current, previousValue.Previous, - next.CurrentIndex, previousValue.PreviousIndex) - ); - } - else - { - return next; + case ChangeReason.Add when next.Reason == ChangeReason.Remove: + return Optional>.None; + + case ChangeReason.Remove when next.Reason == ChangeReason.Add: + return new Change(ChangeReason.Update, next.Key, next.Current, previousValue.Current, next.CurrentIndex, previousValue.CurrentIndex); + + case ChangeReason.Add when next.Reason == ChangeReason.Update: + return new Change(ChangeReason.Add, next.Key, next.Current, next.CurrentIndex); + + case ChangeReason.Update when next.Reason == ChangeReason.Update: + return new Change(ChangeReason.Update, previousValue.Key, next.Current, previousValue.Previous, next.CurrentIndex, previousValue.PreviousIndex); + + default: + return next; } } } diff --git a/DynamicData/Cache/Internal/DynamicCombiner.cs b/DynamicData/Cache/Internal/DynamicCombiner.cs index ca5cf4992..a79a3e331 100644 --- a/DynamicData/Cache/Internal/DynamicCombiner.cs +++ b/DynamicData/Cache/Internal/DynamicCombiner.cs @@ -11,6 +11,10 @@ namespace DynamicData.Cache.Internal internal sealed class DynamicCombiner { private readonly IObservableList>> _source; + + + + private readonly CombineOperator _type; public DynamicCombiner([NotNull] IObservableList>> source, CombineOperator type) @@ -19,6 +23,16 @@ public DynamicCombiner([NotNull] IObservableList>> source, CombineOperator type) + { + // _source = source ?? throw new ArgumentNullException(nameof(source)); + + var xxx = + + _type = type; + } + public IObservable> Run() { return Observable.Create>(observer => @@ -36,13 +50,15 @@ public IObservable> Run() .Transform(changeset => new MergeContainer(changeset)) .AsObservableList(); + var sharedLists = sourceLists.Connect().Publish(); + //merge the items back together - var allChanges = sourceLists.Connect() + var allChanges = sharedLists .MergeMany(mc => mc.Source) .Synchronize(locker) .Subscribe(changes => { - //Populate result list and chck for changes + //Populate result list and check for changes UpdateResultList(resultCache, sourceLists.Items.AsArray(), changes); var notifications = resultCache.CaptureChanges(); @@ -51,7 +67,7 @@ public IObservable> Run() }); //when an list is removed, need to - var removedItem = sourceLists.Connect() + var removedItem = sharedLists .OnItemRemoved(mc => { //Remove items if required @@ -70,7 +86,7 @@ public IObservable> Run() .Subscribe(); //when an list is added or removed, need to - var sourceChanged = sourceLists.Connect() + var sourceChanged = sharedLists .WhereReasonsAre(ListChangeReason.Add, ListChangeReason.AddRange) .ForEachItemChange(mc => { @@ -85,7 +101,7 @@ public IObservable> Run() }) .Subscribe(); - return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged); + return new CompositeDisposable(sourceLists, allChanges, removedItem, sourceChanged, sharedLists.Connect()); }); } diff --git a/DynamicData/Cache/ObservableCache.cs b/DynamicData/Cache/ObservableCache.cs index 73837484e..6884617fb 100644 --- a/DynamicData/Cache/ObservableCache.cs +++ b/DynamicData/Cache/ObservableCache.cs @@ -72,8 +72,8 @@ private void InvokeNext(ChangeSet changes) { lock (_locker) { - if (changes.Count == 0) return; - _changes.OnNext(changes); + if (changes.Count != 0) + _changes.OnNext(changes); if (_countChanged.IsValueCreated) _countChanged.Value.OnNext(_readerWriter.Count); diff --git a/DynamicData/Cache/ObservableCacheEx.cs b/DynamicData/Cache/ObservableCacheEx.cs index 04d6111a2..013b69b0c 100644 --- a/DynamicData/Cache/ObservableCacheEx.cs +++ b/DynamicData/Cache/ObservableCacheEx.cs @@ -1868,6 +1868,32 @@ public static IObservable> Or(this IObs return sources.Combine(CombineOperator.Or); } + //public static IObservable> Or(this IObservable> source, + // Func>> manyselector) + //{ + // return new TransformMany(source, manyselector, keySelector).Run(); + //} + + //public static IObservable> Or(this IObservable> source, + // Func>> manyselector) + //{ + // return new TransformMany(source, manyselector, keySelector).Run(); + //} + + //public static IObservable> TransformMany(this IObservable> source, + // Func> manyselector) + //{ + // return new TransformMany(source, manyselector, keySelector).Run(); + //} + + //public static IObservable> Or(this IObservable> sources) + //{ + // if (sources == null) throw new ArgumentNullException(nameof(sources)); + + // return sources.Combine(CombineOperator.Or); + //} + + /// /// Dynamically apply a logical Or operator between the items in the outer observable list. /// Items which are in any of the sources are included in the result @@ -2593,6 +2619,8 @@ public static IObservable> TransformMa } + + #endregion #region Transform safe diff --git a/DynamicData/Cache/SourceCache.cs b/DynamicData/Cache/SourceCache.cs index f12351c94..d89462d6c 100644 --- a/DynamicData/Cache/SourceCache.cs +++ b/DynamicData/Cache/SourceCache.cs @@ -30,7 +30,7 @@ public SourceCache(Func keySelector) /// /// /// Add, update and remove api via an action method. Enables the consumer to perform queries and updates - /// safely within the innner caches lock. + /// safely within the inner caches lock. /// The result of the action will produce appropriate notifications. /// /// The update action. @@ -58,11 +58,7 @@ public void OnError(Exception exception) /// public IObservable CountChanged => _innerCache.CountChanged; - /// - /// Returns a filtered stream of cache changes preceeded with the initital filtered state - /// - /// The predicate. - /// + /// public IObservable> Connect(Func predicate) { return _innerCache.Connect(predicate); @@ -75,7 +71,7 @@ public IObservable> Connect() } /// - /// Returns an observable of any changes which match the specified key, preceeded with the initital cache state + /// Returns an observable of any changes which match the specified key, preceded with the initial cache state /// /// The key. /// diff --git a/DynamicData/List/ObservableListEx.cs b/DynamicData/List/ObservableListEx.cs index bd77010d9..e210284ba 100644 --- a/DynamicData/List/ObservableListEx.cs +++ b/DynamicData/List/ObservableListEx.cs @@ -225,7 +225,7 @@ public static IObservable> AutoRefresh(t TimeSpan? changeSetBuffer = null, TimeSpan? propertyChangeThrottle = null, IScheduler scheduler = null) - where TObject : INotifyPropertyChanged + where TObject : INotifyPropertyChanged { if (source == null) throw new ArgumentNullException(nameof(source)); if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); diff --git a/DynamicData/List/RangeChange.cs b/DynamicData/List/RangeChange.cs index 0edcd2217..05ac1d591 100644 --- a/DynamicData/List/RangeChange.cs +++ b/DynamicData/List/RangeChange.cs @@ -7,7 +7,7 @@ namespace DynamicData { /// - /// Multipe change container + /// Multiple change container /// /// public sealed class RangeChange : IEnumerable