Skip to content
This repository has been archived by the owner on Feb 10, 2025. It is now read-only.

Ensure ready future completes (with error) in the case the watcher fails and closes. #123

Merged
merged 8 commits into from
Jan 12, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# 1.0.2-dev

- Require Dart SDK >= 2.14
- Ensure `DirectoryWatcher.ready` completes even when errors occur that close the watcher.

# 1.0.1

61 changes: 41 additions & 20 deletions lib/src/directory_watcher/linux.dart
Original file line number Diff line number Diff line change
@@ -82,20 +82,32 @@ class _LinuxDirectoryWatcher

// Batch the inotify changes together so that we can dedup events.
var innerStream = _nativeEvents.stream.batchEvents();
_listen(innerStream, _onBatch, onError: _eventsController.addError);

_listen(Directory(path).list(recursive: true), (FileSystemEntity entity) {
if (entity is Directory) {
_watchSubdir(entity.path);
} else {
_files.add(entity.path);
_listen(innerStream, _onBatch,
onError: (Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
}, onError: (Object error, StackTrace stackTrace) {
_eventsController.addError(error, stackTrace);
close();
}, onDone: () {
_readyCompleter.complete();
}, cancelOnError: true);
});

_listen(
Directory(path).list(recursive: true),
(FileSystemEntity entity) {
if (entity is Directory) {
_watchSubdir(entity.path);
} else {
_files.add(entity.path);
}
},
onError: _emitError,
onDone: () {
if (!isReady) {
_readyCompleter.complete();
}
},
cancelOnError: true,
);
}

@override
@@ -195,15 +207,15 @@ class _LinuxDirectoryWatcher
// contents,
if (files.contains(path) && _files.contains(path)) continue;
for (var file in _files.remove(path)) {
_emit(ChangeType.REMOVE, file);
_emitEvent(ChangeType.REMOVE, file);
}
}

for (var file in files) {
if (_files.contains(file)) {
_emit(ChangeType.MODIFY, file);
_emitEvent(ChangeType.MODIFY, file);
} else {
_emit(ChangeType.ADD, file);
_emitEvent(ChangeType.ADD, file);
_files.add(file);
}
}
@@ -221,15 +233,14 @@ class _LinuxDirectoryWatcher
_watchSubdir(entity.path);
} else {
_files.add(entity.path);
_emit(ChangeType.ADD, entity.path);
_emitEvent(ChangeType.ADD, entity.path);
}
}, onError: (Object error, StackTrace stackTrace) {
// Ignore an exception caused by the dir not existing. It's fine if it
// was added and then quickly removed.
if (error is FileSystemException) return;

_eventsController.addError(error, stackTrace);
close();
_emitError(error, stackTrace);
}, cancelOnError: true);
}

@@ -242,7 +253,7 @@ class _LinuxDirectoryWatcher
// caused by a MOVE, we need to manually emit events.
if (isReady) {
for (var file in _files.paths) {
_emit(ChangeType.REMOVE, file);
_emitEvent(ChangeType.REMOVE, file);
}
}

@@ -251,12 +262,22 @@ class _LinuxDirectoryWatcher

/// Emits a [WatchEvent] with [type] and [path] if this watcher is in a state
/// to emit events.
void _emit(ChangeType type, String path) {
void _emitEvent(ChangeType type, String path) {
if (!isReady) return;
if (_eventsController.isClosed) return;
_eventsController.add(WatchEvent(type, path));
}

/// Emit an error, then close the watcher.
void _emitError(Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
_eventsController.addError(error, stackTrace);
close();
}

/// Like [Stream.listen], but automatically adds the subscription to
/// [_subscriptions] so that it can be canceled when [close] is called.
void _listen<T>(Stream<T> stream, void Function(T) onData,
17 changes: 14 additions & 3 deletions lib/src/directory_watcher/mac_os.dart
Original file line number Diff line number Diff line change
@@ -87,8 +87,11 @@ class _MacOSDirectoryWatcher
//
// If we do receive a batch of events, [_onBatch] will ensure that these
// futures don't fire and that the directory is re-listed.
Future.wait([_listDir(), _waitForBogusEvents()])
.then((_) => _readyCompleter.complete());
Future.wait([_listDir(), _waitForBogusEvents()]).then((_) {
if (!isReady) {
_readyCompleter.complete();
}
});
}

@override
@@ -115,7 +118,11 @@ class _MacOSDirectoryWatcher
// Cancel the timer because bogus events only occur in the first batch, so
// we can fire [ready] as soon as we're done listing the directory.
_bogusEventTimer.cancel();
_listDir().then((_) => _readyCompleter.complete());
_listDir().then((_) {
if (!isReady) {
_readyCompleter.complete();
}
});
return;
}

@@ -396,6 +403,10 @@ class _MacOSDirectoryWatcher

/// Emit an error, then close the watcher.
void _emitError(Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
_eventsController.addError(error, stackTrace);
close();
}
12 changes: 8 additions & 4 deletions lib/src/directory_watcher/polling.dart
Original file line number Diff line number Diff line change
@@ -43,11 +43,11 @@ class _PollingDirectoryWatcher
final _events = StreamController<WatchEvent>.broadcast();

@override
bool get isReady => _ready.isCompleted;
bool get isReady => _readyCompleter.isCompleted;

@override
Future<void> get ready => _ready.future;
final _ready = Completer<void>();
Future<void> get ready => _readyCompleter.future;
final _readyCompleter = Completer<void>();

/// The amount of time the watcher pauses between successive polls of the
/// directory contents.
@@ -119,6 +119,10 @@ class _PollingDirectoryWatcher
if (entity is! File) return;
_filesToProcess.add(entity.path);
}, onError: (Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
if (!isDirectoryNotFoundException(error)) {
// It's some unknown error. Pipe it over to the event stream so the
// user can see it.
@@ -177,7 +181,7 @@ class _PollingDirectoryWatcher
_lastModifieds.remove(removed);
}

if (!isReady) _ready.complete();
if (!isReady) _readyCompleter.complete();

// Wait and then poll again.
await Future.delayed(_pollingDelay);
8 changes: 7 additions & 1 deletion lib/src/directory_watcher/windows.dart
Original file line number Diff line number Diff line change
@@ -92,7 +92,9 @@ class _WindowsDirectoryWatcher
_listDir().then((_) {
_startWatch();
_startParentWatcher();
_readyCompleter.complete();
if (!isReady) {
_readyCompleter.complete();
}
});
}

@@ -427,6 +429,10 @@ class _WindowsDirectoryWatcher

/// Emit an error, then close the watcher.
void _emitError(Object error, StackTrace stackTrace) {
// Guarantee that ready always completes.
if (!isReady) {
_readyCompleter.complete();
}
_eventsController.addError(error, stackTrace);
close();
}
2 changes: 1 addition & 1 deletion lib/src/utils.dart
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';
import 'dart:collection';
import 'dart:io';

/// Returns `true` if [error] is a [FileSystemException] for a missing
/// directory.
3 changes: 3 additions & 0 deletions lib/watcher.dart
Original file line number Diff line number Diff line change
@@ -42,6 +42,9 @@ abstract class Watcher {
///
/// If the watcher is already monitoring, this returns an already complete
/// future.
///
/// This future always completes successfully as errors are provided through
/// the [events] stream.
Future get ready;

/// Creates a new [DirectoryWatcher] or [FileWatcher] monitoring [path],
13 changes: 13 additions & 0 deletions test/ready/shared.dart
Original file line number Diff line number Diff line change
@@ -61,4 +61,17 @@ void sharedTests() {
// Should be back to not ready.
expect(watcher.ready, doesNotComplete);
});

test('completes even if directory does not exist', () async {
var watcher = createWatcher(path: 'does/not/exist');

// Subscribe to the events (else ready will never fire).
var subscription = watcher.events.listen((event) {}, onError: (error) {});
jakemac53 marked this conversation as resolved.
Show resolved Hide resolved

// Expect ready still completes.
await watcher.ready;

// Now unsubscribe.
await subscription.cancel();
});
}