Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache locks #213

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 105 additions & 54 deletions DynamicData/Cache/Internal/ReaderWriter.cs
Original file line number Diff line number Diff line change
@@ -1,89 +1,102 @@
using System;
using System.Collections.Generic;
using System.Threading;
using DynamicData.Kernel;

namespace DynamicData.Cache.Internal
{
internal sealed class ReaderWriter<TObject, TKey>
internal sealed class ReaderWriter<TObject, TKey> : IDisposable
{
private readonly Func<TObject, TKey> _keySelector;
private readonly ChangeAwareCache<TObject, TKey> _changeAwareCache ;
private readonly Dictionary<TKey,TObject> _data = new Dictionary<TKey, TObject>();
private readonly object _locker = new object();
private Dictionary<TKey, TObject> _data = new Dictionary<TKey, TObject>();
private CacheUpdater<TObject, TKey> _activeUpdater = null;

private TwoStageRWLock _lock = new TwoStageRWLock(LockRecursionPolicy.SupportsRecursion);

public ReaderWriter(Func<TObject, TKey> keySelector = null)
{
_keySelector = keySelector;
_changeAwareCache = new ChangeAwareCache<TObject, TKey>(_data);
}

#region Writers

public ChangeSet<TObject, TKey> Write(IChangeSet<TObject, TKey> changes, bool notifyChanges)
public ChangeSet<TObject, TKey> Write(IChangeSet<TObject, TKey> changes, bool collectChanges)
{
if (changes == null) throw new ArgumentNullException(nameof(changes));
ChangeSet<TObject, TKey> result;
lock (_locker)

return DoUpdate(updater => updater.Clone(changes), collectChanges);
}

public ChangeSet<TObject, TKey> Write(Action<ICacheUpdater<TObject, TKey>> updateAction, bool collectChanges)
{
if (updateAction == null) throw new ArgumentNullException(nameof(updateAction));

return DoUpdate(updateAction, collectChanges);
}

public ChangeSet<TObject, TKey> Write(Action<ISourceUpdater<TObject, TKey>> updateAction, bool collectChanges)
{
if (updateAction == null) throw new ArgumentNullException(nameof(updateAction));

return DoUpdate(updateAction, collectChanges);
}

private ChangeSet<TObject, TKey> DoUpdate(Action<CacheUpdater<TObject, TKey>> updateAction, bool collectChanges)
{
_lock.EnterWriteLock();
try
{

if (notifyChanges)
if (collectChanges)
{
_changeAwareCache.Clone(changes);
result = _changeAwareCache.CaptureChanges();
var changeAwareCache = new ChangeAwareCache<TObject, TKey>(_data);

_activeUpdater = new CacheUpdater<TObject, TKey>(changeAwareCache, _keySelector);
updateAction(_activeUpdater);
_activeUpdater = null;

return changeAwareCache.CaptureChanges();
}
else
{
_data.Clone(changes);
result = ChangeSet<TObject, TKey>.Empty;
_activeUpdater = new CacheUpdater<TObject, TKey>(_data, _keySelector);
updateAction(_activeUpdater);
_activeUpdater = null;

return ChangeSet<TObject, TKey>.Empty;
}
}
return result;
}

public ChangeSet<TObject, TKey> Write(Action<ICacheUpdater<TObject, TKey>> updateAction, bool notifyChanges)
{
if (updateAction == null) throw new ArgumentNullException(nameof(updateAction));
ChangeSet<TObject, TKey> result;
lock (_locker)
finally
{
var updater = CreateUpdater(notifyChanges);
updateAction(updater);
result = _changeAwareCache.CaptureChanges();
_lock.ExitWriteLock();
}
return result;
}


public ChangeSet<TObject, TKey> Write(Action<ISourceUpdater<TObject, TKey>> updateAction, bool notifyChanges)
internal void WriteNested(Action<ISourceUpdater<TObject, TKey>> updateAction)
{
if (updateAction == null) throw new ArgumentNullException(nameof(updateAction));

ChangeSet<TObject, TKey> result;
lock (_locker)
_lock.EnterWriteLock();
try
{
var updater = CreateUpdater(notifyChanges);
updateAction(updater);
result = _changeAwareCache.CaptureChanges();
if (_activeUpdater == null)
{
throw new InvalidOperationException("WriteNested can only be used if another write is already in progress.");
}
updateAction(_activeUpdater);
}
finally
{
_lock.ExitWriteLock();
}
return result;
}

private CacheUpdater<TObject, TKey> CreateUpdater(bool notifyChanges)
{
return notifyChanges
? new CacheUpdater<TObject, TKey>(_changeAwareCache, _keySelector)
: new CacheUpdater<TObject, TKey>(_data, _keySelector);
}

#endregion

#region Accessors
public ChangeSet<TObject, TKey> GetInitialUpdates( Func<TObject, bool> filter = null)

public ChangeSet<TObject, TKey> GetInitialUpdates(Func<TObject, bool> filter = null)
{
ChangeSet<TObject, TKey> result;
lock (_locker)
_lock.EnterReadLock();
try
{
var dictionary = _data;

Expand All @@ -102,6 +115,10 @@ public ChangeSet<TObject, TKey> GetInitialUpdates( Func<TObject, bool> filter =

result = changes;
}
finally
{
_lock.ExitReadLock();
}
return result;
}

Expand All @@ -110,11 +127,16 @@ public TKey[] Keys
get
{
TKey[] result;
lock (_locker)
_lock.EnterReadLock();
try
{
result = new TKey[_data.Count];
_data.Keys.CopyTo(result, 0);
}
finally
{
_lock.ExitReadLock();
}
return result;
}
}
Expand All @@ -124,7 +146,8 @@ public KeyValuePair<TKey, TObject>[] KeyValues
get
{
KeyValuePair<TKey, TObject>[] result;
lock (_locker)
_lock.EnterReadLock();
try
{
result = new KeyValuePair<TKey, TObject>[_data.Count];
int i = 0;
Expand All @@ -134,6 +157,10 @@ public KeyValuePair<TKey, TObject>[] KeyValues
i++;
}
}
finally
{
_lock.ExitReadLock();
}
return result;
}
}
Expand All @@ -143,21 +170,33 @@ public TObject[] Items
get
{
TObject[] result;
lock (_locker)
_lock.EnterReadLock();
try
{
result = new TObject[_data.Count];
_data.Values.CopyTo(result, 0);
}
finally
{
_lock.ExitReadLock();
}
return result;
}
}

public Optional<TObject> Lookup(TKey key)
{
Optional<TObject> result;
lock (_locker)
result= _data.Lookup(key);

_lock.EnterReadLock();
try
{
result = _data.Lookup(key);
}
finally
{
_lock.ExitReadLock();
}

return result;
}

Expand All @@ -166,13 +205,25 @@ public int Count
get
{
int count;
lock (_locker)
_lock.EnterReadLock();
try
{
count = _data.Count;
}
finally
{
_lock.ExitReadLock();
}

return count;
}
}

#endregion

public void Dispose()
{
_lock.Dispose();
}
}
}
40 changes: 38 additions & 2 deletions DynamicData/Cache/ObservableCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ internal sealed class ObservableCache<TObject, TKey> : IObservableCache<TObject,
{
private readonly Subject<ChangeSet<TObject, TKey>> _changes = new Subject<ChangeSet<TObject, TKey>>();
private readonly Lazy<ISubject<int>> _countChanged = new Lazy<ISubject<int>>(() => new Subject<int>());
private int _editLevel = 0; // The level of recursion in editing.
private readonly ReaderWriter<TObject, TKey> _readerWriter;
private readonly IDisposable _cleanUp;

Expand All @@ -34,7 +35,10 @@ public ObservableCache(IObservable<IChangeSet<TObject, TKey>> source)
loader.Dispose();
_changes.OnCompleted();
if (_countChanged.IsValueCreated)
{
_countChanged.Value.OnCompleted();
}
_readerWriter.Dispose();
});
}

Expand All @@ -55,7 +59,23 @@ internal void UpdateFromIntermediate(Action<ICacheUpdater<TObject, TKey>> update
if (updateAction == null) throw new ArgumentNullException(nameof(updateAction));
lock (_writeLock)
{
InvokeNext(_readerWriter.Write(updateAction, _changes.HasObservers));
ChangeSet<TObject, TKey> changes = null;

_editLevel++;
if (_editLevel == 1)
{
changes = _readerWriter.Write(updateAction, _changes.HasObservers);
}
else
{
_readerWriter.WriteNested(updateAction);
}
_editLevel--;

if (_editLevel == 0)
{
InvokeNext(changes);
}
}
}

Expand All @@ -64,7 +84,23 @@ internal void UpdateFromSource(Action<ISourceUpdater<TObject, TKey>> updateActio
if (updateAction == null) throw new ArgumentNullException(nameof(updateAction));
lock (_writeLock)
{
InvokeNext(_readerWriter.Write(updateAction, _changes.HasObservers));
ChangeSet<TObject, TKey> changes = null;

_editLevel++;
if (_editLevel == 1)
{
changes = _readerWriter.Write(updateAction, _changes.HasObservers);
}
else
{
_readerWriter.WriteNested(updateAction);
}
_editLevel--;

if (_editLevel == 0)
{
InvokeNext(changes);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions DynamicData/List/SourceList.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ public SourceList(IObservable<IChangeSet<T>> source = null)
loader.Dispose();
OnCompleted();
if (_countChanged.IsValueCreated)
{
_countChanged.Value.OnCompleted();
}
_readerWriter.Dispose();
});
}

Expand Down