Skip to content

Commit

Permalink
Ensure ready future completes (with error) in the case the watcher fa…
Browse files Browse the repository at this point in the history
…ils and closes. (#123)

See #115.
  • Loading branch information
DanTup authored Jan 12, 2022
1 parent 1071dec commit f76997a
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# 1.0.2-dev

- Require Dart SDK >= 2.14
- Ensure `DirectoryWatcher.ready` completes even when errors occur that close the watcher.

# 1.0.1

Expand Down
61 changes: 41 additions & 20 deletions lib/src/directory_watcher/linux.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,32 @@ class _LinuxDirectoryWatcher

// Batch the inotify changes together so that we can dedup events.
var innerStream = _nativeEvents.stream.batchEvents();
_listen(innerStream, _onBatch, onError: _eventsController.addError);

_listen(Directory(path).list(recursive: true), (FileSystemEntity entity) {
if (entity is Directory) {
_watchSubdir(entity.path);
} else {
_files.add(entity.path);
_listen(innerStream, _onBatch,
onError: (Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
}, onError: (Object error, StackTrace stackTrace) {
_eventsController.addError(error, stackTrace);
close();
}, onDone: () {
_readyCompleter.complete();
}, cancelOnError: true);
});

_listen(
Directory(path).list(recursive: true),
(FileSystemEntity entity) {
if (entity is Directory) {
_watchSubdir(entity.path);
} else {
_files.add(entity.path);
}
},
onError: _emitError,
onDone: () {
if (!isReady) {
_readyCompleter.complete();
}
},
cancelOnError: true,
);
}

@override
Expand Down Expand Up @@ -195,15 +207,15 @@ class _LinuxDirectoryWatcher
// contents,
if (files.contains(path) && _files.contains(path)) continue;
for (var file in _files.remove(path)) {
_emit(ChangeType.REMOVE, file);
_emitEvent(ChangeType.REMOVE, file);
}
}

for (var file in files) {
if (_files.contains(file)) {
_emit(ChangeType.MODIFY, file);
_emitEvent(ChangeType.MODIFY, file);
} else {
_emit(ChangeType.ADD, file);
_emitEvent(ChangeType.ADD, file);
_files.add(file);
}
}
Expand All @@ -221,15 +233,14 @@ class _LinuxDirectoryWatcher
_watchSubdir(entity.path);
} else {
_files.add(entity.path);
_emit(ChangeType.ADD, entity.path);
_emitEvent(ChangeType.ADD, entity.path);
}
}, onError: (Object error, StackTrace stackTrace) {
// Ignore an exception caused by the dir not existing. It's fine if it
// was added and then quickly removed.
if (error is FileSystemException) return;

_eventsController.addError(error, stackTrace);
close();
_emitError(error, stackTrace);
}, cancelOnError: true);
}

Expand All @@ -242,7 +253,7 @@ class _LinuxDirectoryWatcher
// caused by a MOVE, we need to manually emit events.
if (isReady) {
for (var file in _files.paths) {
_emit(ChangeType.REMOVE, file);
_emitEvent(ChangeType.REMOVE, file);
}
}

Expand All @@ -251,12 +262,22 @@ class _LinuxDirectoryWatcher

/// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state
/// to emit events.
void _emit(ChangeType type, String path) {
void _emitEvent(ChangeType type, String path) {
if (!isReady) return;
if (_eventsController.isClosed) return;
_eventsController.add(WatchEvent(type, path));
}

/// Emit an error, then close the watcher.
void _emitError(Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
_eventsController.addError(error, stackTrace);
close();
}

/// Like [Stream.listen], but automatically adds the subscription to
/// [_subscriptions] so that it can be canceled when [close] is called.
void _listen<T>(Stream<T> stream, void Function(T) onData,
Expand Down
17 changes: 14 additions & 3 deletions lib/src/directory_watcher/mac_os.dart
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ class _MacOSDirectoryWatcher
//
// If we do receive a batch of events, [_onBatch] will ensure that these
// futures don't fire and that the directory is re-listed.
Future.wait([_listDir(), _waitForBogusEvents()])
.then((_) => _readyCompleter.complete());
Future.wait([_listDir(), _waitForBogusEvents()]).then((_) {
if (!isReady) {
_readyCompleter.complete();
}
});
}

@override
Expand All @@ -115,7 +118,11 @@ class _MacOSDirectoryWatcher
// Cancel the timer because bogus events only occur in the first batch, so
// we can fire [ready] as soon as we're done listing the directory.
_bogusEventTimer.cancel();
_listDir().then((_) => _readyCompleter.complete());
_listDir().then((_) {
if (!isReady) {
_readyCompleter.complete();
}
});
return;
}

Expand Down Expand Up @@ -396,6 +403,10 @@ class _MacOSDirectoryWatcher

/// Emit an error, then close the watcher.
void _emitError(Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
_eventsController.addError(error, stackTrace);
close();
}
Expand Down
12 changes: 8 additions & 4 deletions lib/src/directory_watcher/polling.dart
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ class _PollingDirectoryWatcher
final _events = StreamController<WatchEvent>.broadcast();

@override
bool get isReady => _ready.isCompleted;
bool get isReady => _readyCompleter.isCompleted;

@override
Future<void> get ready => _ready.future;
final _ready = Completer<void>();
Future<void> get ready => _readyCompleter.future;
final _readyCompleter = Completer<void>();

/// The amount of time the watcher pauses between successive polls of the
/// directory contents.
Expand Down Expand Up @@ -119,6 +119,10 @@ class _PollingDirectoryWatcher
if (entity is! File) return;
_filesToProcess.add(entity.path);
}, onError: (Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
if (!isDirectoryNotFoundException(error)) {
// It's some unknown error. Pipe it over to the event stream so the
// user can see it.
Expand Down Expand Up @@ -177,7 +181,7 @@ class _PollingDirectoryWatcher
_lastModifieds.remove(removed);
}

if (!isReady) _ready.complete();
if (!isReady) _readyCompleter.complete();

// Wait and then poll again.
await Future.delayed(_pollingDelay);
Expand Down
8 changes: 7 additions & 1 deletion lib/src/directory_watcher/windows.dart
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ class _WindowsDirectoryWatcher
_listDir().then((_) {
_startWatch();
_startParentWatcher();
_readyCompleter.complete();
if (!isReady) {
_readyCompleter.complete();
}
});
}

Expand Down Expand Up @@ -427,6 +429,10 @@ class _WindowsDirectoryWatcher

/// Emit an error, then close the watcher.
void _emitError(Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
_eventsController.addError(error, stackTrace);
close();
}
Expand Down
2 changes: 1 addition & 1 deletion lib/src/utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';
import 'dart:collection';
import 'dart:io';

/// Returns `true` if [error] is a [FileSystemException] for a missing
/// directory.
Expand Down
3 changes: 3 additions & 0 deletions lib/watcher.dart
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ abstract class Watcher {
///
/// If the watcher is already monitoring, this returns an already complete
/// future.
///
/// This future always completes successfully as errors are provided through
/// the [events] stream.
Future get ready;

/// Creates a new [DirectoryWatcher] or [FileWatcher] monitoring [path],
Expand Down
13 changes: 13 additions & 0 deletions test/ready/shared.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,17 @@ void sharedTests() {
// Should be back to not ready.
expect(watcher.ready, doesNotComplete);
});

test('completes even if directory does not exist', () async {
var watcher = createWatcher(path: 'does/not/exist');

// Subscribe to the events (else ready will never fire).
var subscription = watcher.events.listen((event) {}, onError: (error) {});

// Expect ready still completes.
await watcher.ready;

// Now unsubscribe.
await subscription.cancel();
});
}

0 comments on commit f76997a

Please sign in to comment.