diff --git a/DynamicData.Tests/Cache/ToSortedCollectionFixture.cs b/DynamicData.Tests/Cache/ToSortedCollectionFixture.cs new file mode 100644 index 000000000..367c12af2 --- /dev/null +++ b/DynamicData.Tests/Cache/ToSortedCollectionFixture.cs @@ -0,0 +1,105 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using DynamicData.Binding; +using DynamicData.Tests.Domain; +using FluentAssertions; +using Microsoft.Reactive.Testing; +using Xunit; + +namespace DynamicData.Tests.Cache +{ + public class ToSortedCollectionFixture : IDisposable + { + private readonly SourceCache _cache; + private readonly List _sortedCollection = new List(); + private readonly List _unsortedCollection = new List(); + private readonly CompositeDisposable _cleanup = new CompositeDisposable(); + + public ToSortedCollectionFixture() + { + _cache = new SourceCache(p => p.Age); + _cache.AddOrUpdate(Enumerable.Range(1, 10).Select(i => new Person("Name" + i, i)).ToArray()); + } + + public void Dispose() + { + _cache.Dispose(); + _cleanup.Dispose(); + } + + [Fact] + public void SortAscending() + { + TestScheduler testScheduler = new TestScheduler(); + + _cleanup.Add(_cache.Connect() + .ObserveOn(testScheduler) + .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .ToCollection() + .Do(persons => + { + _unsortedCollection.Clear(); + _unsortedCollection.AddRange(persons); + }) + .Subscribe()); + + _cleanup.Add(_cache.Connect() + .ObserveOn(testScheduler) + .ToSortedCollection(p => p.Age) + .Do(persons => + { + _sortedCollection.Clear(); + _sortedCollection.AddRange(persons); + }) + .Subscribe()); + + // Insert an item with a lower sort order + _cache.AddOrUpdate(new Person("Name", 0)); + + testScheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks); + + _cache.Items.Should().Equal(_unsortedCollection); + _cache.Items.Should().NotEqual(_sortedCollection); + _cache.Items.OrderBy(p => p.Age).Should().Equal(_sortedCollection); + } + + [Fact] + public void SortDescending() + { + TestScheduler testScheduler = new TestScheduler(); + + _cleanup.Add(_cache.Connect() + .ObserveOn(testScheduler) + .Sort(SortExpressionComparer.Ascending(p => p.Age)) + .ToCollection() + .Do(persons => + { + _unsortedCollection.Clear(); + _unsortedCollection.AddRange(persons); + }) + .Subscribe()); + + _cleanup.Add(_cache.Connect() + .ObserveOn(testScheduler) + .ToSortedCollection(p => p.Age, SortDirection.Descending) + .Do(persons => + { + _sortedCollection.Clear(); + _sortedCollection.AddRange(persons); + }) + .Subscribe()); + + // Insert an item with a lower sort order + _cache.AddOrUpdate(new Person("Name", 0)); + + testScheduler.AdvanceBy(TimeSpan.FromSeconds(2).Ticks); + + _cache.Items.Should().Equal(_unsortedCollection); + _cache.Items.Should().NotEqual(_sortedCollection); + _cache.Items.OrderByDescending(p => p.Age).Should().Equal(_sortedCollection); + } + } +} \ No newline at end of file diff --git a/DynamicData/Cache/ObservableCacheEx.cs b/DynamicData/Cache/ObservableCacheEx.cs index 2a791ebb4..768260931 100644 --- a/DynamicData/Cache/ObservableCacheEx.cs +++ b/DynamicData/Cache/ObservableCacheEx.cs @@ -1199,6 +1199,43 @@ public static IObservable> ToCollection new ReadOnlyCollectionLight(query.Items)); } + + /// + /// Converts the changeset into a fully formed sorted collection. Each change in the source results in a new sorted collection + /// + /// The type of the object. + /// The type of the key. + /// The sort key + /// The source. + /// The sort function + /// The sort order. Defaults to ascending + /// + public static IObservable> ToSortedCollection(this IObservable> source, + Func sort, SortDirection sortOrder = SortDirection.Ascending) + { + return source.QueryWhenChanged(query => sortOrder == SortDirection.Ascending + ? new ReadOnlyCollectionLight(query.Items.OrderBy(sort)) + : new ReadOnlyCollectionLight(query.Items.OrderByDescending(sort))); + } + + /// + /// Converts the changeset into a fully formed sorted collection. Each change in the source results in a new sorted collection + /// + /// The type of the object. + /// The type of the key. + /// The source. + /// The sort comparer + /// + public static IObservable> ToSortedCollection(this IObservable> source, + IComparer comparer) + { + return source.QueryWhenChanged(query => + { + var items = query.Items.AsList(); + items.Sort(comparer); + return new ReadOnlyCollectionLight(items); + }); + } #endregion diff --git a/DynamicData/List/ObservableListEx.cs b/DynamicData/List/ObservableListEx.cs index 8d49c6da2..fd3512253 100644 --- a/DynamicData/List/ObservableListEx.cs +++ b/DynamicData/List/ObservableListEx.cs @@ -1,1986 +1,2023 @@ -using System; -using System.Collections.Generic; -using System.Collections.ObjectModel; -using System.ComponentModel; -using System.Linq; -using System.Linq.Expressions; -using System.Reactive; -using System.Reactive.Concurrency; -using System.Reactive.Disposables; -using System.Reactive.Linq; -using System.Reactive.Subjects; -using System.Threading.Tasks; -using DynamicData.Annotations; -using DynamicData.Binding; -using DynamicData.Cache.Internal; -using DynamicData.Kernel; -using DynamicData.List.Internal; -using DynamicData.List.Linq; - -// ReSharper disable once CheckNamespace - -namespace DynamicData -{ - /// - /// Extensions for ObservableList - /// - public static class ObservableListEx - { - #region Populate change set from standard rx observable - - /// - /// Converts the observable to an observable changeset. - /// - /// The type of the object. - /// The source. - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable source, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - - return ToObservableChangeSet(source, null, -1, scheduler); - } - - /// - /// Converts the observable to an observable changeset, allowing time expiry to be specified - /// - /// The type of the object. - /// The source. - /// Specify on a per object level the maximum time before an object expires from a cache - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable source, - Func expireAfter, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (expireAfter == null) throw new ArgumentNullException(nameof(expireAfter)); - - return ToObservableChangeSet(source, expireAfter, -1, scheduler); - } - - /// - /// Converts the observable to an observable changeset, with a specified limit of how large the list can be. - /// - /// The type of the object. - /// The source. - /// Remove the oldest items when the size has reached this limit - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable source, - int limitSizeTo, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return ToObservableChangeSet(source, null, limitSizeTo, scheduler); - } - - /// - /// Converts the observable to an observable changeset, allowing size and time limit to be specified - /// - /// The type of the object. - /// The source. - /// Specify on a per object level the maximum time before an object expires from a cache - /// Remove the oldest items when the size has reached this limit - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable source, - Func expireAfter, - int limitSizeTo, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new ToObservableChangeSet(source, expireAfter, limitSizeTo, scheduler).Run(); - } - - /// - /// Converts the observable to an observable changeset. - /// - /// The type of the object. - /// The source. - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable> source, - IScheduler scheduler = null) - { - return ToObservableChangeSet(source, null, -1, scheduler); - } - - /// - /// Converts the observable to an observable changeset, allowing size and time limit to be specified - /// - /// The type of the object. - /// The source. - /// Remove the oldest items when the size has reached this limit - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable> source, - int limitSizeTo, - IScheduler scheduler = null) - { - return ToObservableChangeSet(source, null, limitSizeTo, scheduler); - } - - /// - /// Converts the observable to an observable changeset, allowing size to be specified - /// - /// The type of the object. - /// The source. - /// Specify on a per object level the maximum time before an object expires from a cache - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable> source, - Func expireAfter, - IScheduler scheduler = null) - { - return ToObservableChangeSet(source, expireAfter, 0, scheduler); - } - - /// - /// Converts the observable to an observable changeset, allowing size and time limit to be specified - /// - /// The type of the object. - /// The source. - /// Specify on a per object level the maximum time before an object expires from a cache - /// Remove the oldest items when the size has reached this limit - /// The scheduler (only used for time expiry). - /// - /// source - /// or - /// keySelector - public static IObservable> ToObservableChangeSet(this IObservable> source, - Func expireAfter, - int limitSizeTo, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new ToObservableChangeSet(source, expireAfter, limitSizeTo, scheduler).Run(); - } - - #endregion - - #region Auto Refresh - - /// - /// Automatically refresh downstream operators when any property changes. - /// - /// The source observable - /// Batch up changes by specifying the buffer. This greatly increases performance when many elements have sucessive property changes - /// When observing on multiple property changes, apply a throttle to prevent excessive refesh invocations - /// The scheduler - /// An observable change set with additional refresh changes - public static IObservable> AutoRefresh(this IObservable> source, - TimeSpan? changeSetBuffer = null, - TimeSpan? propertyChangeThrottle = null, - IScheduler scheduler = null) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - - return source.AutoRefreshOnObservable(t => - { - if (propertyChangeThrottle == null) - return t.WhenAnyPropertyChanged(); - - return t.WhenAnyPropertyChanged() - .Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default); - - }, changeSetBuffer, scheduler); - } - - /// - /// Automatically refresh downstream operators when properties change. - /// - /// The source observable - /// Specify a property to observe changes. When it changes a Refresh is invoked - /// Batch up changes by specifying the buffer. This greatly increases performance when many elements have sucessive property changes - /// When observing on multiple property changes, apply a throttle to prevent excessive refesh invocations - /// The scheduler - /// An observable change set with additional refresh changes - public static IObservable> AutoRefresh(this IObservable> source, - Expression> propertyAccessor, - TimeSpan? changeSetBuffer = null, - TimeSpan? propertyChangeThrottle = null, - IScheduler scheduler = null) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); - - return source.AutoRefreshOnObservable(t => - { - if (propertyChangeThrottle == null) - return t.WhenPropertyChanged(propertyAccessor, false); - - return t.WhenPropertyChanged(propertyAccessor,false) - .Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default); - - }, changeSetBuffer, scheduler); - } - - /// - /// Automatically refresh downstream operator. The refresh is triggered when the observable receives a notification - /// - /// The source observable change set - /// An observable which acts on items within the collection and produces a value when the item should be refreshed - /// Batch up changes by specifying the buffer. This greatly increases performance when many elements require a refresh - /// The scheduler - /// An observable change set with additional refresh changes - public static IObservable> AutoRefreshOnObservable(this IObservable> source, - Func> reevaluator, - TimeSpan? changeSetBuffer = null, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (reevaluator == null) throw new ArgumentNullException(nameof(reevaluator)); - return new AutoRefresh(source, reevaluator, changeSetBuffer, scheduler).Run(); - } - - - /// - /// Supress refresh notifications - /// - /// The source observable change set - /// - public static IObservable> SupressRefresh(this IObservable> source) - { - return source.WhereReasonsAreNot(ListChangeReason.Refresh); - } - - - #endregion - - #region Conversion - - /// - /// Removes the index from all changes. - /// - /// NB: This operator has been introduced as a temporary fix for creating an Or operator using merge many. - /// - /// The type of the object. - /// The source. - /// - /// - public static IObservable> RemoveIndex([NotNull] this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.Select(changes => new ChangeSet(changes.YieldWithoutIndex())); - } - - /// - /// Adds a key to the change set result which enables all observable cache features of dynamic data - /// - /// - /// All indexed changes are dropped i.e. sorting is not supported by this function - /// - /// The type of object. - /// The type of key. - /// The source. - /// The key selector. - /// - /// - /// - public static IObservable> AddKey( - [NotNull] this IObservable> source, [NotNull] Func keySelector) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); - return source.Select(changes => new ChangeSet(new AddKeyEnumerator(changes, keySelector))); - } - - /// - /// Convert the object using the sepcified conversion function. - /// - /// This is a lighter equivalent of Transform and is designed to be used with non-disposable objects - /// - /// The type of the object. - /// The type of the destination. - /// The source. - /// The conversion factory. - /// - /// - /// - [Obsolete("Prefer Cast as it is does the same thing but is semantically correct")] - public static IObservable> Convert( - [NotNull] this IObservable> source, - [NotNull] Func conversionFactory) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (conversionFactory == null) throw new ArgumentNullException(nameof(conversionFactory)); - return source.Select(changes => changes.Transform(conversionFactory)); - } - - - - /// - /// Cast the underlying type of an object. Use before a Cast function - /// - /// - /// The source. - /// - public static IObservable> CastToObject(this IObservable> source) - { - return source.Select(changes => - { - var items = changes.Transform(t => (object)t); - return new ChangeSet(items); - }); - } - - /// - /// Cast the changes to another form - /// - /// The type of the destination. - /// The source. - /// - /// - /// - public static IObservable> Cast([NotNull] this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.Select(changes => changes.Transform(t=>(TDestination)t)); - } - - /// - /// Cast the changes to another form - /// - /// Alas, I had to add the converter due to type inference issues. The converter can be avoided by CastToObject() first - /// - /// The type of the object. - /// The type of the destination. - /// The source. - /// The conversion factory. - /// - /// - /// - public static IObservable> Cast([NotNull] this IObservable> source, - [NotNull] Func conversionFactory) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (conversionFactory == null) throw new ArgumentNullException(nameof(conversionFactory)); - return source.Select(changes => changes.Transform(conversionFactory)); - } - - #endregion - - #region Binding - - /// - /// Binds a clone of the observable changeset to the target observable collection - /// - /// - /// The source. - /// The target collection. - /// The reset threshold. - /// - /// - /// source - /// or - /// targetCollection - /// - public static IObservable> Bind([NotNull] this IObservable> source, - [NotNull] IObservableCollection targetCollection, int resetThreshold = 25) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (targetCollection == null) throw new ArgumentNullException(nameof(targetCollection)); - - var adaptor = new ObservableCollectionAdaptor(targetCollection, resetThreshold); - return source.Adapt(adaptor); - } - - /// - /// Creates a binding to a readonly observable collection which is specified as an 'out' parameter - /// - /// - /// The source. - /// The resulting read only observable collection. - /// The reset threshold. - /// A continuation of the source stream - /// - /// - public static IObservable> Bind([NotNull] this IObservable> source, - out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = 25) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - - var target = new ObservableCollectionExtended(); - var result = new ReadOnlyObservableCollection(target); - var adaptor = new ObservableCollectionAdaptor(target, resetThreshold); - readOnlyObservableCollection = result; - return source.Adapt(adaptor); - } - -#if SUPPORTS_BINDINGLIST - - /// - /// Binds a clone of the observable changeset to the target observable collection - /// - /// - /// The source. - /// The target binding list - /// The reset threshold. - /// - /// source - /// or - /// targetCollection - /// - public static IObservable> Bind([NotNull] this IObservable> source, - [NotNull] BindingList bindingList, int resetThreshold = 25) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (bindingList == null) throw new ArgumentNullException(nameof(bindingList)); - - return source.Adapt(new BindingListAdaptor(bindingList, resetThreshold)); - } - -#endif - - - /// - /// Injects a side effect into a changeset observable - /// - /// - /// The source. - /// The adaptor. - /// - /// - /// source - /// or - /// adaptor - /// - public static IObservable> Adapt([NotNull] this IObservable> source, - [NotNull] IChangeSetAdaptor adaptor) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (adaptor == null) throw new ArgumentNullException(nameof(adaptor)); - - - return Observable.Create>(observer => - { - var locker = new object(); - return source - .Synchronize(locker) - .Select(changes => - { - adaptor.Adapt(changes); - return changes; - }).SubscribeSafe(observer); - }); - } - - #endregion - - #region Populate into an observable list - - /// - /// list. - /// - /// The type of the object. - /// The source. - /// The destination. - /// - /// - /// source - /// or - /// destination - /// - /// source - /// or - /// destination - public static IDisposable PopulateInto([NotNull] this IObservable> source, - [NotNull] ISourceList destination) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (destination == null) throw new ArgumentNullException(nameof(destination)); - - return source.Subscribe(changes => destination.Edit(updater => updater.Clone(changes))); - } - - /// - /// Converts the source list to an read only observable list - /// - /// - /// The source. - /// - /// source - public static IObservableList AsObservableList([NotNull] this ISourceList source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new AnonymousObservableList(source); - } - - /// - /// Converts the source observable to an read only observable list - /// - /// - /// The source. - /// - /// source - public static IObservableList AsObservableList([NotNull] this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new AnonymousObservableList(source); - } - - /// - /// List equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber. - /// - /// - /// The source. - /// - /// - public static IObservable> RefCount([NotNull] this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new RefCount(source).Run(); - } - - #endregion - - #region Core List Operators - - /// - /// Filters the source using the specified valueSelector - /// - /// - /// The source. - /// The valueSelector. - /// - /// source - public static IObservable> Filter(this IObservable> source, Func predicate) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (predicate == null) throw new ArgumentNullException(nameof(predicate)); - return new Filter(source, predicate).Run(); - } - - /// - /// Filters source using the specified filter observable predicate. - /// - /// - /// The source. - /// - /// Should the filter clear and replace, or calculate a diff-set - /// - /// source - /// or - /// filterController - public static IObservable> Filter([NotNull] this IObservable> source, [NotNull] IObservable> predicate, ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (predicate == null) throw new ArgumentNullException(nameof(predicate)); - - return new Filter(source, predicate, filterPolicy).Run(); - } - - - /// - /// Filters source on the specified property using the specified predicate. - /// - /// The filter will automatically reapply when a property changes - /// - /// The type of the object. - /// The type of the property. - /// The source. - /// The property selector. When the property changes the filter specified will be re-evaluated - /// A predicate based on the object which contains the changed property - /// The property changed throttle. - /// The scheduler used when throttling - /// - /// - /// - [Obsolete("Use AutoRefresh(), followed by Filter() instead")] - public static IObservable> FilterOnProperty(this IObservable> source, - Expression> propertySelector, - Func predicate, - TimeSpan? propertyChangedThrottle = null, - IScheduler scheduler = null) where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (propertySelector == null) throw new ArgumentNullException(nameof(propertySelector)); - if (predicate == null) throw new ArgumentNullException(nameof(predicate)); - return new FilterOnProperty(source, propertySelector, predicate, propertyChangedThrottle, scheduler).Run(); - } - - - /// - /// Reverse sort of the changset - /// - /// - /// The source. - /// - /// - /// source - /// or - /// comparer - /// - public static IObservable> Reverse(this IObservable> source) - { - var reverser = new Reverser(); - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.Select(changes => new ChangeSet(reverser.Reverse(changes))); - } - - /// - /// Projects each update item to a new form using the specified transform function - /// - /// The type of the source. - /// The type of the destination. - /// The source. - /// The transform factory. - /// Should a new transform be applied when a refresh event is received - /// - /// - /// source - /// or - /// valueSelector - /// - public static IObservable> Transform(this IObservable> source, - Func transformFactory, - bool transformOnRefresh = false) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); - return source.Transform((t, previous, idx) => transformFactory(t), transformOnRefresh); - } - - /// - /// Projects each update item to a new form using the specified transform function - /// - /// The type of the source. - /// The type of the destination. - /// The source. - /// The transform fuunction - /// Should a new transform be applied when a refresh event is received - /// A an observable changeset of the transformed object - /// - /// source - /// or - /// valueSelector - /// - public static IObservable> Transform(this IObservable> source, - Func transformFactory, - bool transformOnRefresh = false) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); - - return source.Transform((t, previous, idx) => transformFactory(t,idx),transformOnRefresh); - } - - /// - /// Projects each update item to a new form using the specified transform function. - /// - /// *** Annoyingly when using this overload you will have to explicitly specify the generic type arguments as type inference fails - /// - /// The type of the source. - /// The type of the destination. - /// The source. - /// The transform function - /// Should a new transform be applied when a refresh event is received - /// A an observable changeset of the transformed object - /// - /// source - /// or - /// valueSelector - /// - public static IObservable> Transform(this IObservable> source, - Func, TDestination> transformFactory, - bool transformOnRefresh = false) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); - - return source.Transform((t, previous, idx) => transformFactory(t, previous), transformOnRefresh); - } - - /// - /// Projects each update item to a new form using the specified transform function - /// - /// *** Annoyingly when using this overload you will have to explicy specify the generic type arguments as type inference fails - /// - /// The type of the source. - /// The type of the destination. - /// The source. - /// The transform factory. - /// Should a new transform be applied when a refresh event is received - /// A an observable changeset of the transformed object - /// - /// source - /// or - /// valueSelector - /// - public static IObservable> Transform(this IObservable> source, - Func, int, TDestination> transformFactory, bool transformOnRefresh = false) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); - - return new Transformer(source, transformFactory, transformOnRefresh).Run(); - } - - - /// - /// Projects each update item to a new form using the specified transform function - /// - /// The type of the source. - /// The type of the destination. - /// The source. - /// The transform factory. - /// A an observable changeset of the transformed object - /// - /// source - /// or - /// valueSelector - /// - public static IObservable> TransformAsync( - this IObservable> source, Func> transformFactory) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); - - return new TransformAsync(source, transformFactory).Run(); - } - - /// - /// Equivalent to a select many transform. To work, the key must individually identify each child. - /// - /// The type of the destination. - /// The type of the source. - /// The source. - /// The manyselector. - /// Used when an item has been replaced to determine whether child items are the same as previous children - /// - /// - /// source - /// or - /// manyselector - /// - public static IObservable> TransformMany( [NotNull] this IObservable> source, - [NotNull] Func> manyselector, - IEqualityComparer equalityComparer = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (manyselector == null) throw new ArgumentNullException(nameof(manyselector)); - return new TransformMany(source, manyselector, equalityComparer).Run(); - } - - /// - /// Flatten the nested observable collection, and observe subsequentl observable collection changes - /// - /// The type of the destination. - /// The type of the source. - /// The source. - /// The manyselector. - /// Used when an item has been replaced to determine whether child items are the same as previous children - public static IObservable> TransformMany( this IObservable> source, - Func> manyselector, - IEqualityComparer equalityComparer = null) - { - return new TransformMany(source,manyselector, equalityComparer).Run(); - } - - /// - /// Flatten the nested observable collection, and observe subsequentl observable collection changes - /// - /// The type of the destination. - /// The type of the source. - /// The source. - /// The manyselector. - /// Used when an item has been replaced to determine whether child items are the same as previous children - public static IObservable> TransformMany(this IObservable> source, - Func> manyselector, - IEqualityComparer equalityComparer = null) - { - return new TransformMany(source, manyselector, equalityComparer).Run(); - } - - /// - /// Flatten the nested observable list, and observe subsequent observable collection changes - /// - /// The type of the destination. - /// The type of the source. - /// The source. - /// The manyselector. - /// Used when an item has been replaced to determine whether child items are the same as previous children - public static IObservable> TransformMany(this IObservable> source, - Func> manyselector, - IEqualityComparer equalityComparer = null) - { - return new TransformMany(source, manyselector, equalityComparer).Run(); - } - - /// - /// Selects distinct values from the source, using the specified value selector - /// - /// The type of the source. - /// The type of the destination. - /// The source. - /// The transform factory. - /// - /// - /// source - /// or - /// valueSelector - /// - public static IObservable> DistinctValues( - this IObservable> source, - Func valueSelector) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); - return new Distinct(source, valueSelector).Run(); - } - - /// - /// Groups the source on the value returned by group selector factory. The groupings contains an inner observable list. - /// - /// The type of the object. - /// The type of the group. - /// The source. - /// The group selector. - /// Force the grouping function to recalculate the group value. - /// For example if you have a time based grouping with values like `Last Minute', 'Last Hour', 'Today' etc regrouper is used to refresh these groupings - /// - /// - /// source - /// or - /// groupSelector - /// - public static IObservable>> GroupOn( - this IObservable> source, Func groupSelector, - IObservable regrouper = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (groupSelector == null) throw new ArgumentNullException(nameof(groupSelector)); - return new GroupOn(source, groupSelector, regrouper).Run(); - } - - /// - /// Groups the source on the value returned by group selector factory. Each update produces immuatable grouping. - /// - /// The type of the object. - /// The type of the group key. - /// The source. - /// The group selector key. - /// Force the grouping function to recalculate the group value. - /// For example if you have a time based grouping with values like `Last Minute', 'Last Hour', 'Today' etc regrouper is used to refresh these groupings - /// - /// - /// - /// source - /// or - /// groupSelectorKey - /// - public static IObservable>> GroupWithImmutableState - (this IObservable> source, - Func groupSelectorKey, - IObservable regrouper = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (groupSelectorKey == null) throw new ArgumentNullException(nameof(groupSelectorKey)); - - return new GroupOnImmutable(source, groupSelectorKey, regrouper).Run(); - } - - - /// - /// Groups the source using the property specified by the property selector. The resulting groupings contains an inner observable list. - /// Groups are re-applied when the property value changed. - /// When there are likely to be a large number of group property changes specify a throttle to improve performance - /// - /// The type of the object. - /// The type of the group. - /// The source. - /// The property selector used to group the items - /// The property changed throttle. - /// The scheduler. - /// - /// - /// - public static IObservable>> GroupOnProperty( - this IObservable> source, - Expression> propertySelector, - TimeSpan? propertyChangedThrottle = null, - IScheduler scheduler = null) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (propertySelector == null) throw new ArgumentNullException(nameof(propertySelector)); - return - new GroupOnProperty(source, propertySelector, propertyChangedThrottle, scheduler).Run(); - } - - /// - /// Groups the source using the property specified by the property selector. The resulting groupings are immutable. - /// Groups are re-applied when the property value changed. - /// When there are likely to be a large number of group property changes specify a throttle to improve performance - /// - /// The type of the object. - /// The type of the group. - /// The source. - /// The property selector used to group the items - /// The property changed throttle. - /// The scheduler. - /// - /// - /// - public static IObservable>> GroupOnPropertyWithImmutableState(this IObservable> source, - Expression> propertySelector, - TimeSpan? propertyChangedThrottle = null, - IScheduler scheduler = null) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (propertySelector == null) throw new ArgumentNullException(nameof(propertySelector)); - return new GroupOnPropertyWithImmutableState(source, propertySelector, propertyChangedThrottle, scheduler).Run(); - } - - /// - /// Prevents an empty notification - /// - /// - /// The source. - /// - /// source - public static IObservable> NotEmpty(this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.Where(s => s.Count != 0); - } - - /// - /// Clones the target list as a side effect of the stream - /// - /// - /// The source. - /// - /// - /// source - public static IObservable> Clone(this IObservable> source, IList target) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.Do(target.Clone); - } - - #endregion - - #region Sort - - /// - /// Sorts the sequence using the specified comparer. - /// - /// - /// The source. - /// The comparer used for sorting - /// For improved performance, specify SortOptions.UseBinarySearch. This can only be used when the values which are sorted on are immutable - /// Since sorting can be slow for large record sets, the reset threshold is used to force the list re-ordered - /// OnNext of this observable causes data to resort. This is required when the value which is sorted on mutable - /// An observable comparer used to change the comparer on which the sorted list i - /// - /// source - /// or - /// comparer - public static IObservable> Sort(this IObservable> source, - IComparer comparer, - SortOptions options = SortOptions.None, - IObservable resort = null, - IObservable> comparerChanged = null, - int resetThreshold = 50) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (comparer == null) throw new ArgumentNullException(nameof(comparer)); - - return new Sort(source, comparer, options, resort, comparerChanged, resetThreshold).Run(); - } - - /// - /// Sorts the sequence using the specified observable comparer. - /// - /// - /// The source. - /// For improved performance, specify SortOptions.UseBinarySearch. This can only be used when the values which are sorted on are immutable - /// Since sorting can be slow for large record sets, the reset threshold is used to force the list re-ordered - /// OnNext of this observable causes data to resort. This is required when the value which is sorted on mutable - /// An observable comparer used to change the comparer on which the sorted list i - /// - /// source - /// or - /// comparer - public static IObservable> Sort(this IObservable> source, - IObservable> comparerChanged, - SortOptions options = SortOptions.None, - IObservable resort = null, - int resetThreshold = 50) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (comparerChanged == null) throw new ArgumentNullException(nameof(comparerChanged)); - - return new Sort(source, null, options, resort, comparerChanged, resetThreshold).Run(); - } - - #endregion - - #region Item operators - - /// - /// Provides a call back for each item change. - /// - /// The type of the object. - /// The source. - /// The action. - /// - /// - /// - public static IObservable> ForEachChange( - [NotNull] this IObservable> source, - [NotNull] Action> action) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (action == null) throw new ArgumentNullException(nameof(action)); - return source.Do(changes => changes.ForEach(action)); - } - - /// - /// Provides a call back for each item change. - /// - /// Range changes are flattened, so there is only need to check for Add, Replace, Remove and Clear - /// - /// The type of the object. - /// The source. - /// The action. - /// - /// - /// - public static IObservable> ForEachItemChange( - [NotNull] this IObservable> source, - [NotNull] Action> action) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (action == null) throw new ArgumentNullException(nameof(action)); - return source.Do(changes => changes.Flatten().ForEach(action)); - } - - /// - /// Dynamically merges the observable which is selected from each item in the stream, and unmerges the item - /// when it is no longer part of the stream. - /// - /// The type of the object. - /// The type of the destination. - /// The source. - /// The observable selector. - /// - /// source - /// or - /// observableSelector - public static IObservable MergeMany( - [NotNull] this IObservable> source, - [NotNull] Func> observableSelector) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector)); - - return new MergeMany(source, observableSelector).Run(); - } - - /// - /// Watches each item in the collection and notifies when any of them has changed - /// - /// - /// The type of the value. - /// The source. - /// The property accessor. - /// if set to true [notify on initial value]. - /// - /// - /// - public static IObservable WhenValueChanged( - [NotNull] this IObservable> source, - [NotNull] Expression> propertyAccessor, - bool notifyOnInitialValue = true) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); - - var factory = propertyAccessor.GetFactory(); - return source.MergeMany(t => factory(t, notifyOnInitialValue).Select(pv=>pv.Value)); - } - - /// - /// Watches each item in the collection and notifies when any of them has changed - /// - /// - /// The type of the value. - /// The source. - /// The property accessor. - /// if set to true [notify on initial value]. - /// - /// - /// - public static IObservable> WhenPropertyChanged( - [NotNull] this IObservable> source, - [NotNull] Expression> propertyAccessor, - bool notifyOnInitialValue = true) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); - - var factory = propertyAccessor.GetFactory(); - return source.MergeMany(t => factory(t, notifyOnInitialValue)); - } - - /// - /// Watches each item in the collection and notifies when any of them has changed - /// - /// The type of the object. - /// The source. - /// specify properties to Monitor, or omit to monitor all property changes - /// - /// - /// - public static IObservable WhenAnyPropertyChanged([NotNull] this IObservable> source, params string[] propertiesToMonitor) - where TObject : INotifyPropertyChanged - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.MergeMany(t => t.WhenAnyPropertyChanged(propertiesToMonitor)); - } - - /// - /// Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed - /// - /// The type of the object. - /// The source. - /// The subsription function - /// - /// source - /// or - /// subscriptionFactory - /// - /// Subscribes to each item when it is added or updates and unsubcribes when it is removed - /// - public static IObservable> SubscribeMany(this IObservable> source, - Func subscriptionFactory) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (subscriptionFactory == null) throw new ArgumentNullException(nameof(subscriptionFactory)); - return new SubscribeMany(source, subscriptionFactory).Run(); - } - - /// - /// Disposes each item when no longer required. - /// - /// Individual items are disposed when removed or replaced. All items - /// are disposed when the stream is disposed - /// - /// - /// - /// The type of the object. - /// The source. - /// A continuation of the original stream - /// source - public static IObservable> DisposeMany(this IObservable> source) - { - return source.OnItemRemoved(t => - { - var d = t as IDisposable; - d?.Dispose(); - }); - } - - /// - /// Callback for each item as and when it is being removed from the stream - /// - /// The type of the object. - /// The source. - /// The remove action. - /// - /// - /// source - /// or - /// removeAction - /// - public static IObservable> OnItemRemoved(this IObservable> source, - Action removeAction) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (removeAction == null) throw new ArgumentNullException(nameof(removeAction)); - - return new OnBeingRemoved(source, removeAction).Run(); - } - - /// - /// Callback for each item as and when it is being added to the stream - /// - /// - /// The source. - /// The add action. - /// - public static IObservable> OnItemAdded(this IObservable> source, - Action addAction) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (addAction == null) throw new ArgumentNullException(nameof(addAction)); - return new OnBeingAdded(source, addAction).Run(); - } - - #endregion - - #region Reason filtering - - /// - /// Includes changes for the specified reasons only - /// - /// - /// The source. - /// The reasons. - /// - /// Must enter at least 1 reason - public static IObservable> WhereReasonsAre(this IObservable> source, - params ListChangeReason[] reasons) - { - if (reasons.Length == 0) - throw new ArgumentException("Must enter at least 1 reason", nameof(reasons)); - - var matches = new HashSet(reasons); - return source.Select(changes => - { - var filtered = changes.Where(change => matches.Contains(change.Reason)).YieldWithoutIndex(); - return new ChangeSet(filtered); - }).NotEmpty(); - } - - /// - /// Excludes updates for the specified reasons - /// - /// - /// The source. - /// The reasons. - /// - /// Must enter at least 1 reason - public static IObservable> WhereReasonsAreNot(this IObservable> source, - params ListChangeReason[] reasons) - { - if (reasons.Length == 0) - throw new ArgumentException("Must enter at least 1 reason", nameof(reasons)); - - var matches = new HashSet(reasons); - return source.Select(updates => - { - var filtered = updates.Where(u => !matches.Contains(u.Reason)).YieldWithoutIndex(); - return new ChangeSet(filtered); - }).NotEmpty(); - } - - #endregion - - #region Buffering - - /// - /// Buffers changes for an intial period only. After the period has elapsed, not further buffering occurs. - /// - /// The source changeset - /// The period to buffer, measure from the time that the first item arrives - /// The scheduler to buffer on - public static IObservable> BufferInitial(this IObservable> source, TimeSpan initalBuffer, IScheduler scheduler = null) - { - return source.DeferUntilLoaded().Publish(shared => - { - var initial = shared.Buffer(initalBuffer, scheduler ?? Scheduler.Default) - .FlattenBufferResult() - .Take(1); - - return initial.Concat(shared); - }); - } - - /// - /// Convert the result of a buffer operation to a change set - /// - /// - /// The source. - /// - public static IObservable> FlattenBufferResult(this IObservable>> source) - { - return source - .Where(x => x.Count != 0) - .Select(updates => new ChangeSet(updates.SelectMany(u => u))); - } - - /// - /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. - /// When a resume signal has been received the batched updates will be fired. - /// - /// The type of the object. - /// The source. - /// When true, observable begins to buffer and when false, window closes and buffered result if notified - /// The scheduler. - /// - /// source - public static IObservable> BufferIf([NotNull] this IObservable> source, - [NotNull] IObservable pauseIfTrueSelector, - IScheduler scheduler = null) - { - return BufferIf(source, pauseIfTrueSelector, false, scheduler); - } - - /// - /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. - /// When a resume signal has been received the batched updates will be fired. - /// - /// The type of the object. - /// The source. - /// When true, observable begins to buffer and when false, window closes and buffered result if notified - /// if set to true [intial pause state]. - /// The scheduler. - /// - /// source - public static IObservable> BufferIf([NotNull] this IObservable> source, - [NotNull] IObservable pauseIfTrueSelector, - bool intialPauseState = false, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (pauseIfTrueSelector == null) throw new ArgumentNullException(nameof(pauseIfTrueSelector)); - return BufferIf(source, pauseIfTrueSelector, intialPauseState, null, scheduler); - } - - /// - /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. - /// When a resume signal has been received the batched updates will be fired. - /// - /// The type of the object. - /// The source. - /// When true, observable begins to buffer and when false, window closes and buffered result if notified - /// Specify a time to ensure the buffer window does not stay open for too long - /// The scheduler. - /// - /// source - public static IObservable> BufferIf(this IObservable> source, - IObservable pauseIfTrueSelector, - TimeSpan? timeOut = null, - IScheduler scheduler = null) - { - return BufferIf(source, pauseIfTrueSelector, false, timeOut, scheduler); - } - - /// - /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. - /// When a resume signal has been received the batched updates will be fired. - /// - /// The type of the object. - /// The source. - /// When true, observable begins to buffer and when false, window closes and buffered result if notified - /// if set to true [intial pause state]. - /// Specify a time to ensure the buffer window does not stay open for too long - /// The scheduler. - /// - /// source - public static IObservable> BufferIf(this IObservable> source, - IObservable pauseIfTrueSelector, - bool intialPauseState = false, - TimeSpan? timeOut = null, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (pauseIfTrueSelector == null) throw new ArgumentNullException(nameof(pauseIfTrueSelector)); - return new BufferIf(source, pauseIfTrueSelector, intialPauseState, timeOut, scheduler).Run(); - } - - /// - /// The latest copy of the cache is exposed for querying after each modification to the underlying data - /// - /// The type of the object. - /// The type of the destination. - /// The source. - /// The result selector. - /// - /// - /// source - /// or - /// resultSelector - /// - public static IObservable QueryWhenChanged( - this IObservable> source, - Func, TDestination> resultSelector) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); - - return source.QueryWhenChanged().Select(resultSelector); - } - - /// - /// The latest copy of the cache is exposed for querying i) after each modification to the underlying data ii) upon subscription - /// - /// The type of the object. - /// The source. - /// - /// source - public static IObservable> QueryWhenChanged([NotNull] this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new QueryWhenChanged(source).Run(); - } - - /// - /// Converts the changeset into a fully formed collection. Each change in the source results in a new collection - /// - /// The type of the object. - /// The source. - /// - public static IObservable> ToCollection(this IObservable> source) - { - return source.QueryWhenChanged(items => items); - } - - /// - /// Defer the subscribtion until loaded and skip initial changeset - /// - /// The type of the object. - /// The source. - /// - /// source - public static IObservable> SkipInitial(this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.DeferUntilLoaded().Skip(1); - } - - /// - /// Defer the subscription until the stream has been inflated with data - /// - /// The type of the object. - /// The source. - /// - public static IObservable> DeferUntilLoaded([NotNull] this IObservable> source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return new DeferUntilLoaded(source).Run(); - } - - /// - /// Defer the subscription until the cache has been inflated with data - /// - /// The type of the object. - /// The source. - /// - public static IObservable> DeferUntilLoaded(this IObservableList source) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - return source.Connect().DeferUntilLoaded(); - } - - #endregion - - #region Virtualisation / Paging - - /// - /// Virtualises the source using parameters provided via the requests observable - /// - /// - /// The source. - /// The requests. - /// - public static IObservable> Virtualise([NotNull] this IObservable> source, - [NotNull] IObservable requests) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (requests == null) throw new ArgumentNullException(nameof(requests)); - return new Virtualiser(source, requests).Run(); - } - - /// - /// Limits the size of the result set to the specified number of items - /// - /// - /// The source. - /// The number of items. - /// - public static IObservable> Top([NotNull] this IObservable> source, - int numberOfItems) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (numberOfItems <= 0) - throw new ArgumentOutOfRangeException(nameof(numberOfItems), - "Number of items should be greater than zero"); - - return source.Virtualise(Observable.Return(new VirtualRequest(0, numberOfItems))); - } - - - /// - /// Applies paging to the the data source - /// - /// - /// The source. - /// Observable to control page requests - /// - public static IObservable> Page([NotNull] this IObservable> source, - [NotNull] IObservable requests) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (requests == null) throw new ArgumentNullException(nameof(requests)); - return new Pager(source, requests).Run(); - } - - #endregion - - #region Expiry / size limiter - - /// - /// Limits the size of the source cache to the specified limit. - /// Notifies which items have been removed from the source list. - /// - /// - /// The source. - /// The size limit. - /// The scheduler. - /// - /// sizeLimit cannot be zero - /// source - /// sizeLimit cannot be zero - public static IObservable> LimitSizeTo([NotNull] this ISourceList source, int sizeLimit, - IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (sizeLimit <= 0) throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit)); - - var locker = new object(); - var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? Scheduler.Default, locker); - - return limiter.Run().Synchronize(locker).Do(source.RemoveMany); - } - - /// - /// Removes items from the cache according to the value specified by the time selector function - /// - /// - /// The source. - /// Selector returning when to expire the item. Return null for non-expiring item - /// The scheduler - /// - /// - /// - public static IObservable> ExpireAfter([NotNull] this ISourceList source, - [NotNull] Func timeSelector, IScheduler scheduler = null) - { - return source.ExpireAfter(timeSelector, null, scheduler); - } - - /// - /// Removes items from the cache according to the value specified by the time selector function - /// - /// - /// The source. - /// Selector returning when to expire the item. Return null for non-expiring item - /// Enter the polling interval to optimise expiry timers, if ommited 1 timer is created for each unique expiry time - /// The scheduler - /// - /// - /// - public static IObservable> ExpireAfter([NotNull] this ISourceList source, - [NotNull] Func timeSelector, TimeSpan? pollingInterval = null, IScheduler scheduler = null) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (timeSelector == null) throw new ArgumentNullException(nameof(timeSelector)); - - var locker = new object(); - var limiter = new ExpireAfter(source, timeSelector, pollingInterval, scheduler ?? Scheduler.Default, - locker); - - return limiter.Run().Synchronize(locker).Do(source.RemoveMany); - } - - #endregion - - #region Logical collection operators - - /// - /// Apply a logical Or operator between the collections. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Or([NotNull] this ICollection>> sources) - { - return sources.Combine(CombineOperator.Or); - } - - /// - /// Apply a logical Or operator between the collections. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// The others. - /// - public static IObservable> Or([NotNull] this IObservable> source, - params IObservable>[] others) - { - return source.Combine(CombineOperator.Or, others); - } - - /// - /// Dynamically apply a logical Or operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Or( - [NotNull] this IObservableList>> sources) - { - return sources.Combine(CombineOperator.Or); - } - - /// - /// Dynamically apply a logical Or operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Or([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.Or); - } - - /// - /// Dynamically apply a logical Or operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Or([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.Or); - } - - /// - /// Apply a logical Xor operator between the collections. - /// Items which are only in one of the sources are included in the result - /// - /// - /// The source. - /// The others. - /// - public static IObservable> Xor([NotNull] this IObservable> source, - params IObservable>[] others) - { - return source.Combine(CombineOperator.Xor, others); - } - - /// - /// Apply a logical Xor operator between the collections. - /// Items which are only in one of the sources are included in the result - /// - /// - /// The sources. - /// > - public static IObservable> Xor([NotNull] this ICollection>> sources) - { - return sources.Combine(CombineOperator.Xor); - } - - /// - /// Dynamically apply a logical Xor operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Xor( - [NotNull] this IObservableList>> sources) - { - return sources.Combine(CombineOperator.Xor); - } - - /// - /// Dynamically apply a logical Xor operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Xor([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.Xor); - } - - /// - /// Dynamically apply a logical Xor operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> Xor([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.Xor); - } - - /// - /// Apply a logical And operator between the collections. - /// Items which are in all of the sources are included in the result - /// - /// - /// The source. - /// The others. - /// - public static IObservable> And([NotNull] this IObservable> source, - params IObservable>[] others) - { - return source.Combine(CombineOperator.And, others); - } - - /// - /// Apply a logical And operator between the collections. - /// Items which are in all of the sources are included in the result - /// - /// - /// The sources. - /// > - public static IObservable> And([NotNull] this ICollection>> sources) - { - return sources.Combine(CombineOperator.And); - } - - /// - /// Dynamically apply a logical And operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> And( - [NotNull] this IObservableList>> sources) - { - return sources.Combine(CombineOperator.And); - } - - /// - /// Dynamically apply a logical And operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> And([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.And); - } - - /// - /// Dynamically apply a logical And operator between the items in the outer observable list. - /// Items which are in any of the sources are included in the result - /// - /// - /// The source. - /// - public static IObservable> And([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.And); - } - - /// - /// Apply a logical Except operator between the collections. - /// Items which are in the source and not in the others are included in the result - /// - /// - /// The source. - /// The others. - /// - public static IObservable> Except([NotNull] this IObservable> source, - params IObservable>[] others) - { - return source.Combine(CombineOperator.Except, others); - } - - /// - /// Apply a logical Except operator between the collections. - /// Items which are in the source and not in the others are included in the result - /// - /// - /// The sources. - /// > - public static IObservable> Except( - [NotNull] this ICollection>> sources) - { - return sources.Combine(CombineOperator.Except); - } - - /// - /// Dynamically apply a logical Except operator. Items from the first observable list are included when an equivalent item does not exist in the other sources. - /// - /// - /// The source. - /// - public static IObservable> Except( - [NotNull] this IObservableList>> sources) - { - return sources.Combine(CombineOperator.Except); - } - - /// - /// Dynamically apply a logical Except operator. Items from the first observable list are included when an equivalent item does not exist in the other sources. - /// - /// - /// The source. - /// - public static IObservable> Except([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.Except); - } - - /// - /// Dynamically apply a logical Except operator. Items from the first observable list are included when an equivalent item does not exist in the other sources. - /// - /// - /// The source. - /// - public static IObservable> Except([NotNull] this IObservableList> sources) - { - return sources.Combine(CombineOperator.Except); - } - - private static IObservable> Combine( - [NotNull] this ICollection>> sources, - CombineOperator type) - { - if (sources == null) throw new ArgumentNullException(nameof(sources)); - - return new Combiner(sources, type).Run(); - } - - private static IObservable> Combine([NotNull] this IObservable> source, - CombineOperator type, - params IObservable>[] others) - { - if (source == null) throw new ArgumentNullException(nameof(source)); - if (others.Length == 0) - throw new ArgumentException("Must be at least one item to combine with", nameof(others)); - - var items = source.EnumerateOne().Union(others).ToList(); - return new Combiner(items, type).Run(); - } - - private static IObservable> Combine([NotNull] this IObservableList> sources, - CombineOperator type) - { - if (sources == null) throw new ArgumentNullException(nameof(sources)); - - return Observable.Create>(observer => - { - var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList(); - var subscriber = changesSetList.Combine(type).SubscribeSafe(observer); - return new CompositeDisposable(changesSetList, subscriber); - }); - } - - private static IObservable> Combine([NotNull] this IObservableList> sources, - CombineOperator type) - { - if (sources == null) throw new ArgumentNullException(nameof(sources)); - - return Observable.Create>(observer => - { - var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList(); - var subscriber = changesSetList.Combine(type).SubscribeSafe(observer); - return new CompositeDisposable(changesSetList, subscriber); - }); - } - - private static IObservable> Combine( - [NotNull] this IObservableList>> sources, CombineOperator type) - { - if (sources == null) throw new ArgumentNullException(nameof(sources)); - return new DynamicCombiner(sources, type).Run(); - } - - #endregion - - #region Switch - - /// - /// Transforms an observable sequence of observable lists into a single sequence - /// producing values only from the most recent observable sequence. - /// Each time a new inner observable sequence is received, unsubscribe from the - /// previous inner observable sequence and clear the existing result set - /// - /// The type of the object. - /// The source. - /// - /// The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. - /// - /// - /// is null. - public static IObservable> Switch(this IObservable> sources) - { - if (sources == null) throw new ArgumentNullException(nameof(sources)); - return sources.Select(cache => cache.Connect()).Switch(); - } - - /// - /// Transforms an observable sequence of observable changes sets into an observable sequence - /// producing values only from the most recent observable sequence. - /// Each time a new inner observable sequence is received, unsubscribe from the - /// previous inner observable sequence and clear the existing resukt set - /// - /// The type of the object. - /// The source. - /// - /// The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. - /// - /// - /// is null. - public static IObservable> Switch(this IObservable>> sources) - { - if (sources == null) throw new ArgumentNullException(nameof(sources)); - return new Switch(sources).Run(); - } - - #endregion - - - #region Start with - - /// - /// Prepends an empty changeset to the source - /// - public static IObservable> StartWithEmpty(this IObservable> source) - { - return source.StartWith(ChangeSet.Empty); - } - - - - #endregion - } -} +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.ComponentModel; +using System.Linq; +using System.Linq.Expressions; +using System.Reactive; +using System.Reactive.Concurrency; +using System.Reactive.Disposables; +using System.Reactive.Linq; +using System.Reactive.Subjects; +using System.Threading.Tasks; +using DynamicData.Annotations; +using DynamicData.Binding; +using DynamicData.Cache.Internal; +using DynamicData.Kernel; +using DynamicData.List.Internal; +using DynamicData.List.Linq; + +// ReSharper disable once CheckNamespace + +namespace DynamicData +{ + /// + /// Extensions for ObservableList + /// + public static class ObservableListEx + { + #region Populate change set from standard rx observable + + /// + /// Converts the observable to an observable changeset. + /// + /// The type of the object. + /// The source. + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable source, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + + return ToObservableChangeSet(source, null, -1, scheduler); + } + + /// + /// Converts the observable to an observable changeset, allowing time expiry to be specified + /// + /// The type of the object. + /// The source. + /// Specify on a per object level the maximum time before an object expires from a cache + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable source, + Func expireAfter, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (expireAfter == null) throw new ArgumentNullException(nameof(expireAfter)); + + return ToObservableChangeSet(source, expireAfter, -1, scheduler); + } + + /// + /// Converts the observable to an observable changeset, with a specified limit of how large the list can be. + /// + /// The type of the object. + /// The source. + /// Remove the oldest items when the size has reached this limit + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable source, + int limitSizeTo, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return ToObservableChangeSet(source, null, limitSizeTo, scheduler); + } + + /// + /// Converts the observable to an observable changeset, allowing size and time limit to be specified + /// + /// The type of the object. + /// The source. + /// Specify on a per object level the maximum time before an object expires from a cache + /// Remove the oldest items when the size has reached this limit + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable source, + Func expireAfter, + int limitSizeTo, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new ToObservableChangeSet(source, expireAfter, limitSizeTo, scheduler).Run(); + } + + /// + /// Converts the observable to an observable changeset. + /// + /// The type of the object. + /// The source. + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable> source, + IScheduler scheduler = null) + { + return ToObservableChangeSet(source, null, -1, scheduler); + } + + /// + /// Converts the observable to an observable changeset, allowing size and time limit to be specified + /// + /// The type of the object. + /// The source. + /// Remove the oldest items when the size has reached this limit + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable> source, + int limitSizeTo, + IScheduler scheduler = null) + { + return ToObservableChangeSet(source, null, limitSizeTo, scheduler); + } + + /// + /// Converts the observable to an observable changeset, allowing size to be specified + /// + /// The type of the object. + /// The source. + /// Specify on a per object level the maximum time before an object expires from a cache + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable> source, + Func expireAfter, + IScheduler scheduler = null) + { + return ToObservableChangeSet(source, expireAfter, 0, scheduler); + } + + /// + /// Converts the observable to an observable changeset, allowing size and time limit to be specified + /// + /// The type of the object. + /// The source. + /// Specify on a per object level the maximum time before an object expires from a cache + /// Remove the oldest items when the size has reached this limit + /// The scheduler (only used for time expiry). + /// + /// source + /// or + /// keySelector + public static IObservable> ToObservableChangeSet(this IObservable> source, + Func expireAfter, + int limitSizeTo, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new ToObservableChangeSet(source, expireAfter, limitSizeTo, scheduler).Run(); + } + + #endregion + + #region Auto Refresh + + /// + /// Automatically refresh downstream operators when any property changes. + /// + /// The source observable + /// Batch up changes by specifying the buffer. This greatly increases performance when many elements have sucessive property changes + /// When observing on multiple property changes, apply a throttle to prevent excessive refesh invocations + /// The scheduler + /// An observable change set with additional refresh changes + public static IObservable> AutoRefresh(this IObservable> source, + TimeSpan? changeSetBuffer = null, + TimeSpan? propertyChangeThrottle = null, + IScheduler scheduler = null) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + + return source.AutoRefreshOnObservable(t => + { + if (propertyChangeThrottle == null) + return t.WhenAnyPropertyChanged(); + + return t.WhenAnyPropertyChanged() + .Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default); + + }, changeSetBuffer, scheduler); + } + + /// + /// Automatically refresh downstream operators when properties change. + /// + /// The source observable + /// Specify a property to observe changes. When it changes a Refresh is invoked + /// Batch up changes by specifying the buffer. This greatly increases performance when many elements have sucessive property changes + /// When observing on multiple property changes, apply a throttle to prevent excessive refesh invocations + /// The scheduler + /// An observable change set with additional refresh changes + public static IObservable> AutoRefresh(this IObservable> source, + Expression> propertyAccessor, + TimeSpan? changeSetBuffer = null, + TimeSpan? propertyChangeThrottle = null, + IScheduler scheduler = null) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); + + return source.AutoRefreshOnObservable(t => + { + if (propertyChangeThrottle == null) + return t.WhenPropertyChanged(propertyAccessor, false); + + return t.WhenPropertyChanged(propertyAccessor,false) + .Throttle(propertyChangeThrottle.Value, scheduler ?? Scheduler.Default); + + }, changeSetBuffer, scheduler); + } + + /// + /// Automatically refresh downstream operator. The refresh is triggered when the observable receives a notification + /// + /// The source observable change set + /// An observable which acts on items within the collection and produces a value when the item should be refreshed + /// Batch up changes by specifying the buffer. This greatly increases performance when many elements require a refresh + /// The scheduler + /// An observable change set with additional refresh changes + public static IObservable> AutoRefreshOnObservable(this IObservable> source, + Func> reevaluator, + TimeSpan? changeSetBuffer = null, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (reevaluator == null) throw new ArgumentNullException(nameof(reevaluator)); + return new AutoRefresh(source, reevaluator, changeSetBuffer, scheduler).Run(); + } + + + /// + /// Supress refresh notifications + /// + /// The source observable change set + /// + public static IObservable> SupressRefresh(this IObservable> source) + { + return source.WhereReasonsAreNot(ListChangeReason.Refresh); + } + + + #endregion + + #region Conversion + + /// + /// Removes the index from all changes. + /// + /// NB: This operator has been introduced as a temporary fix for creating an Or operator using merge many. + /// + /// The type of the object. + /// The source. + /// + /// + public static IObservable> RemoveIndex([NotNull] this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.Select(changes => new ChangeSet(changes.YieldWithoutIndex())); + } + + /// + /// Adds a key to the change set result which enables all observable cache features of dynamic data + /// + /// + /// All indexed changes are dropped i.e. sorting is not supported by this function + /// + /// The type of object. + /// The type of key. + /// The source. + /// The key selector. + /// + /// + /// + public static IObservable> AddKey( + [NotNull] this IObservable> source, [NotNull] Func keySelector) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (keySelector == null) throw new ArgumentNullException(nameof(keySelector)); + return source.Select(changes => new ChangeSet(new AddKeyEnumerator(changes, keySelector))); + } + + /// + /// Convert the object using the sepcified conversion function. + /// + /// This is a lighter equivalent of Transform and is designed to be used with non-disposable objects + /// + /// The type of the object. + /// The type of the destination. + /// The source. + /// The conversion factory. + /// + /// + /// + [Obsolete("Prefer Cast as it is does the same thing but is semantically correct")] + public static IObservable> Convert( + [NotNull] this IObservable> source, + [NotNull] Func conversionFactory) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (conversionFactory == null) throw new ArgumentNullException(nameof(conversionFactory)); + return source.Select(changes => changes.Transform(conversionFactory)); + } + + + + /// + /// Cast the underlying type of an object. Use before a Cast function + /// + /// + /// The source. + /// + public static IObservable> CastToObject(this IObservable> source) + { + return source.Select(changes => + { + var items = changes.Transform(t => (object)t); + return new ChangeSet(items); + }); + } + + /// + /// Cast the changes to another form + /// + /// The type of the destination. + /// The source. + /// + /// + /// + public static IObservable> Cast([NotNull] this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.Select(changes => changes.Transform(t=>(TDestination)t)); + } + + /// + /// Cast the changes to another form + /// + /// Alas, I had to add the converter due to type inference issues. The converter can be avoided by CastToObject() first + /// + /// The type of the object. + /// The type of the destination. + /// The source. + /// The conversion factory. + /// + /// + /// + public static IObservable> Cast([NotNull] this IObservable> source, + [NotNull] Func conversionFactory) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (conversionFactory == null) throw new ArgumentNullException(nameof(conversionFactory)); + return source.Select(changes => changes.Transform(conversionFactory)); + } + + #endregion + + #region Binding + + /// + /// Binds a clone of the observable changeset to the target observable collection + /// + /// + /// The source. + /// The target collection. + /// The reset threshold. + /// + /// + /// source + /// or + /// targetCollection + /// + public static IObservable> Bind([NotNull] this IObservable> source, + [NotNull] IObservableCollection targetCollection, int resetThreshold = 25) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (targetCollection == null) throw new ArgumentNullException(nameof(targetCollection)); + + var adaptor = new ObservableCollectionAdaptor(targetCollection, resetThreshold); + return source.Adapt(adaptor); + } + + /// + /// Creates a binding to a readonly observable collection which is specified as an 'out' parameter + /// + /// + /// The source. + /// The resulting read only observable collection. + /// The reset threshold. + /// A continuation of the source stream + /// + /// + public static IObservable> Bind([NotNull] this IObservable> source, + out ReadOnlyObservableCollection readOnlyObservableCollection, int resetThreshold = 25) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + + var target = new ObservableCollectionExtended(); + var result = new ReadOnlyObservableCollection(target); + var adaptor = new ObservableCollectionAdaptor(target, resetThreshold); + readOnlyObservableCollection = result; + return source.Adapt(adaptor); + } + +#if SUPPORTS_BINDINGLIST + + /// + /// Binds a clone of the observable changeset to the target observable collection + /// + /// + /// The source. + /// The target binding list + /// The reset threshold. + /// + /// source + /// or + /// targetCollection + /// + public static IObservable> Bind([NotNull] this IObservable> source, + [NotNull] BindingList bindingList, int resetThreshold = 25) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (bindingList == null) throw new ArgumentNullException(nameof(bindingList)); + + return source.Adapt(new BindingListAdaptor(bindingList, resetThreshold)); + } + +#endif + + + /// + /// Injects a side effect into a changeset observable + /// + /// + /// The source. + /// The adaptor. + /// + /// + /// source + /// or + /// adaptor + /// + public static IObservable> Adapt([NotNull] this IObservable> source, + [NotNull] IChangeSetAdaptor adaptor) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (adaptor == null) throw new ArgumentNullException(nameof(adaptor)); + + + return Observable.Create>(observer => + { + var locker = new object(); + return source + .Synchronize(locker) + .Select(changes => + { + adaptor.Adapt(changes); + return changes; + }).SubscribeSafe(observer); + }); + } + + #endregion + + #region Populate into an observable list + + /// + /// list. + /// + /// The type of the object. + /// The source. + /// The destination. + /// + /// + /// source + /// or + /// destination + /// + /// source + /// or + /// destination + public static IDisposable PopulateInto([NotNull] this IObservable> source, + [NotNull] ISourceList destination) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (destination == null) throw new ArgumentNullException(nameof(destination)); + + return source.Subscribe(changes => destination.Edit(updater => updater.Clone(changes))); + } + + /// + /// Converts the source list to an read only observable list + /// + /// + /// The source. + /// + /// source + public static IObservableList AsObservableList([NotNull] this ISourceList source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new AnonymousObservableList(source); + } + + /// + /// Converts the source observable to an read only observable list + /// + /// + /// The source. + /// + /// source + public static IObservableList AsObservableList([NotNull] this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new AnonymousObservableList(source); + } + + /// + /// List equivalent to Publish().RefCount(). The source is cached so long as there is at least 1 subscriber. + /// + /// + /// The source. + /// + /// + public static IObservable> RefCount([NotNull] this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new RefCount(source).Run(); + } + + #endregion + + #region Core List Operators + + /// + /// Filters the source using the specified valueSelector + /// + /// + /// The source. + /// The valueSelector. + /// + /// source + public static IObservable> Filter(this IObservable> source, Func predicate) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (predicate == null) throw new ArgumentNullException(nameof(predicate)); + return new Filter(source, predicate).Run(); + } + + /// + /// Filters source using the specified filter observable predicate. + /// + /// + /// The source. + /// + /// Should the filter clear and replace, or calculate a diff-set + /// + /// source + /// or + /// filterController + public static IObservable> Filter([NotNull] this IObservable> source, [NotNull] IObservable> predicate, ListFilterPolicy filterPolicy = ListFilterPolicy.CalculateDiff) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (predicate == null) throw new ArgumentNullException(nameof(predicate)); + + return new Filter(source, predicate, filterPolicy).Run(); + } + + + /// + /// Filters source on the specified property using the specified predicate. + /// + /// The filter will automatically reapply when a property changes + /// + /// The type of the object. + /// The type of the property. + /// The source. + /// The property selector. When the property changes the filter specified will be re-evaluated + /// A predicate based on the object which contains the changed property + /// The property changed throttle. + /// The scheduler used when throttling + /// + /// + /// + [Obsolete("Use AutoRefresh(), followed by Filter() instead")] + public static IObservable> FilterOnProperty(this IObservable> source, + Expression> propertySelector, + Func predicate, + TimeSpan? propertyChangedThrottle = null, + IScheduler scheduler = null) where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (propertySelector == null) throw new ArgumentNullException(nameof(propertySelector)); + if (predicate == null) throw new ArgumentNullException(nameof(predicate)); + return new FilterOnProperty(source, propertySelector, predicate, propertyChangedThrottle, scheduler).Run(); + } + + + /// + /// Reverse sort of the changset + /// + /// + /// The source. + /// + /// + /// source + /// or + /// comparer + /// + public static IObservable> Reverse(this IObservable> source) + { + var reverser = new Reverser(); + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.Select(changes => new ChangeSet(reverser.Reverse(changes))); + } + + /// + /// Projects each update item to a new form using the specified transform function + /// + /// The type of the source. + /// The type of the destination. + /// The source. + /// The transform factory. + /// Should a new transform be applied when a refresh event is received + /// + /// + /// source + /// or + /// valueSelector + /// + public static IObservable> Transform(this IObservable> source, + Func transformFactory, + bool transformOnRefresh = false) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); + return source.Transform((t, previous, idx) => transformFactory(t), transformOnRefresh); + } + + /// + /// Projects each update item to a new form using the specified transform function + /// + /// The type of the source. + /// The type of the destination. + /// The source. + /// The transform fuunction + /// Should a new transform be applied when a refresh event is received + /// A an observable changeset of the transformed object + /// + /// source + /// or + /// valueSelector + /// + public static IObservable> Transform(this IObservable> source, + Func transformFactory, + bool transformOnRefresh = false) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); + + return source.Transform((t, previous, idx) => transformFactory(t,idx),transformOnRefresh); + } + + /// + /// Projects each update item to a new form using the specified transform function. + /// + /// *** Annoyingly when using this overload you will have to explicitly specify the generic type arguments as type inference fails + /// + /// The type of the source. + /// The type of the destination. + /// The source. + /// The transform function + /// Should a new transform be applied when a refresh event is received + /// A an observable changeset of the transformed object + /// + /// source + /// or + /// valueSelector + /// + public static IObservable> Transform(this IObservable> source, + Func, TDestination> transformFactory, + bool transformOnRefresh = false) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); + + return source.Transform((t, previous, idx) => transformFactory(t, previous), transformOnRefresh); + } + + /// + /// Projects each update item to a new form using the specified transform function + /// + /// *** Annoyingly when using this overload you will have to explicy specify the generic type arguments as type inference fails + /// + /// The type of the source. + /// The type of the destination. + /// The source. + /// The transform factory. + /// Should a new transform be applied when a refresh event is received + /// A an observable changeset of the transformed object + /// + /// source + /// or + /// valueSelector + /// + public static IObservable> Transform(this IObservable> source, + Func, int, TDestination> transformFactory, bool transformOnRefresh = false) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); + + return new Transformer(source, transformFactory, transformOnRefresh).Run(); + } + + + /// + /// Projects each update item to a new form using the specified transform function + /// + /// The type of the source. + /// The type of the destination. + /// The source. + /// The transform factory. + /// A an observable changeset of the transformed object + /// + /// source + /// or + /// valueSelector + /// + public static IObservable> TransformAsync( + this IObservable> source, Func> transformFactory) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (transformFactory == null) throw new ArgumentNullException(nameof(transformFactory)); + + return new TransformAsync(source, transformFactory).Run(); + } + + /// + /// Equivalent to a select many transform. To work, the key must individually identify each child. + /// + /// The type of the destination. + /// The type of the source. + /// The source. + /// The manyselector. + /// Used when an item has been replaced to determine whether child items are the same as previous children + /// + /// + /// source + /// or + /// manyselector + /// + public static IObservable> TransformMany( [NotNull] this IObservable> source, + [NotNull] Func> manyselector, + IEqualityComparer equalityComparer = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (manyselector == null) throw new ArgumentNullException(nameof(manyselector)); + return new TransformMany(source, manyselector, equalityComparer).Run(); + } + + /// + /// Flatten the nested observable collection, and observe subsequentl observable collection changes + /// + /// The type of the destination. + /// The type of the source. + /// The source. + /// The manyselector. + /// Used when an item has been replaced to determine whether child items are the same as previous children + public static IObservable> TransformMany( this IObservable> source, + Func> manyselector, + IEqualityComparer equalityComparer = null) + { + return new TransformMany(source,manyselector, equalityComparer).Run(); + } + + /// + /// Flatten the nested observable collection, and observe subsequentl observable collection changes + /// + /// The type of the destination. + /// The type of the source. + /// The source. + /// The manyselector. + /// Used when an item has been replaced to determine whether child items are the same as previous children + public static IObservable> TransformMany(this IObservable> source, + Func> manyselector, + IEqualityComparer equalityComparer = null) + { + return new TransformMany(source, manyselector, equalityComparer).Run(); + } + + /// + /// Flatten the nested observable list, and observe subsequent observable collection changes + /// + /// The type of the destination. + /// The type of the source. + /// The source. + /// The manyselector. + /// Used when an item has been replaced to determine whether child items are the same as previous children + public static IObservable> TransformMany(this IObservable> source, + Func> manyselector, + IEqualityComparer equalityComparer = null) + { + return new TransformMany(source, manyselector, equalityComparer).Run(); + } + + /// + /// Selects distinct values from the source, using the specified value selector + /// + /// The type of the source. + /// The type of the destination. + /// The source. + /// The transform factory. + /// + /// + /// source + /// or + /// valueSelector + /// + public static IObservable> DistinctValues( + this IObservable> source, + Func valueSelector) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (valueSelector == null) throw new ArgumentNullException(nameof(valueSelector)); + return new Distinct(source, valueSelector).Run(); + } + + /// + /// Groups the source on the value returned by group selector factory. The groupings contains an inner observable list. + /// + /// The type of the object. + /// The type of the group. + /// The source. + /// The group selector. + /// Force the grouping function to recalculate the group value. + /// For example if you have a time based grouping with values like `Last Minute', 'Last Hour', 'Today' etc regrouper is used to refresh these groupings + /// + /// + /// source + /// or + /// groupSelector + /// + public static IObservable>> GroupOn( + this IObservable> source, Func groupSelector, + IObservable regrouper = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (groupSelector == null) throw new ArgumentNullException(nameof(groupSelector)); + return new GroupOn(source, groupSelector, regrouper).Run(); + } + + /// + /// Groups the source on the value returned by group selector factory. Each update produces immuatable grouping. + /// + /// The type of the object. + /// The type of the group key. + /// The source. + /// The group selector key. + /// Force the grouping function to recalculate the group value. + /// For example if you have a time based grouping with values like `Last Minute', 'Last Hour', 'Today' etc regrouper is used to refresh these groupings + /// + /// + /// + /// source + /// or + /// groupSelectorKey + /// + public static IObservable>> GroupWithImmutableState + (this IObservable> source, + Func groupSelectorKey, + IObservable regrouper = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (groupSelectorKey == null) throw new ArgumentNullException(nameof(groupSelectorKey)); + + return new GroupOnImmutable(source, groupSelectorKey, regrouper).Run(); + } + + + /// + /// Groups the source using the property specified by the property selector. The resulting groupings contains an inner observable list. + /// Groups are re-applied when the property value changed. + /// When there are likely to be a large number of group property changes specify a throttle to improve performance + /// + /// The type of the object. + /// The type of the group. + /// The source. + /// The property selector used to group the items + /// The property changed throttle. + /// The scheduler. + /// + /// + /// + public static IObservable>> GroupOnProperty( + this IObservable> source, + Expression> propertySelector, + TimeSpan? propertyChangedThrottle = null, + IScheduler scheduler = null) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (propertySelector == null) throw new ArgumentNullException(nameof(propertySelector)); + return + new GroupOnProperty(source, propertySelector, propertyChangedThrottle, scheduler).Run(); + } + + /// + /// Groups the source using the property specified by the property selector. The resulting groupings are immutable. + /// Groups are re-applied when the property value changed. + /// When there are likely to be a large number of group property changes specify a throttle to improve performance + /// + /// The type of the object. + /// The type of the group. + /// The source. + /// The property selector used to group the items + /// The property changed throttle. + /// The scheduler. + /// + /// + /// + public static IObservable>> GroupOnPropertyWithImmutableState(this IObservable> source, + Expression> propertySelector, + TimeSpan? propertyChangedThrottle = null, + IScheduler scheduler = null) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (propertySelector == null) throw new ArgumentNullException(nameof(propertySelector)); + return new GroupOnPropertyWithImmutableState(source, propertySelector, propertyChangedThrottle, scheduler).Run(); + } + + /// + /// Prevents an empty notification + /// + /// + /// The source. + /// + /// source + public static IObservable> NotEmpty(this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.Where(s => s.Count != 0); + } + + /// + /// Clones the target list as a side effect of the stream + /// + /// + /// The source. + /// + /// + /// source + public static IObservable> Clone(this IObservable> source, IList target) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.Do(target.Clone); + } + + #endregion + + #region Sort + + /// + /// Sorts the sequence using the specified comparer. + /// + /// + /// The source. + /// The comparer used for sorting + /// For improved performance, specify SortOptions.UseBinarySearch. This can only be used when the values which are sorted on are immutable + /// Since sorting can be slow for large record sets, the reset threshold is used to force the list re-ordered + /// OnNext of this observable causes data to resort. This is required when the value which is sorted on mutable + /// An observable comparer used to change the comparer on which the sorted list i + /// + /// source + /// or + /// comparer + public static IObservable> Sort(this IObservable> source, + IComparer comparer, + SortOptions options = SortOptions.None, + IObservable resort = null, + IObservable> comparerChanged = null, + int resetThreshold = 50) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (comparer == null) throw new ArgumentNullException(nameof(comparer)); + + return new Sort(source, comparer, options, resort, comparerChanged, resetThreshold).Run(); + } + + /// + /// Sorts the sequence using the specified observable comparer. + /// + /// + /// The source. + /// For improved performance, specify SortOptions.UseBinarySearch. This can only be used when the values which are sorted on are immutable + /// Since sorting can be slow for large record sets, the reset threshold is used to force the list re-ordered + /// OnNext of this observable causes data to resort. This is required when the value which is sorted on mutable + /// An observable comparer used to change the comparer on which the sorted list i + /// + /// source + /// or + /// comparer + public static IObservable> Sort(this IObservable> source, + IObservable> comparerChanged, + SortOptions options = SortOptions.None, + IObservable resort = null, + int resetThreshold = 50) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (comparerChanged == null) throw new ArgumentNullException(nameof(comparerChanged)); + + return new Sort(source, null, options, resort, comparerChanged, resetThreshold).Run(); + } + + #endregion + + #region Item operators + + /// + /// Provides a call back for each item change. + /// + /// The type of the object. + /// The source. + /// The action. + /// + /// + /// + public static IObservable> ForEachChange( + [NotNull] this IObservable> source, + [NotNull] Action> action) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (action == null) throw new ArgumentNullException(nameof(action)); + return source.Do(changes => changes.ForEach(action)); + } + + /// + /// Provides a call back for each item change. + /// + /// Range changes are flattened, so there is only need to check for Add, Replace, Remove and Clear + /// + /// The type of the object. + /// The source. + /// The action. + /// + /// + /// + public static IObservable> ForEachItemChange( + [NotNull] this IObservable> source, + [NotNull] Action> action) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (action == null) throw new ArgumentNullException(nameof(action)); + return source.Do(changes => changes.Flatten().ForEach(action)); + } + + /// + /// Dynamically merges the observable which is selected from each item in the stream, and unmerges the item + /// when it is no longer part of the stream. + /// + /// The type of the object. + /// The type of the destination. + /// The source. + /// The observable selector. + /// + /// source + /// or + /// observableSelector + public static IObservable MergeMany( + [NotNull] this IObservable> source, + [NotNull] Func> observableSelector) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (observableSelector == null) throw new ArgumentNullException(nameof(observableSelector)); + + return new MergeMany(source, observableSelector).Run(); + } + + /// + /// Watches each item in the collection and notifies when any of them has changed + /// + /// + /// The type of the value. + /// The source. + /// The property accessor. + /// if set to true [notify on initial value]. + /// + /// + /// + public static IObservable WhenValueChanged( + [NotNull] this IObservable> source, + [NotNull] Expression> propertyAccessor, + bool notifyOnInitialValue = true) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); + + var factory = propertyAccessor.GetFactory(); + return source.MergeMany(t => factory(t, notifyOnInitialValue).Select(pv=>pv.Value)); + } + + /// + /// Watches each item in the collection and notifies when any of them has changed + /// + /// + /// The type of the value. + /// The source. + /// The property accessor. + /// if set to true [notify on initial value]. + /// + /// + /// + public static IObservable> WhenPropertyChanged( + [NotNull] this IObservable> source, + [NotNull] Expression> propertyAccessor, + bool notifyOnInitialValue = true) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (propertyAccessor == null) throw new ArgumentNullException(nameof(propertyAccessor)); + + var factory = propertyAccessor.GetFactory(); + return source.MergeMany(t => factory(t, notifyOnInitialValue)); + } + + /// + /// Watches each item in the collection and notifies when any of them has changed + /// + /// The type of the object. + /// The source. + /// specify properties to Monitor, or omit to monitor all property changes + /// + /// + /// + public static IObservable WhenAnyPropertyChanged([NotNull] this IObservable> source, params string[] propertiesToMonitor) + where TObject : INotifyPropertyChanged + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.MergeMany(t => t.WhenAnyPropertyChanged(propertiesToMonitor)); + } + + /// + /// Subscribes to each item when it is added to the stream and unsubcribes when it is removed. All items will be unsubscribed when the stream is disposed + /// + /// The type of the object. + /// The source. + /// The subsription function + /// + /// source + /// or + /// subscriptionFactory + /// + /// Subscribes to each item when it is added or updates and unsubcribes when it is removed + /// + public static IObservable> SubscribeMany(this IObservable> source, + Func subscriptionFactory) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (subscriptionFactory == null) throw new ArgumentNullException(nameof(subscriptionFactory)); + return new SubscribeMany(source, subscriptionFactory).Run(); + } + + /// + /// Disposes each item when no longer required. + /// + /// Individual items are disposed when removed or replaced. All items + /// are disposed when the stream is disposed + /// + /// + /// + /// The type of the object. + /// The source. + /// A continuation of the original stream + /// source + public static IObservable> DisposeMany(this IObservable> source) + { + return source.OnItemRemoved(t => + { + var d = t as IDisposable; + d?.Dispose(); + }); + } + + /// + /// Callback for each item as and when it is being removed from the stream + /// + /// The type of the object. + /// The source. + /// The remove action. + /// + /// + /// source + /// or + /// removeAction + /// + public static IObservable> OnItemRemoved(this IObservable> source, + Action removeAction) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (removeAction == null) throw new ArgumentNullException(nameof(removeAction)); + + return new OnBeingRemoved(source, removeAction).Run(); + } + + /// + /// Callback for each item as and when it is being added to the stream + /// + /// + /// The source. + /// The add action. + /// + public static IObservable> OnItemAdded(this IObservable> source, + Action addAction) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (addAction == null) throw new ArgumentNullException(nameof(addAction)); + return new OnBeingAdded(source, addAction).Run(); + } + + #endregion + + #region Reason filtering + + /// + /// Includes changes for the specified reasons only + /// + /// + /// The source. + /// The reasons. + /// + /// Must enter at least 1 reason + public static IObservable> WhereReasonsAre(this IObservable> source, + params ListChangeReason[] reasons) + { + if (reasons.Length == 0) + throw new ArgumentException("Must enter at least 1 reason", nameof(reasons)); + + var matches = new HashSet(reasons); + return source.Select(changes => + { + var filtered = changes.Where(change => matches.Contains(change.Reason)).YieldWithoutIndex(); + return new ChangeSet(filtered); + }).NotEmpty(); + } + + /// + /// Excludes updates for the specified reasons + /// + /// + /// The source. + /// The reasons. + /// + /// Must enter at least 1 reason + public static IObservable> WhereReasonsAreNot(this IObservable> source, + params ListChangeReason[] reasons) + { + if (reasons.Length == 0) + throw new ArgumentException("Must enter at least 1 reason", nameof(reasons)); + + var matches = new HashSet(reasons); + return source.Select(updates => + { + var filtered = updates.Where(u => !matches.Contains(u.Reason)).YieldWithoutIndex(); + return new ChangeSet(filtered); + }).NotEmpty(); + } + + #endregion + + #region Buffering + + /// + /// Buffers changes for an intial period only. After the period has elapsed, not further buffering occurs. + /// + /// The source changeset + /// The period to buffer, measure from the time that the first item arrives + /// The scheduler to buffer on + public static IObservable> BufferInitial(this IObservable> source, TimeSpan initalBuffer, IScheduler scheduler = null) + { + return source.DeferUntilLoaded().Publish(shared => + { + var initial = shared.Buffer(initalBuffer, scheduler ?? Scheduler.Default) + .FlattenBufferResult() + .Take(1); + + return initial.Concat(shared); + }); + } + + /// + /// Convert the result of a buffer operation to a change set + /// + /// + /// The source. + /// + public static IObservable> FlattenBufferResult(this IObservable>> source) + { + return source + .Where(x => x.Count != 0) + .Select(updates => new ChangeSet(updates.SelectMany(u => u))); + } + + /// + /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. + /// When a resume signal has been received the batched updates will be fired. + /// + /// The type of the object. + /// The source. + /// When true, observable begins to buffer and when false, window closes and buffered result if notified + /// The scheduler. + /// + /// source + public static IObservable> BufferIf([NotNull] this IObservable> source, + [NotNull] IObservable pauseIfTrueSelector, + IScheduler scheduler = null) + { + return BufferIf(source, pauseIfTrueSelector, false, scheduler); + } + + /// + /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. + /// When a resume signal has been received the batched updates will be fired. + /// + /// The type of the object. + /// The source. + /// When true, observable begins to buffer and when false, window closes and buffered result if notified + /// if set to true [intial pause state]. + /// The scheduler. + /// + /// source + public static IObservable> BufferIf([NotNull] this IObservable> source, + [NotNull] IObservable pauseIfTrueSelector, + bool intialPauseState = false, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (pauseIfTrueSelector == null) throw new ArgumentNullException(nameof(pauseIfTrueSelector)); + return BufferIf(source, pauseIfTrueSelector, intialPauseState, null, scheduler); + } + + /// + /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. + /// When a resume signal has been received the batched updates will be fired. + /// + /// The type of the object. + /// The source. + /// When true, observable begins to buffer and when false, window closes and buffered result if notified + /// Specify a time to ensure the buffer window does not stay open for too long + /// The scheduler. + /// + /// source + public static IObservable> BufferIf(this IObservable> source, + IObservable pauseIfTrueSelector, + TimeSpan? timeOut = null, + IScheduler scheduler = null) + { + return BufferIf(source, pauseIfTrueSelector, false, timeOut, scheduler); + } + + /// + /// Batches the underlying updates if a pause signal (i.e when the buffer selector return true) has been received. + /// When a resume signal has been received the batched updates will be fired. + /// + /// The type of the object. + /// The source. + /// When true, observable begins to buffer and when false, window closes and buffered result if notified + /// if set to true [intial pause state]. + /// Specify a time to ensure the buffer window does not stay open for too long + /// The scheduler. + /// + /// source + public static IObservable> BufferIf(this IObservable> source, + IObservable pauseIfTrueSelector, + bool intialPauseState = false, + TimeSpan? timeOut = null, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (pauseIfTrueSelector == null) throw new ArgumentNullException(nameof(pauseIfTrueSelector)); + return new BufferIf(source, pauseIfTrueSelector, intialPauseState, timeOut, scheduler).Run(); + } + + /// + /// The latest copy of the cache is exposed for querying after each modification to the underlying data + /// + /// The type of the object. + /// The type of the destination. + /// The source. + /// The result selector. + /// + /// + /// source + /// or + /// resultSelector + /// + public static IObservable QueryWhenChanged( + this IObservable> source, + Func, TDestination> resultSelector) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (resultSelector == null) throw new ArgumentNullException(nameof(resultSelector)); + + return source.QueryWhenChanged().Select(resultSelector); + } + + /// + /// The latest copy of the cache is exposed for querying i) after each modification to the underlying data ii) upon subscription + /// + /// The type of the object. + /// The source. + /// + /// source + public static IObservable> QueryWhenChanged([NotNull] this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new QueryWhenChanged(source).Run(); + } + + /// + /// Converts the changeset into a fully formed collection. Each change in the source results in a new collection + /// + /// The type of the object. + /// The source. + /// + public static IObservable> ToCollection(this IObservable> source) + { + return source.QueryWhenChanged(items => items); + } + + /// + /// Converts the changeset into a fully formed sorted collection. Each change in the source results in a new sorted collection + /// + /// The type of the object. + /// The sort key + /// The source. + /// The sort function + /// The sort order. Defaults to ascending + /// + public static IObservable> ToSortedCollection(this IObservable> source, + Func sort, SortDirection sortOrder = SortDirection.Ascending) + { + return source.QueryWhenChanged(query => sortOrder == SortDirection.Ascending + ? new ReadOnlyCollectionLight(query.OrderBy(sort)) + : new ReadOnlyCollectionLight(query.OrderByDescending(sort))); + } + + /// + /// Converts the changeset into a fully formed sorted collection. Each change in the source results in a new sorted collection + /// + /// The type of the object. + /// The type of the key. + /// The source. + /// The sort comparer + /// + public static IObservable> ToSortedCollection(this IObservable> source, + IComparer comparer) + { + return source.QueryWhenChanged(query => + { + var items = query.AsList(); + items.Sort(comparer); + return new ReadOnlyCollectionLight(items); + }); + } + + + /// + /// Defer the subscribtion until loaded and skip initial changeset + /// + /// The type of the object. + /// The source. + /// + /// source + public static IObservable> SkipInitial(this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.DeferUntilLoaded().Skip(1); + } + + /// + /// Defer the subscription until the stream has been inflated with data + /// + /// The type of the object. + /// The source. + /// + public static IObservable> DeferUntilLoaded([NotNull] this IObservable> source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return new DeferUntilLoaded(source).Run(); + } + + /// + /// Defer the subscription until the cache has been inflated with data + /// + /// The type of the object. + /// The source. + /// + public static IObservable> DeferUntilLoaded(this IObservableList source) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + return source.Connect().DeferUntilLoaded(); + } + + #endregion + + #region Virtualisation / Paging + + /// + /// Virtualises the source using parameters provided via the requests observable + /// + /// + /// The source. + /// The requests. + /// + public static IObservable> Virtualise([NotNull] this IObservable> source, + [NotNull] IObservable requests) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (requests == null) throw new ArgumentNullException(nameof(requests)); + return new Virtualiser(source, requests).Run(); + } + + /// + /// Limits the size of the result set to the specified number of items + /// + /// + /// The source. + /// The number of items. + /// + public static IObservable> Top([NotNull] this IObservable> source, + int numberOfItems) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (numberOfItems <= 0) + throw new ArgumentOutOfRangeException(nameof(numberOfItems), + "Number of items should be greater than zero"); + + return source.Virtualise(Observable.Return(new VirtualRequest(0, numberOfItems))); + } + + + /// + /// Applies paging to the the data source + /// + /// + /// The source. + /// Observable to control page requests + /// + public static IObservable> Page([NotNull] this IObservable> source, + [NotNull] IObservable requests) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (requests == null) throw new ArgumentNullException(nameof(requests)); + return new Pager(source, requests).Run(); + } + + #endregion + + #region Expiry / size limiter + + /// + /// Limits the size of the source cache to the specified limit. + /// Notifies which items have been removed from the source list. + /// + /// + /// The source. + /// The size limit. + /// The scheduler. + /// + /// sizeLimit cannot be zero + /// source + /// sizeLimit cannot be zero + public static IObservable> LimitSizeTo([NotNull] this ISourceList source, int sizeLimit, + IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (sizeLimit <= 0) throw new ArgumentException("sizeLimit cannot be zero", nameof(sizeLimit)); + + var locker = new object(); + var limiter = new LimitSizeTo(source, sizeLimit, scheduler ?? Scheduler.Default, locker); + + return limiter.Run().Synchronize(locker).Do(source.RemoveMany); + } + + /// + /// Removes items from the cache according to the value specified by the time selector function + /// + /// + /// The source. + /// Selector returning when to expire the item. Return null for non-expiring item + /// The scheduler + /// + /// + /// + public static IObservable> ExpireAfter([NotNull] this ISourceList source, + [NotNull] Func timeSelector, IScheduler scheduler = null) + { + return source.ExpireAfter(timeSelector, null, scheduler); + } + + /// + /// Removes items from the cache according to the value specified by the time selector function + /// + /// + /// The source. + /// Selector returning when to expire the item. Return null for non-expiring item + /// Enter the polling interval to optimise expiry timers, if ommited 1 timer is created for each unique expiry time + /// The scheduler + /// + /// + /// + public static IObservable> ExpireAfter([NotNull] this ISourceList source, + [NotNull] Func timeSelector, TimeSpan? pollingInterval = null, IScheduler scheduler = null) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (timeSelector == null) throw new ArgumentNullException(nameof(timeSelector)); + + var locker = new object(); + var limiter = new ExpireAfter(source, timeSelector, pollingInterval, scheduler ?? Scheduler.Default, + locker); + + return limiter.Run().Synchronize(locker).Do(source.RemoveMany); + } + + #endregion + + #region Logical collection operators + + /// + /// Apply a logical Or operator between the collections. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Or([NotNull] this ICollection>> sources) + { + return sources.Combine(CombineOperator.Or); + } + + /// + /// Apply a logical Or operator between the collections. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// The others. + /// + public static IObservable> Or([NotNull] this IObservable> source, + params IObservable>[] others) + { + return source.Combine(CombineOperator.Or, others); + } + + /// + /// Dynamically apply a logical Or operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Or( + [NotNull] this IObservableList>> sources) + { + return sources.Combine(CombineOperator.Or); + } + + /// + /// Dynamically apply a logical Or operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Or([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.Or); + } + + /// + /// Dynamically apply a logical Or operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Or([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.Or); + } + + /// + /// Apply a logical Xor operator between the collections. + /// Items which are only in one of the sources are included in the result + /// + /// + /// The source. + /// The others. + /// + public static IObservable> Xor([NotNull] this IObservable> source, + params IObservable>[] others) + { + return source.Combine(CombineOperator.Xor, others); + } + + /// + /// Apply a logical Xor operator between the collections. + /// Items which are only in one of the sources are included in the result + /// + /// + /// The sources. + /// > + public static IObservable> Xor([NotNull] this ICollection>> sources) + { + return sources.Combine(CombineOperator.Xor); + } + + /// + /// Dynamically apply a logical Xor operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Xor( + [NotNull] this IObservableList>> sources) + { + return sources.Combine(CombineOperator.Xor); + } + + /// + /// Dynamically apply a logical Xor operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Xor([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.Xor); + } + + /// + /// Dynamically apply a logical Xor operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> Xor([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.Xor); + } + + /// + /// Apply a logical And operator between the collections. + /// Items which are in all of the sources are included in the result + /// + /// + /// The source. + /// The others. + /// + public static IObservable> And([NotNull] this IObservable> source, + params IObservable>[] others) + { + return source.Combine(CombineOperator.And, others); + } + + /// + /// Apply a logical And operator between the collections. + /// Items which are in all of the sources are included in the result + /// + /// + /// The sources. + /// > + public static IObservable> And([NotNull] this ICollection>> sources) + { + return sources.Combine(CombineOperator.And); + } + + /// + /// Dynamically apply a logical And operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> And( + [NotNull] this IObservableList>> sources) + { + return sources.Combine(CombineOperator.And); + } + + /// + /// Dynamically apply a logical And operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> And([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.And); + } + + /// + /// Dynamically apply a logical And operator between the items in the outer observable list. + /// Items which are in any of the sources are included in the result + /// + /// + /// The source. + /// + public static IObservable> And([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.And); + } + + /// + /// Apply a logical Except operator between the collections. + /// Items which are in the source and not in the others are included in the result + /// + /// + /// The source. + /// The others. + /// + public static IObservable> Except([NotNull] this IObservable> source, + params IObservable>[] others) + { + return source.Combine(CombineOperator.Except, others); + } + + /// + /// Apply a logical Except operator between the collections. + /// Items which are in the source and not in the others are included in the result + /// + /// + /// The sources. + /// > + public static IObservable> Except( + [NotNull] this ICollection>> sources) + { + return sources.Combine(CombineOperator.Except); + } + + /// + /// Dynamically apply a logical Except operator. Items from the first observable list are included when an equivalent item does not exist in the other sources. + /// + /// + /// The source. + /// + public static IObservable> Except( + [NotNull] this IObservableList>> sources) + { + return sources.Combine(CombineOperator.Except); + } + + /// + /// Dynamically apply a logical Except operator. Items from the first observable list are included when an equivalent item does not exist in the other sources. + /// + /// + /// The source. + /// + public static IObservable> Except([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.Except); + } + + /// + /// Dynamically apply a logical Except operator. Items from the first observable list are included when an equivalent item does not exist in the other sources. + /// + /// + /// The source. + /// + public static IObservable> Except([NotNull] this IObservableList> sources) + { + return sources.Combine(CombineOperator.Except); + } + + private static IObservable> Combine( + [NotNull] this ICollection>> sources, + CombineOperator type) + { + if (sources == null) throw new ArgumentNullException(nameof(sources)); + + return new Combiner(sources, type).Run(); + } + + private static IObservable> Combine([NotNull] this IObservable> source, + CombineOperator type, + params IObservable>[] others) + { + if (source == null) throw new ArgumentNullException(nameof(source)); + if (others.Length == 0) + throw new ArgumentException("Must be at least one item to combine with", nameof(others)); + + var items = source.EnumerateOne().Union(others).ToList(); + return new Combiner(items, type).Run(); + } + + private static IObservable> Combine([NotNull] this IObservableList> sources, + CombineOperator type) + { + if (sources == null) throw new ArgumentNullException(nameof(sources)); + + return Observable.Create>(observer => + { + var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList(); + var subscriber = changesSetList.Combine(type).SubscribeSafe(observer); + return new CompositeDisposable(changesSetList, subscriber); + }); + } + + private static IObservable> Combine([NotNull] this IObservableList> sources, + CombineOperator type) + { + if (sources == null) throw new ArgumentNullException(nameof(sources)); + + return Observable.Create>(observer => + { + var changesSetList = sources.Connect().Transform(s => s.Connect()).AsObservableList(); + var subscriber = changesSetList.Combine(type).SubscribeSafe(observer); + return new CompositeDisposable(changesSetList, subscriber); + }); + } + + private static IObservable> Combine( + [NotNull] this IObservableList>> sources, CombineOperator type) + { + if (sources == null) throw new ArgumentNullException(nameof(sources)); + return new DynamicCombiner(sources, type).Run(); + } + + #endregion + + #region Switch + + /// + /// Transforms an observable sequence of observable lists into a single sequence + /// producing values only from the most recent observable sequence. + /// Each time a new inner observable sequence is received, unsubscribe from the + /// previous inner observable sequence and clear the existing result set + /// + /// The type of the object. + /// The source. + /// + /// The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. + /// + /// + /// is null. + public static IObservable> Switch(this IObservable> sources) + { + if (sources == null) throw new ArgumentNullException(nameof(sources)); + return sources.Select(cache => cache.Connect()).Switch(); + } + + /// + /// Transforms an observable sequence of observable changes sets into an observable sequence + /// producing values only from the most recent observable sequence. + /// Each time a new inner observable sequence is received, unsubscribe from the + /// previous inner observable sequence and clear the existing resukt set + /// + /// The type of the object. + /// The source. + /// + /// The observable sequence that at any point in time produces the elements of the most recent inner observable sequence that has been received. + /// + /// + /// is null. + public static IObservable> Switch(this IObservable>> sources) + { + if (sources == null) throw new ArgumentNullException(nameof(sources)); + return new Switch(sources).Run(); + } + + #endregion + + + #region Start with + + /// + /// Prepends an empty changeset to the source + /// + public static IObservable> StartWithEmpty(this IObservable> source) + { + return source.StartWith(ChangeSet.Empty); + } + + + + #endregion + } +}