Custom Observable ChangeSets
Observable changesets in DynamicData can be created in a variety of situations thanks to the ObservableChangeSet.Create method. This method allows almost anything to be adapted to the ChangeSet model. This article shows how this can be accomplished.
Anything that has "Add", "Update", and "Remove" events might be a good candidate for adapting to the ChangeSet model. This model provides many benefits, including the ability to treat initial values and future values uniformly through a single code path instead of handling them separately.
ObservableChangeSet.Create is very similar to Observable.Create in that it takes a function that is run when a new subscription is created and that returns an IDisposable that manages the subscription. However, unlike Observable.Create, instead of receiving the IObserver<T>, the function receives an implicit SourceCache<T, TKey>. The function merely needs to set up whatever is needed to add/remove/update items from this cache, and the subscriber will receive all of those changes via the resulting observable changeset.
The provided cache is implicitly thread-safe so schedulers or other asynchronous methods can be used without the need for other synchronization.
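To make the shape of that function concrete before the larger example, here is a minimal sketch that adapts a small event source with "connected" and "disconnected" events. The DeviceRegistry and Device types are hypothetical and exist only for this illustration; the DynamicData calls are the same ones used in the rest of this article (ObservableChangeSet.Create, AddOrUpdate, RemoveKey).

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using DynamicData;

// Hypothetical item type, invented for illustration only
public sealed record Device(string Id, string Name);

// Hypothetical event source, invented for illustration only (not thread-safe)
public sealed class DeviceRegistry
{
    private readonly List<Device> _devices = new();

    public event Action<Device>? Connected;
    public event Action<Device>? Disconnected;

    // Copy of the devices that are currently connected
    public IReadOnlyList<Device> Snapshot() => _devices.ToArray();

    public void Connect(Device device)
    {
        _devices.Add(device);
        Connected?.Invoke(device);
    }

    public void Disconnect(Device device)
    {
        if (_devices.Remove(device))
        {
            Disconnected?.Invoke(device);
        }
    }
}

public static class DeviceRegistryExtensions
{
    public static IObservable<IChangeSet<Device, string>> AsObservableChangeSet(this DeviceRegistry registry) =>
        ObservableChangeSet.Create<Device, string>(cache =>
        {
            // Handlers that mirror each event into the provided cache
            void OnConnected(Device device) => cache.AddOrUpdate(device);
            void OnDisconnected(Device device) => cache.RemoveKey(device.Id);

            registry.Connected += OnConnected;
            registry.Disconnected += OnDisconnected;

            // Seed the cache so a new subscriber sees the existing devices as adds
            cache.AddOrUpdate(registry.Snapshot());

            // Detach the handlers when the subscription is disposed
            return Disposable.Create(() =>
            {
                registry.Connected -= OnConnected;
                registry.Disconnected -= OnDisconnected;
            });
        },
        device => device.Id);
}
```

The function has three parts: wire up the handlers, seed the initial values, and return a disposable that undoes the wiring. The file system example in this article is structured the same way.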
.NET provides the FileSystemWatcher class, which monitors the file system for changes by firing events for adds, removes, etc. However, each of those events must be handled separately. Furthermore, it doesn't do anything for existing files, so those must be dealt with explicitly.
However, one can combine DynamicData and FileSystemWatcher to create an Observable ChangeSet for the file system. Like other Observable ChangeSets, a new subscription receives an initial changeset with Add events for all of the initial values, and this allows current files and new files to be handled through a single code path.
The following static class exposes two methods:
- An extension method for FileSystemWatcher called AsObservableChangeSet, for use with code that already has an instance of FileSystemWatcher
- A static helper method that completely hides the fact that a FileSystemWatcher is used at all
Both methods return an IObservable<IChangeSet<FileSystemInfo, string>> where the Key for the ChangeSet is the full file path. FileSystemInfo is the base class for both DirectoryInfo and FileInfo, and each value in the cache will actually be one of those concrete types.
using System;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using DynamicData;

public static class FileSystemObservable
{
    // Extension method to convert a FileSystemWatcher to an Observable ChangeSet
    public static IObservable<IChangeSet<FileSystemInfo, string>> AsObservableChangeSet(this FileSystemWatcher fsw) =>
        ObservableChangeSet.Create<FileSystemInfo, string>(cache =>
        {
            // Create an observable from the Changed event that updates the cache
            var observeChanged = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                    o => fsw.Changed += o,
                    o => fsw.Changed -= o)
                .Select(args => args.EventArgs)
                .Do(args => cache.AddOrUpdate(GetInfo(args.FullPath)));

            // Create an observable from the Created event that adds the file to the cache
            var observeCreated = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                    o => fsw.Created += o,
                    o => fsw.Created -= o)
                .Select(args => args.EventArgs)
                .Do(args => cache.AddOrUpdate(GetInfo(args.FullPath)));

            // Create an observable from the Deleted event that removes the file from the cache
            var observeDeleted = Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
                    o => fsw.Deleted += o,
                    o => fsw.Deleted -= o)
                .Select(args => args.EventArgs)
                .Do(args => cache.RemoveKey(args.FullPath));

            // Create an observable from the Renamed event that removes the old name and adds the new name
            // Use Edit so both changes are emitted together as a single ChangeSet
            var observeRenamed = Observable.FromEventPattern<RenamedEventHandler, RenamedEventArgs>(
                    o => fsw.Renamed += o,
                    o => fsw.Renamed -= o)
                .Select(args => args.EventArgs)
                .Do(args => cache.Edit(updater =>
                {
                    updater.RemoveKey(args.OldFullPath);
                    updater.AddOrUpdate(GetInfo(args.FullPath));
                }));

            // Subscribe to all the observables together
            var subscription = Observable.Merge(observeChanged, observeCreated, observeDeleted, observeRenamed).Subscribe();

            // Ensure the events are flowing
            fsw.EnableRaisingEvents = true;

            // Add the initial files
            var initialFiles = Directory.EnumerateFileSystemEntries(fsw.Path, fsw.Filter, fsw.IncludeSubdirectories ? SearchOption.AllDirectories : SearchOption.TopDirectoryOnly)
                .Select(p => GetInfo(p));
            cache.AddOrUpdate(initialFiles);

            // Return the Disposable that controls the subscription
            return subscription;
        },
        fsi => fsi.FullName);

    // Helper function that manages the FileSystemWatcher instance automatically
    public static IObservable<IChangeSet<FileSystemInfo, string>> Create(string path, string filter, bool includeSubDirectories) =>
        Observable.Using(
            () => new FileSystemWatcher(path, filter) { IncludeSubdirectories = includeSubDirectories },
            fsw => fsw.AsObservableChangeSet());

    // Returns a DirectoryInfo if the path is an existing directory, otherwise a FileInfo
    private static FileSystemInfo GetInfo(string fullPath) =>
        Directory.Exists(fullPath) ? new DirectoryInfo(fullPath) : new FileInfo(fullPath);
}
The above code works by creating a separate observable for each of the four FileSystemWatcher events needed to keep the cache up to date. It adds a handler to each one that applies the appropriate changes to the implicit cache that was provided to the function, and then creates a single subscription that combines all four.
Although the events may fire on different threads, the cache synchronizes the changes implicitly.
It then pre-populates the cache by enumerating all the file system entries and adding them so that the initial changeset emitted by the observable contains all of the initial files.
The result is a single changeset that allows consumers to uniformly deal with current files and future changes.
Here is a test fixture to show that all the events and existing files are handled:
void Main()
{
    // Create a temporary directory with one file that exists before the subscription
    var tempDir = Directory.CreateTempSubdirectory();
    File.WriteAllText(Path.Join(tempDir.FullName, "ExistingFile.txt"), "Show Existing Files Show Up");

    // Subscribe and log every add, remove, and update
    using var sub = FileSystemObservable.Create(tempDir.FullName, string.Empty, includeSubDirectories: true)
        .OnItemAdded(fso => Console.WriteLine($"Added: {fso} [{fso.GetType()}]"))
        .OnItemRemoved(fso => Console.WriteLine($"Removed: {fso} [{fso.GetType()}]"))
        .OnItemUpdated((fso, _) => Console.WriteLine($"Updated: {fso} [{fso.GetType()}]"))
        .Subscribe();

    try
    {
        // Create, modify, and rename files in a new subdirectory
        var subDir = tempDir.CreateSubdirectory(UniqueStr());
        var files = Enumerable.Range(0, 10).Select(_ => Path.Join(subDir.FullName, UniqueStr() + ".temp")).ToList();
        files.ForEach(path => File.WriteAllText(path, "Hello World"));
        files.ForEach(path => File.AppendAllText(path, "Hello Again"));
        files.ForEach(path => File.Move(path, path + ".temp2"));
    }
    finally
    {
        // Deleting the directory also exercises the Deleted event
        tempDir.Delete(recursive: true);
    }

    // Give pending event handlers a chance to run before the subscription is disposed
    Thread.Yield();

    static string UniqueStr() => $"{Guid.NewGuid():N}";
}
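Because the result is an ordinary changeset, the usual DynamicData operators can be applied to it. The following is a minimal sketch, assuming the FileSystemObservable class from this article is in scope; LogFileMonitor, WatchFileCount, and the "*.log" filter are hypothetical names and values chosen for illustration. It keeps only files (not directories) and prints the current count whenever the set changes.

```csharp
using System;
using System.IO;
using DynamicData;

public static class LogFileMonitor
{
    // Hypothetical helper: reports how many matching files currently exist
    public static IDisposable WatchFileCount(string directory) =>
        FileSystemObservable.Create(directory, "*.log", includeSubDirectories: false)
            // Keep FileInfo entries only, dropping DirectoryInfo entries
            .Filter(fsi => fsi is FileInfo)
            // Project the current state of the cache to its item count on every change
            .QueryWhenChanged(query => query.Count)
            .Subscribe(count => Console.WriteLine($"Log files: {count}"));
}
```

Disposing the returned IDisposable ends the subscription, which also disposes the underlying FileSystemWatcher because the Create helper wraps it in Observable.Using.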