Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GRDB 7: MainActor ValueObservation scheduling #1633

Merged
merged 4 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 36 additions & 12 deletions GRDB/Documentation.docc/Extension/ValueObservation.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ See <doc:GRDB/ValueObservation#Dealing-with-Undetected-Changes> below for the li

By default, `ValueObservation` notifies a fresh value whenever any component of its fetched value is modified (any fetched column, row, etc.). This can be configured: see <doc:ValueObservation#Specifying-the-Tracked-Region>.

By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main dispatch queue, asynchronously. This can be configured: see <doc:ValueObservation#ValueObservation-Scheduling>.
By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main actor, asynchronously. This can be configured: see <doc:ValueObservation#ValueObservation-Scheduling>.

By default, `ValueObservation` fetches a fresh value immediately after a change is committed in the database. In particular, modifying the database on the main thread triggers a fetch on the main thread as well. This behavior can be configured: see <doc:ValueObservation#ValueObservation-Scheduling>.

Expand All @@ -98,40 +98,38 @@ The database observation stops when the cancellable returned by the `start` meth

## ValueObservation Scheduling

By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main dispatch queue, asynchronously:
By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main actor, asynchronously:

```swift
// The default scheduling
let cancellable = observation.start(in: dbQueue) { error in
// Called asynchronously on the main dispatch queue
// Called asynchronously on the main actor
} onChange: { value in
// Called asynchronously on the main dispatch queue
// Called asynchronously on the main actor
print("Fresh value", value)
}
```

You can change this behavior by adding a `scheduling` argument to the `start()` method.

For example, the ``ValueObservationScheduler/immediate`` scheduler notifies all values on the main dispatch queue, and notifies the first one immediately when the observation starts.
For example, the ``ValueObservationMainActorScheduler/immediate`` scheduler notifies all values on the main actor, and notifies the first one immediately when the observation starts.

It is very useful in graphic applications, because you can configure views right away, without waiting for the initial value to be fetched eventually. You don't have to implement any empty or loading screen, or to prevent some undesired initial animation. Take care that the user interface is not responsive during the fetch of the first value, so only use the `immediate` scheduling for very fast database requests!

The `immediate` scheduling requires that the observation starts from the main dispatch queue (a fatal error is raised otherwise):

```swift
// Immediate scheduling notifies
// the initial value right on subscription.
let cancellable = observation
.start(in: dbQueue, scheduling: .immediate) { error in
// Called on the main dispatch queue
// Called on the main actor
} onChange: { value in
// Called on the main dispatch queue
// Called on the main actor
print("Fresh value", value)
}
// <- Here "Fresh value" has already been printed.
```

The other built-in scheduler ``ValueObservationScheduler/async(onQueue:)`` asynchronously schedules values and errors on the dispatch queue of your choice. Make sure you provide a serial queue, because a concurrent one such as `DispachQueue.global(qos: .default)` would mess with the ordering of fresh value notifications:
The ``ValueObservationScheduler/async(onQueue:)`` scheduler asynchronously schedules values and errors on the dispatch queue of your choice. Make sure you provide a serial dispatch queue, because a concurrent one such as `DispachQueue.global(qos: .default)` would mess with the ordering of fresh value notifications:

```swift
// Async scheduling notifies all values
Expand All @@ -146,9 +144,33 @@ let cancellable = observation
}
```

The ``ValueObservationScheduler/task`` scheduler asynchronously schedules values and errors on the cooperative thread pool. It is implicitly used when you turn a ValueObservation into an async sequence. You can specify it explicitly when you intend to consume a shared observation as an async sequence:

```swift
do {
for try await players in observation.values(in: dbQueue) {
// Called on the cooperative thread pool
print("Fresh players", players)
}
} catch {
// Handle error
}

let sharedObservation = observation.shared(in: dbQueue, scheduling: .task)
do {
for try await players in sharedObservation.values() {
// Called on the cooperative thread pool
print("Fresh players", players)
}
} catch {
// Handle error
}

```

As described above, the `scheduling` argument controls the execution of the change and error callbacks. You also have some control on the execution of the database fetch:

- With the `.immediate` scheduling, the initial fetch is always performed synchronously, on the main thread, when the observation starts, so that the initial value can be notified immediately.
- With the `.immediate` scheduling, the initial fetch is always performed synchronously, on the main actor, when the observation starts, so that the initial value can be notified immediately.

- With the default `.async` scheduling, the initial fetch is always performed asynchronouly. It never blocks the main thread.

Expand Down Expand Up @@ -290,10 +312,12 @@ When needed, you can help GRDB optimize observations and reduce database content
### Accessing Observed Values

- ``publisher(in:scheduling:)``
- ``start(in:scheduling:onError:onChange:)``
- ``start(in:scheduling:onError:onChange:)-10vwf``
- ``start(in:scheduling:onError:onChange:)-7z197``
- ``values(in:scheduling:bufferingPolicy:)``
- ``DatabaseCancellable``
- ``ValueObservationScheduler``
- ``ValueObservationMainActorScheduler``

### Mapping Values

Expand Down
11 changes: 4 additions & 7 deletions GRDB/ValueObservation/SharedValueObservation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ extension ValueObservation {
/// main dispatch queue. You can change this behavior by providing a
/// scheduler.
///
/// For example, the ``ValueObservationScheduler/immediate`` scheduler
/// notifies all values on the main dispatch queue, and notifies the first
/// one immediately when the
/// For example, the ``ValueObservationMainActorScheduler/immediate``
/// scheduler notifies all values on the main dispatch queue, and
/// notifies the first one immediately when the
/// ``SharedValueObservation/start(onError:onChange:)`` method is called.
/// The `immediate` scheduling requires that the observation starts from the
/// main thread (a fatal error is raised otherwise):
Expand All @@ -111,9 +111,6 @@ extension ValueObservation {
/// // <- here "Fresh players" is already printed.
/// ```
///
/// Note that the `.immediate` scheduler requires that the observation is
/// subscribed from the main thread. It raises a fatal error otherwise.
///
/// - parameter reader: A DatabaseReader.
/// - parameter scheduler: A Scheduler. By default, fresh values are
/// dispatched asynchronously on the main queue.
Expand Down Expand Up @@ -227,7 +224,7 @@ public final class SharedValueObservation<Element: Sendable>: @unchecked Sendabl
/// - parameter onChange: The closure to execute on receipt of a
/// fresh value.
/// - returns: A DatabaseCancellable that can stop the observation.
public func start(
@preconcurrency public func start(
onError: @escaping @Sendable (Error) -> Void,
onChange: @escaping @Sendable (Element) -> Void)
-> AnyDatabaseCancellable
Expand Down
121 changes: 86 additions & 35 deletions GRDB/ValueObservation/ValueObservation.swift
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,54 @@ extension ValueObservation: Refinable {
/// try Player.fetchAll(db)
/// }
///
/// let cancellable = try observation.start(
/// in: dbQueue,
/// scheduling: .async(onQueue: .main))
/// { error in
/// // handle error
/// } onChange: { (players: [Player]) in
/// print("Fresh players: \(players)")
/// }
/// ```
///
/// - parameter reader: A DatabaseReader.
/// - parameter scheduler: A ValueObservationScheduler.
/// - parameter onError: The closure to execute when the
/// observation fails.
/// - parameter onChange: The closure to execute on receipt of a
/// fresh value.
/// - returns: A DatabaseCancellable that can stop the observation.
@preconcurrency public func start(
in reader: some DatabaseReader,
scheduling scheduler: some ValueObservationScheduler,
onError: @escaping @Sendable (Error) -> Void,
onChange: @escaping @Sendable (Reducer.Value) -> Void)
-> AnyDatabaseCancellable
where Reducer: ValueReducer
{
let observation = self.with {
$0.events.didFail = concat($0.events.didFail, onError)
}
observation.events.willStart?()
return reader._add(
observation: observation,
scheduling: scheduler,
onChange: onChange)
}

/// Starts observing the database and notifies fresh values on the
/// main actor.
///
/// The observation lasts until the returned cancellable is cancelled
/// or deallocated.
///
/// For example:
///
/// ```swift
/// let observation = ValueObservation.tracking { db in
/// try Player.fetchAll(db)
/// }
///
/// let cancellable = try observation.start(in: dbQueue) { error in
/// // handle error
/// } onChange: { (players: [Player]) in
Expand All @@ -110,14 +158,8 @@ extension ValueObservation: Refinable {
/// ```
///
/// By default, fresh values are dispatched asynchronously on the
/// main dispatch queue. You can change this behavior by providing a
/// scheduler.
///
/// For example, the ``ValueObservationScheduler/immediate`` scheduler
/// notifies all values on the main dispatch queue, and notifies the first
/// one immediately when the observation starts. The `immediate` scheduling
/// requires that the observation starts from the main dispatch queue (a
/// fatal error is raised otherwise):
/// main actor. Pass `.immediate` if the first value shoud be notified
/// immediately when the observation starts:
///
/// ```swift
/// let cancellable = try observation.start(in: dbQueue, scheduling: .immediate) { error in
Expand All @@ -129,28 +171,37 @@ extension ValueObservation: Refinable {
/// ```
///
/// - parameter reader: A DatabaseReader.
/// - parameter scheduler: A ValueObservationScheduler. By default, fresh
/// values are dispatched asynchronously on the main queue.
/// - parameter onError: The closure to execute when the observation fails.
/// - parameter scheduler: A ValueObservationMainActorScheduler.
/// By default, fresh values are dispatched asynchronously on the
/// main actor.
/// - parameter onError: The closure to execute when the
/// observation fails.
/// - parameter onChange: The closure to execute on receipt of a
/// fresh value.
/// - returns: A DatabaseCancellable that can stop the observation.
public func start(
@available(iOS 13, macOS 10.15, tvOS 13, *)
@preconcurrency @MainActor public func start(
in reader: some DatabaseReader,
scheduling scheduler: some ValueObservationScheduler = .async(onQueue: .main),
onError: @escaping @Sendable (Error) -> Void,
onChange: @escaping @Sendable (Reducer.Value) -> Void)
scheduling scheduler: some ValueObservationMainActorScheduler = .mainActor,
onError: @escaping @MainActor (Error) -> Void,
onChange: @escaping @MainActor (Reducer.Value) -> Void)
-> AnyDatabaseCancellable
where Reducer: ValueReducer
{
let observation = self.with {
$0.events.didFail = concat($0.events.didFail, onError)
}
observation.events.willStart?()
return reader._add(
observation: observation,
scheduling: scheduler,
onChange: onChange)
let regularScheduler: some ValueObservationScheduler = scheduler
return start(
in: reader,
scheduling: regularScheduler,
onError: { error in
MainActor.assumeIsolated {
onError(error)
}
},
onChange: { value in
MainActor.assumeIsolated {
onChange(value)
}
})
}

// MARK: - Debugging
Expand Down Expand Up @@ -295,14 +346,14 @@ extension ValueObservation {
/// ```
///
/// - parameter reader: A DatabaseReader.
/// - parameter scheduler: A ValueObservationScheduler. By default, fresh
/// values are dispatched asynchronously on the main dispatch queue.
/// - parameter scheduler: A ValueObservationScheduler. By default,
/// fresh values are dispatched on the cooperative thread pool.
/// - parameter bufferingPolicy: see the documntation
/// of `AsyncThrowingStream`.
@available(iOS 13, macOS 10.15, tvOS 13, *)
public func values(
in reader: some DatabaseReader,
scheduling scheduler: some ValueObservationScheduler = .async(onQueue: .main),
scheduling scheduler: some ValueObservationScheduler = .task,
bufferingPolicy: AsyncValueObservation<Reducer.Value>.BufferingPolicy = .unbounded)
-> AsyncValueObservation<Reducer.Value>
where Reducer: ValueReducer
Expand Down Expand Up @@ -416,11 +467,11 @@ extension ValueObservation {
/// main dispatch queue. You can change this behavior by providing a
/// scheduler.
///
/// For example, the ``ValueObservationScheduler/immediate`` scheduler
/// notifies all values on the main dispatch queue, and notifies the first
/// one immediately when the observation starts. The `immediate` scheduling
/// requires that the observation starts from the main dispatch queue (a
/// fatal error is raised otherwise):
/// For example, the ``ValueObservationMainActorScheduler/immediate``
/// scheduler notifies all values on the main dispatch queue, and
/// notifies the first one immediately when the observation starts. The
/// `immediate` scheduling requires that the observation starts from the
/// main dispatch queue (a fatal error is raised otherwise):
///
/// ```swift
/// let publisher = observation.publisher(in: dbQueue, scheduling: .immediate)
Expand Down Expand Up @@ -736,7 +787,7 @@ extension ValueObservation {
/// ```
///
/// - parameter fetch: The closure that fetches the observed value.
public static func trackingConstantRegion<Value>(
@preconcurrency public static func trackingConstantRegion<Value>(
_ fetch: @escaping @Sendable (Database) throws -> Value)
-> Self
where Reducer == ValueReducers.Fetch<Value>
Expand Down Expand Up @@ -808,7 +859,7 @@ extension ValueObservation {
/// - parameter otherRegions: A list of supplementary regions
/// to observe.
/// - parameter fetch: The closure that fetches the observed value.
public static func tracking<Value>(
@preconcurrency public static func tracking<Value>(
region: any DatabaseRegionConvertible,
_ otherRegions: any DatabaseRegionConvertible...,
fetch: @escaping @Sendable (Database) throws -> Value)
Expand Down Expand Up @@ -878,7 +929,7 @@ extension ValueObservation {
///
/// - parameter regions: An array of observed regions.
/// - parameter fetch: The closure that fetches the observed value.
public static func tracking<Value>(
@preconcurrency public static func tracking<Value>(
regions: [any DatabaseRegionConvertible],
fetch: @escaping @Sendable (Database) throws -> Value)
-> Self
Expand Down Expand Up @@ -936,7 +987,7 @@ extension ValueObservation {
/// ```
///
/// - parameter fetch: The closure that fetches the observed value.
public static func tracking<Value>(
@preconcurrency public static func tracking<Value>(
_ fetch: @escaping @Sendable (Database) throws -> Value)
-> Self
where Reducer == ValueReducers.Fetch<Value>
Expand Down
Loading