diff --git a/GRDB/Documentation.docc/Extension/ValueObservation.md b/GRDB/Documentation.docc/Extension/ValueObservation.md index 5c4664e1f1..f2535e93e6 100644 --- a/GRDB/Documentation.docc/Extension/ValueObservation.md +++ b/GRDB/Documentation.docc/Extension/ValueObservation.md @@ -78,7 +78,7 @@ See below for the li By default, `ValueObservation` notifies a fresh value whenever any component of its fetched value is modified (any fetched column, row, etc.). This can be configured: see . -By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main dispatch queue, asynchronously. This can be configured: see . +By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main actor, asynchronously. This can be configured: see . By default, `ValueObservation` fetches a fresh value immediately after a change is committed in the database. In particular, modifying the database on the main thread triggers a fetch on the main thread as well. This behavior can be configured: see . @@ -98,40 +98,38 @@ The database observation stops when the cancellable returned by the `start` meth ## ValueObservation Scheduling -By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main dispatch queue, asynchronously: +By default, `ValueObservation` notifies the initial value, as well as eventual changes and errors, on the main actor, asynchronously: ```swift // The default scheduling let cancellable = observation.start(in: dbQueue) { error in - // Called asynchronously on the main dispatch queue + // Called asynchronously on the main actor } onChange: { value in - // Called asynchronously on the main dispatch queue + // Called asynchronously on the main actor print("Fresh value", value) } ``` You can change this behavior by adding a `scheduling` argument to the `start()` method. -For example, the ``ValueObservationScheduler/immediate`` scheduler notifies all values on the main dispatch queue, and notifies the first one immediately when the observation starts. +For example, the ``ValueObservationMainActorScheduler/immediate`` scheduler notifies all values on the main actor, and notifies the first one immediately when the observation starts. It is very useful in graphic applications, because you can configure views right away, without waiting for the initial value to be fetched eventually. You don't have to implement any empty or loading screen, or to prevent some undesired initial animation. Take care that the user interface is not responsive during the fetch of the first value, so only use the `immediate` scheduling for very fast database requests! -The `immediate` scheduling requires that the observation starts from the main dispatch queue (a fatal error is raised otherwise): - ```swift // Immediate scheduling notifies // the initial value right on subscription. let cancellable = observation .start(in: dbQueue, scheduling: .immediate) { error in - // Called on the main dispatch queue + // Called on the main actor } onChange: { value in - // Called on the main dispatch queue + // Called on the main actor print("Fresh value", value) } // <- Here "Fresh value" has already been printed. ``` -The other built-in scheduler ``ValueObservationScheduler/async(onQueue:)`` asynchronously schedules values and errors on the dispatch queue of your choice. Make sure you provide a serial queue, because a concurrent one such as `DispachQueue.global(qos: .default)` would mess with the ordering of fresh value notifications: +The ``ValueObservationScheduler/async(onQueue:)`` scheduler asynchronously schedules values and errors on the dispatch queue of your choice. Make sure you provide a serial dispatch queue, because a concurrent one such as `DispachQueue.global(qos: .default)` would mess with the ordering of fresh value notifications: ```swift // Async scheduling notifies all values @@ -146,9 +144,33 @@ let cancellable = observation } ``` +The ``ValueObservationScheduler/task`` scheduler asynchronously schedules values and errors on the cooperative thread pool. It is implicitly used when you turn a ValueObservation into an async sequence. You can specify it explicitly when you intend to consume a shared observation as an async sequence: + +```swift +do { + for try await players in observation.values(in: dbQueue) { + // Called on the cooperative thread pool + print("Fresh players", players) + } +} catch { + // Handle error +} + +let sharedObservation = observation.shared(in: dbQueue, scheduling: .task) +do { + for try await players in sharedObservation.values() { + // Called on the cooperative thread pool + print("Fresh players", players) + } +} catch { + // Handle error +} + +``` + As described above, the `scheduling` argument controls the execution of the change and error callbacks. You also have some control on the execution of the database fetch: -- With the `.immediate` scheduling, the initial fetch is always performed synchronously, on the main thread, when the observation starts, so that the initial value can be notified immediately. +- With the `.immediate` scheduling, the initial fetch is always performed synchronously, on the main actor, when the observation starts, so that the initial value can be notified immediately. - With the default `.async` scheduling, the initial fetch is always performed asynchronouly. It never blocks the main thread. @@ -290,10 +312,12 @@ When needed, you can help GRDB optimize observations and reduce database content ### Accessing Observed Values - ``publisher(in:scheduling:)`` -- ``start(in:scheduling:onError:onChange:)`` +- ``start(in:scheduling:onError:onChange:)-10vwf`` +- ``start(in:scheduling:onError:onChange:)-7z197`` - ``values(in:scheduling:bufferingPolicy:)`` - ``DatabaseCancellable`` - ``ValueObservationScheduler`` +- ``ValueObservationMainActorScheduler`` ### Mapping Values diff --git a/GRDB/ValueObservation/SharedValueObservation.swift b/GRDB/ValueObservation/SharedValueObservation.swift index 2208ae4d1b..b3881a59cf 100644 --- a/GRDB/ValueObservation/SharedValueObservation.swift +++ b/GRDB/ValueObservation/SharedValueObservation.swift @@ -87,9 +87,9 @@ extension ValueObservation { /// main dispatch queue. You can change this behavior by providing a /// scheduler. /// - /// For example, the ``ValueObservationScheduler/immediate`` scheduler - /// notifies all values on the main dispatch queue, and notifies the first - /// one immediately when the + /// For example, the ``ValueObservationMainActorScheduler/immediate`` + /// scheduler notifies all values on the main dispatch queue, and + /// notifies the first one immediately when the /// ``SharedValueObservation/start(onError:onChange:)`` method is called. /// The `immediate` scheduling requires that the observation starts from the /// main thread (a fatal error is raised otherwise): @@ -111,9 +111,6 @@ extension ValueObservation { /// // <- here "Fresh players" is already printed. /// ``` /// - /// Note that the `.immediate` scheduler requires that the observation is - /// subscribed from the main thread. It raises a fatal error otherwise. - /// /// - parameter reader: A DatabaseReader. /// - parameter scheduler: A Scheduler. By default, fresh values are /// dispatched asynchronously on the main queue. @@ -227,7 +224,7 @@ public final class SharedValueObservation: @unchecked Sendabl /// - parameter onChange: The closure to execute on receipt of a /// fresh value. /// - returns: A DatabaseCancellable that can stop the observation. - public func start( + @preconcurrency public func start( onError: @escaping @Sendable (Error) -> Void, onChange: @escaping @Sendable (Element) -> Void) -> AnyDatabaseCancellable diff --git a/GRDB/ValueObservation/ValueObservation.swift b/GRDB/ValueObservation/ValueObservation.swift index 32abcd97f8..3b0de8ef9e 100644 --- a/GRDB/ValueObservation/ValueObservation.swift +++ b/GRDB/ValueObservation/ValueObservation.swift @@ -102,6 +102,54 @@ extension ValueObservation: Refinable { /// try Player.fetchAll(db) /// } /// + /// let cancellable = try observation.start( + /// in: dbQueue, + /// scheduling: .async(onQueue: .main)) + /// { error in + /// // handle error + /// } onChange: { (players: [Player]) in + /// print("Fresh players: \(players)") + /// } + /// ``` + /// + /// - parameter reader: A DatabaseReader. + /// - parameter scheduler: A ValueObservationScheduler. + /// - parameter onError: The closure to execute when the + /// observation fails. + /// - parameter onChange: The closure to execute on receipt of a + /// fresh value. + /// - returns: A DatabaseCancellable that can stop the observation. + @preconcurrency public func start( + in reader: some DatabaseReader, + scheduling scheduler: some ValueObservationScheduler, + onError: @escaping @Sendable (Error) -> Void, + onChange: @escaping @Sendable (Reducer.Value) -> Void) + -> AnyDatabaseCancellable + where Reducer: ValueReducer + { + let observation = self.with { + $0.events.didFail = concat($0.events.didFail, onError) + } + observation.events.willStart?() + return reader._add( + observation: observation, + scheduling: scheduler, + onChange: onChange) + } + + /// Starts observing the database and notifies fresh values on the + /// main actor. + /// + /// The observation lasts until the returned cancellable is cancelled + /// or deallocated. + /// + /// For example: + /// + /// ```swift + /// let observation = ValueObservation.tracking { db in + /// try Player.fetchAll(db) + /// } + /// /// let cancellable = try observation.start(in: dbQueue) { error in /// // handle error /// } onChange: { (players: [Player]) in @@ -110,14 +158,8 @@ extension ValueObservation: Refinable { /// ``` /// /// By default, fresh values are dispatched asynchronously on the - /// main dispatch queue. You can change this behavior by providing a - /// scheduler. - /// - /// For example, the ``ValueObservationScheduler/immediate`` scheduler - /// notifies all values on the main dispatch queue, and notifies the first - /// one immediately when the observation starts. The `immediate` scheduling - /// requires that the observation starts from the main dispatch queue (a - /// fatal error is raised otherwise): + /// main actor. Pass `.immediate` if the first value shoud be notified + /// immediately when the observation starts: /// /// ```swift /// let cancellable = try observation.start(in: dbQueue, scheduling: .immediate) { error in @@ -129,28 +171,37 @@ extension ValueObservation: Refinable { /// ``` /// /// - parameter reader: A DatabaseReader. - /// - parameter scheduler: A ValueObservationScheduler. By default, fresh - /// values are dispatched asynchronously on the main queue. - /// - parameter onError: The closure to execute when the observation fails. + /// - parameter scheduler: A ValueObservationMainActorScheduler. + /// By default, fresh values are dispatched asynchronously on the + /// main actor. + /// - parameter onError: The closure to execute when the + /// observation fails. /// - parameter onChange: The closure to execute on receipt of a /// fresh value. /// - returns: A DatabaseCancellable that can stop the observation. - public func start( + @available(iOS 13, macOS 10.15, tvOS 13, *) + @preconcurrency @MainActor public func start( in reader: some DatabaseReader, - scheduling scheduler: some ValueObservationScheduler = .async(onQueue: .main), - onError: @escaping @Sendable (Error) -> Void, - onChange: @escaping @Sendable (Reducer.Value) -> Void) + scheduling scheduler: some ValueObservationMainActorScheduler = .mainActor, + onError: @escaping @MainActor (Error) -> Void, + onChange: @escaping @MainActor (Reducer.Value) -> Void) -> AnyDatabaseCancellable where Reducer: ValueReducer { - let observation = self.with { - $0.events.didFail = concat($0.events.didFail, onError) - } - observation.events.willStart?() - return reader._add( - observation: observation, - scheduling: scheduler, - onChange: onChange) + let regularScheduler: some ValueObservationScheduler = scheduler + return start( + in: reader, + scheduling: regularScheduler, + onError: { error in + MainActor.assumeIsolated { + onError(error) + } + }, + onChange: { value in + MainActor.assumeIsolated { + onChange(value) + } + }) } // MARK: - Debugging @@ -295,14 +346,14 @@ extension ValueObservation { /// ``` /// /// - parameter reader: A DatabaseReader. - /// - parameter scheduler: A ValueObservationScheduler. By default, fresh - /// values are dispatched asynchronously on the main dispatch queue. + /// - parameter scheduler: A ValueObservationScheduler. By default, + /// fresh values are dispatched on the cooperative thread pool. /// - parameter bufferingPolicy: see the documntation /// of `AsyncThrowingStream`. @available(iOS 13, macOS 10.15, tvOS 13, *) public func values( in reader: some DatabaseReader, - scheduling scheduler: some ValueObservationScheduler = .async(onQueue: .main), + scheduling scheduler: some ValueObservationScheduler = .task, bufferingPolicy: AsyncValueObservation.BufferingPolicy = .unbounded) -> AsyncValueObservation where Reducer: ValueReducer @@ -416,11 +467,11 @@ extension ValueObservation { /// main dispatch queue. You can change this behavior by providing a /// scheduler. /// - /// For example, the ``ValueObservationScheduler/immediate`` scheduler - /// notifies all values on the main dispatch queue, and notifies the first - /// one immediately when the observation starts. The `immediate` scheduling - /// requires that the observation starts from the main dispatch queue (a - /// fatal error is raised otherwise): + /// For example, the ``ValueObservationMainActorScheduler/immediate`` + /// scheduler notifies all values on the main dispatch queue, and + /// notifies the first one immediately when the observation starts. The + /// `immediate` scheduling requires that the observation starts from the + /// main dispatch queue (a fatal error is raised otherwise): /// /// ```swift /// let publisher = observation.publisher(in: dbQueue, scheduling: .immediate) @@ -736,7 +787,7 @@ extension ValueObservation { /// ``` /// /// - parameter fetch: The closure that fetches the observed value. - public static func trackingConstantRegion( + @preconcurrency public static func trackingConstantRegion( _ fetch: @escaping @Sendable (Database) throws -> Value) -> Self where Reducer == ValueReducers.Fetch @@ -808,7 +859,7 @@ extension ValueObservation { /// - parameter otherRegions: A list of supplementary regions /// to observe. /// - parameter fetch: The closure that fetches the observed value. - public static func tracking( + @preconcurrency public static func tracking( region: any DatabaseRegionConvertible, _ otherRegions: any DatabaseRegionConvertible..., fetch: @escaping @Sendable (Database) throws -> Value) @@ -878,7 +929,7 @@ extension ValueObservation { /// /// - parameter regions: An array of observed regions. /// - parameter fetch: The closure that fetches the observed value. - public static func tracking( + @preconcurrency public static func tracking( regions: [any DatabaseRegionConvertible], fetch: @escaping @Sendable (Database) throws -> Value) -> Self @@ -936,7 +987,7 @@ extension ValueObservation { /// ``` /// /// - parameter fetch: The closure that fetches the observed value. - public static func tracking( + @preconcurrency public static func tracking( _ fetch: @escaping @Sendable (Database) throws -> Value) -> Self where Reducer == ValueReducers.Fetch diff --git a/GRDB/ValueObservation/ValueObservationScheduler.swift b/GRDB/ValueObservation/ValueObservationScheduler.swift index 3e4d9fdce8..b4080efb32 100644 --- a/GRDB/ValueObservation/ValueObservationScheduler.swift +++ b/GRDB/ValueObservation/ValueObservationScheduler.swift @@ -9,8 +9,11 @@ import Foundation /// /// - ``async(onQueue:)`` /// - ``immediate`` +/// - ``mainActor`` +/// - ``task`` +/// - ``task(priority:)`` /// - ``AsyncValueObservationScheduler`` -/// - ``ImmediateValueObservationScheduler`` +/// - ``TaskValueObservationScheduler`` public protocol ValueObservationScheduler: Sendable { /// Returns whether the initial value should be immediately notified. /// @@ -30,6 +33,29 @@ extension ValueObservationScheduler { } } +// MARK: - ValueObservationMainActorScheduler + +/// A type that determines when `ValueObservation` notifies its fresh +/// values, on the main actor. +/// +/// ## Topics +/// +/// ### Built-In Schedulers +/// +/// - ``immediate`` +/// - ``ValueObservationScheduler/mainActor`` +/// - ``ImmediateValueObservationScheduler`` +/// - ``DelayedMainActorValueObservationScheduler`` +public protocol ValueObservationMainActorScheduler: ValueObservationScheduler { + func scheduleOnMainActor(_ action: @escaping @MainActor () -> Void) +} + +extension ValueObservationMainActorScheduler { + public func schedule(_ action: @escaping @Sendable () -> Void) { + scheduleOnMainActor(action) + } +} + // MARK: - AsyncValueObservationScheduler /// A scheduler that asynchronously notifies fresh value of a `DispatchQueue`. @@ -77,10 +103,10 @@ extension ValueObservationScheduler where Self == AsyncValueObservationScheduler // MARK: - ImmediateValueObservationScheduler -/// A scheduler that notifies all values on the main `DispatchQueue`. The +/// A scheduler that notifies all values on the main actor. The /// first value is immediately notified when the `ValueObservation` /// is started. -public struct ImmediateValueObservationScheduler: ValueObservationScheduler, Sendable { +public struct ImmediateValueObservationScheduler: ValueObservationMainActorScheduler { public init() { } public func immediateInitialValue() -> Bool { @@ -90,13 +116,42 @@ public struct ImmediateValueObservationScheduler: ValueObservationScheduler, Sen return true } - public func schedule(_ action: @escaping @Sendable () -> Void) { + public func scheduleOnMainActor(_ action: @escaping @MainActor () -> Void) { DispatchQueue.main.async(execute: action) } } extension ValueObservationScheduler where Self == ImmediateValueObservationScheduler { - /// A scheduler that notifies all values on the main `DispatchQueue`. The + /// A scheduler that notifies all values on the main actor. The + /// first value is immediately notified when the `ValueObservation` + /// is started. + /// + /// For example: + /// + /// ```swift + /// let observation = ValueObservation.tracking { db in + /// try Player.fetchAll(db) + /// } + /// + /// let cancellable = try observation.start( + /// in: dbQueue, + /// scheduling: .immediate, + /// onError: { error in ... }, + /// onChange: { (players: [Player]) in + /// print("fresh players: \(players)") + /// }) + /// // <- here "fresh players" is already printed. + /// ``` + /// + /// - important: this scheduler requires that the observation is started + /// from the main actor. A fatal error is raised otherwise. + public static var immediate: ImmediateValueObservationScheduler { + ImmediateValueObservationScheduler() + } +} + +extension ValueObservationMainActorScheduler where Self == ImmediateValueObservationScheduler { + /// A scheduler that notifies all values on the main actor. The /// first value is immediately notified when the `ValueObservation` /// is started. /// @@ -118,8 +173,77 @@ extension ValueObservationScheduler where Self == ImmediateValueObservationSched /// ``` /// /// - important: this scheduler requires that the observation is started - /// from the main queue. A fatal error is raised otherwise. + /// from the main actor. A fatal error is raised otherwise. public static var immediate: ImmediateValueObservationScheduler { ImmediateValueObservationScheduler() } } + +// MARK: - TaskValueObservationScheduler + +/// A scheduler that notifies all values on the cooperative thread pool. +@available(iOS 13, macOS 10.15, tvOS 13, *) +public final class TaskValueObservationScheduler: ValueObservationScheduler { + typealias Action = @Sendable () -> Void + let continuation: AsyncStream.Continuation + let task: Task + + init(priority: TaskPriority?) { + let (stream, continuation) = AsyncStream.makeStream(of: Action.self) + + self.continuation = continuation + self.task = Task(priority: priority) { + for await action in stream { + action() + } + } + } + + deinit { + task.cancel() + } + + public func immediateInitialValue() -> Bool { + false + } + + public func schedule(_ action: @escaping @Sendable () -> Void) { + continuation.yield(action) + } +} + +@available(iOS 13, macOS 10.15, tvOS 13, *) +extension ValueObservationScheduler where Self == TaskValueObservationScheduler { + /// A scheduler that notifies all values from a new `Task`. + public static var task: TaskValueObservationScheduler { + TaskValueObservationScheduler(priority: nil) + } + + /// A scheduler that notifies all values from a new `Task` with the + /// given priority. + public static func task(priority: TaskPriority) -> TaskValueObservationScheduler { + TaskValueObservationScheduler(priority: priority) + } +} + +// MARK: - DelayedMainActorValueObservationScheduler + +/// A scheduler that notifies all values on the cooperative thread pool. +@available(iOS 13, macOS 10.15, tvOS 13, *) +public final class DelayedMainActorValueObservationScheduler: ValueObservationMainActorScheduler { + public func immediateInitialValue() -> Bool { + false + } + + public func scheduleOnMainActor(_ action: @escaping @MainActor () -> Void) { + DispatchQueue.main.async(execute: action) + } +} + +@available(iOS 13, macOS 10.15, tvOS 13, *) +extension ValueObservationScheduler where Self == DelayedMainActorValueObservationScheduler { + /// A scheduler that notifies all values on the main actor. + public static var mainActor: DelayedMainActorValueObservationScheduler { + DelayedMainActorValueObservationScheduler() + } +} diff --git a/TODO.md b/TODO.md index de09c24f3e..09cb7ee9fc 100644 --- a/TODO.md +++ b/TODO.md @@ -88,7 +88,7 @@ - [ ] What can we do with `cross-module-optimization`? See https://github.com/apple/swift-homomorphic-encryption - [X] GRDB7/BREAKING: insertAndFetch, saveAndFetch, and updateAndFetch no longer return optionals (32f41472) -- [ ] GRDB7/BREAKING: AsyncValueObservation does not need any scheduler (83c0e643) +- [X] GRDB7/BREAKING: AsyncValueObservation does not need any scheduler (83c0e643) - [X] GRDB7/BREAKING: Stop exporting SQLite (679d6463) - [X] GRDB7/BREAKING: Remove Configuration.defaultTransactionKind (2661ff46) - [X] GRDB7: Replace LockedBox with Mutex (00ccab06) @@ -158,7 +158,8 @@ - [X] GRDB7: Sendable: DatabaseRegionObservation (b4ff52fb) - [-] GRDB7: DispatchQueue.asyncSending (7b075e6b) - [X] GRDB7: Replace sequences with collection (e.g. https://github.com/tidal-music/tidal-sdk-ios/pull/39) -- [ ] GRDB7: Replace `some` DatabaseReader/Writer with `any` where possible, in order to avoid issues with accessing DatabaseContext from GRDBQuery (if the problem exists in Xcode 16) +- [ ] GRDB7: Replace `some` DatabaseReader/Writer with `any` where possible, in order to avoid issues with accessing DatabaseContext from GRDBQuery (if the problem exists in Xcode 16) +- [ ] GRDB7: bump to iOS 13, macOS 10.15, tvOS 13 (for ValueObservation support for MainActor) - [?] GRDB7: Change ValueObservation callback argument so that it could expose snapshots? https://github.com/groue/GRDB.swift/discussions/1523#discussioncomment-9092500 diff --git a/Tests/GRDBTests/SharedValueObservationTests.swift b/Tests/GRDBTests/SharedValueObservationTests.swift index dc9baa1f9c..2b4ff72163 100644 --- a/Tests/GRDBTests/SharedValueObservationTests.swift +++ b/Tests/GRDBTests/SharedValueObservationTests.swift @@ -525,6 +525,232 @@ class SharedValueObservationTests: GRDBTestCase { XCTAssertEqual(log.flush(), []) } + @available(iOS 13, macOS 10.15, tvOS 13, *) + func test_task_observationLifetime() throws { + let dbQueue = try makeDatabaseQueue() + try dbQueue.write { db in + try db.create(table: "player") { t in + t.autoIncrementedPrimaryKey("id") + } + } + + let log = Log() + var sharedObservation: SharedValueObservation? = ValueObservation + .tracking(Table("player").fetchCount) + .print(to: log) + .shared( + in: dbQueue, + scheduling: .task, + extent: .observationLifetime) + XCTAssertEqual(log.flush(), []) + + // We want to control when the shared observation is deallocated + try withExtendedLifetime(sharedObservation) { sharedObservation in + // --- Start observation 1 + let values1Mutex: Mutex<[Int]> = Mutex([]) + let exp1 = expectation(description: "") + exp1.expectedFulfillmentCount = 2 + exp1.assertForOverFulfill = false + let cancellable1 = sharedObservation!.start( + onError: { XCTFail("Unexpected error \($0)") }, + onChange: { value in + values1Mutex.withLock { $0.append(value) } + exp1.fulfill() + }) + + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + wait(for: [exp1], timeout: 1) + XCTAssertEqual(values1Mutex.load(), [0, 1]) + XCTAssertEqual(log.flush(), [ + "start", "fetch", "tracked region: player(*)", "value: 0", + "database did change", "fetch", "value: 1"]) + + // --- Start observation 2 + let values2Mutex: Mutex<[Int]> = Mutex([]) + let exp2 = expectation(description: "") + exp2.expectedFulfillmentCount = 2 + exp2.assertForOverFulfill = false + let cancellable2 = sharedObservation!.start( + onError: { XCTFail("Unexpected error \($0)") }, + onChange: { value in + values2Mutex.withLock { $0.append(value) } + exp2.fulfill() + }) + + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + wait(for: [exp2], timeout: 1) + XCTAssertEqual(values1Mutex.load(), [0, 1, 2]) + XCTAssertEqual(values2Mutex.load(), [1, 2]) + XCTAssertEqual(log.flush(), ["database did change", "fetch", "value: 2"]) + + // --- Stop observation 1 + cancellable1.cancel() + XCTAssertEqual(log.flush(), []) + + // --- Start observation 3 + let values3Mutex: Mutex<[Int]> = Mutex([]) + let exp3 = expectation(description: "") + exp3.expectedFulfillmentCount = 2 + exp3.assertForOverFulfill = false + let cancellable3 = sharedObservation!.start( + onError: { XCTFail("Unexpected error \($0)") }, + onChange: { value in + values3Mutex.withLock { $0.append(value) } + exp3.fulfill() + }) + + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + wait(for: [exp3], timeout: 1) + XCTAssertEqual(values1Mutex.load(), [0, 1, 2]) + XCTAssertEqual(values2Mutex.load(), [1, 2, 3]) + XCTAssertEqual(values3Mutex.load(), [2, 3]) + XCTAssertEqual(log.flush(), ["database did change", "fetch", "value: 3"]) + + // --- Stop observation 2 + cancellable2.cancel() + XCTAssertEqual(log.flush(), []) + + // --- Stop observation 3 + cancellable3.cancel() + XCTAssertEqual(log.flush(), []) + } + + // --- Release shared observation + sharedObservation = nil + XCTAssertEqual(log.flush(), ["cancel"]) + } + +#if canImport(Combine) + func test_task_publisher() throws { + guard #available(iOS 13, macOS 10.15, tvOS 13, *) else { + throw XCTSkip("Combine is not available") + } + + let dbQueue = try makeDatabaseQueue() + try dbQueue.write { db in + try db.create(table: "player") { t in + t.autoIncrementedPrimaryKey("id") + } + } + + let publisher = ValueObservation + .tracking(Table("player").fetchCount) + .shared(in: dbQueue, scheduling: .task) + .publisher() + + do { + let recorder = publisher.record() + try XCTAssert(recorder.availableElements.get().isEmpty) + try XCTAssertEqual(wait(for: recorder.next(), timeout: 1), 0) + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + try XCTAssertEqual(wait(for: recorder.next(), timeout: 1), 1) + } + + do { + let recorder = publisher.record() + try XCTAssert(recorder.availableElements.get().isEmpty) + try XCTAssertEqual(wait(for: recorder.next(), timeout: 1), 1) + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + try XCTAssertEqual(wait(for: recorder.next(), timeout: 1), 2) + } + } +#endif + + @available(iOS 13, macOS 10.15, tvOS 13, *) + func test_task_whileObserved() throws { + let dbQueue = try makeDatabaseQueue() + try dbQueue.write { db in + try db.create(table: "player") { t in + t.autoIncrementedPrimaryKey("id") + } + } + + let log = Log() + var sharedObservation: SharedValueObservation? = ValueObservation + .tracking(Table("player").fetchCount) + .print(to: log) + .shared( + in: dbQueue, + scheduling: .task, + extent: .whileObserved) + XCTAssertEqual(log.flush(), []) + + // We want to control when the shared observation is deallocated + try withExtendedLifetime(sharedObservation) { sharedObservation in + // --- Start observation 1 + let values1Mutex: Mutex<[Int]> = Mutex([]) + let exp1 = expectation(description: "") + exp1.expectedFulfillmentCount = 2 + exp1.assertForOverFulfill = false + let cancellable1 = sharedObservation!.start( + onError: { XCTFail("Unexpected error \($0)") }, + onChange: { value in + values1Mutex.withLock { $0.append(value) } + exp1.fulfill() + }) + + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + wait(for: [exp1], timeout: 1) + XCTAssertEqual(values1Mutex.load(), [0, 1]) + XCTAssertEqual(log.flush(), [ + "start", "fetch", "tracked region: player(*)", "value: 0", + "database did change", "fetch", "value: 1"]) + + // --- Start observation 2 + let values2Mutex: Mutex<[Int]> = Mutex([]) + let exp2 = expectation(description: "") + exp2.expectedFulfillmentCount = 2 + exp2.assertForOverFulfill = false + let cancellable2 = sharedObservation!.start( + onError: { XCTFail("Unexpected error \($0)") }, + onChange: { value in + values2Mutex.withLock { $0.append(value) } + exp2.fulfill() + }) + + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + wait(for: [exp2], timeout: 1) + XCTAssertEqual(values1Mutex.load(), [0, 1, 2]) + XCTAssertEqual(values2Mutex.load(), [1, 2]) + XCTAssertEqual(log.flush(), ["database did change", "fetch", "value: 2"]) + + // --- Stop observation 1 + cancellable1.cancel() + XCTAssertEqual(log.flush(), []) + + // --- Start observation 3 + let values3Mutex: Mutex<[Int]> = Mutex([]) + let exp3 = expectation(description: "") + exp3.expectedFulfillmentCount = 2 + exp3.assertForOverFulfill = false + let cancellable3 = sharedObservation!.start( + onError: { XCTFail("Unexpected error \($0)") }, + onChange: { value in + values3Mutex.withLock { $0.append(value) } + exp3.fulfill() + }) + + try dbQueue.write { try $0.execute(sql: "INSERT INTO player DEFAULT VALUES")} + wait(for: [exp3], timeout: 1) + XCTAssertEqual(values1Mutex.load(), [0, 1, 2]) + XCTAssertEqual(values2Mutex.load(), [1, 2, 3]) + XCTAssertEqual(values3Mutex.load(), [2, 3]) + XCTAssertEqual(log.flush(), ["database did change", "fetch", "value: 3"]) + + // --- Stop observation 2 + cancellable2.cancel() + XCTAssertEqual(log.flush(), []) + + // --- Stop observation 3 + cancellable3.cancel() + XCTAssertEqual(log.flush(), ["cancel"]) + } + + // --- Release shared observation + sharedObservation = nil + XCTAssertEqual(log.flush(), []) + } + #if canImport(Combine) func test_error_recovery_observationLifetime() throws { guard #available(iOS 13, macOS 10.15, tvOS 13, *) else { @@ -642,7 +868,7 @@ class SharedValueObservationTests: GRDBTestCase { #endif @available(iOS 13, macOS 10.15, tvOS 13, *) - func testAsyncAwait() async throws { + func testAsyncAwait_mainQueue() async throws { let dbQueue = try makeDatabaseQueue() try await dbQueue.write { db in try db.create(table: "player") { t in @@ -660,6 +886,26 @@ class SharedValueObservationTests: GRDBTestCase { break } } + + @available(iOS 13, macOS 10.15, tvOS 13, *) + func testAsyncAwait_task() async throws { + let dbQueue = try makeDatabaseQueue() + try await dbQueue.write { db in + try db.create(table: "player") { t in + t.autoIncrementedPrimaryKey("id") + } + } + + let values = ValueObservation + .tracking(Table("player").fetchCount) + .shared(in: dbQueue, scheduling: .task) + .values() + + for try await value in values { + XCTAssertEqual(value, 0) + break + } + } } private class Log: TextOutputStream { diff --git a/Tests/GRDBTests/ValueObservationRegionRecordingTests.swift b/Tests/GRDBTests/ValueObservationRegionRecordingTests.swift index 390df4af0a..54d90f9f43 100644 --- a/Tests/GRDBTests/ValueObservationRegionRecordingTests.swift +++ b/Tests/GRDBTests/ValueObservationRegionRecordingTests.swift @@ -105,6 +105,7 @@ class ValueObservationRegionRecordingTests: GRDBTestCase { } } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testTupleObservation() throws { // Here we just test that user can destructure an observed tuple. // I'm completely paranoid about tuple destructuring - I can't wrap my @@ -119,6 +120,7 @@ class ValueObservationRegionRecordingTests: GRDBTestCase { onChange: { (int: Int, string: String) in }) // <- destructure } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testVaryingRegionTrackingImmediateScheduling() throws { let dbQueue = try makeDatabaseQueue() try dbQueue.write { diff --git a/Tests/GRDBTests/ValueObservationTests.swift b/Tests/GRDBTests/ValueObservationTests.swift index 08b59e1d5c..20e0626c70 100644 --- a/Tests/GRDBTests/ValueObservationTests.swift +++ b/Tests/GRDBTests/ValueObservationTests.swift @@ -5,6 +5,7 @@ import Dispatch class ValueObservationTests: GRDBTestCase { // Test passes if it compiles. // See + @available(iOS 13, macOS 10.15, tvOS 13, *) func testStartFromAnyDatabaseReader(reader: any DatabaseReader) { _ = ValueObservation .trackingConstantRegion { _ in } @@ -13,6 +14,7 @@ class ValueObservationTests: GRDBTestCase { // Test passes if it compiles. // See + @available(iOS 13, macOS 10.15, tvOS 13, *) func testStartFromAnyDatabaseWriter(writer: any DatabaseWriter) { _ = ValueObservation .trackingConstantRegion { _ in } @@ -53,6 +55,7 @@ class ValueObservationTests: GRDBTestCase { try test(makeDatabasePool()) } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testErrorCompletesTheObservation() throws { struct TestError: Error { } @@ -102,6 +105,7 @@ class ValueObservationTests: GRDBTestCase { try test(makeDatabasePool()) } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testViewOptimization() throws { let dbQueue = try makeDatabaseQueue() try dbQueue.write { @@ -139,6 +143,7 @@ class ValueObservationTests: GRDBTestCase { } } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testPragmaTableOptimization() throws { let dbQueue = try makeDatabaseQueue() try dbQueue.write { @@ -174,6 +179,7 @@ class ValueObservationTests: GRDBTestCase { // MARK: - Constant Explicit Region + @available(iOS 13, macOS 10.15, tvOS 13, *) func testTrackingExplicitRegion() throws { class TestStream: TextOutputStream { private var stringsMutex: Mutex<[String]> = Mutex([]) @@ -614,6 +620,7 @@ class ValueObservationTests: GRDBTestCase { // MARK: - Cancellation + @available(iOS 13, macOS 10.15, tvOS 13, *) func testCancellableLifetime() throws { // We need something to change let dbQueue = try makeDatabaseQueue() @@ -659,6 +666,7 @@ class ValueObservationTests: GRDBTestCase { XCTAssertEqual(changesCountMutex.load(), 2) } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testCancellableExplicitCancellation() throws { // We need something to change let dbQueue = try makeDatabaseQueue() @@ -796,6 +804,7 @@ class ValueObservationTests: GRDBTestCase { try test(makeDatabasePool()) } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testIssue1550() throws { func test(_ writer: some DatabaseWriter) throws { try writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } @@ -843,6 +852,7 @@ class ValueObservationTests: GRDBTestCase { try test(makeDatabasePool()) } + @available(iOS 13, macOS 10.15, tvOS 13, *) func testIssue1209() throws { func test(_ dbWriter: some DatabaseWriter) throws { try dbWriter.write { @@ -892,60 +902,57 @@ class ValueObservationTests: GRDBTestCase { try test(makeDatabasePool()) } - // MARK: - Async Await - + // MARK: - Main Actor @available(iOS 13, macOS 10.15, tvOS 13, *) - func testAsyncAwait_values_prefix() async throws { - func test(_ writer: some DatabaseWriter) async throws { - // We need something to change - try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } - - let cancellationExpectation = expectation(description: "cancelled") - let task = Task { () -> [Int] in - var counts: [Int] = [] - let observation = ValueObservation - .trackingConstantRegion(Table("t").fetchCount) - .handleEvents(didCancel: { cancellationExpectation.fulfill() }) - - for try await count in try observation.values(in: writer).prefix(while: { $0 <= 3 }) { - counts.append(count) - try await writer.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") } - } - return counts + @MainActor func test_mainActor_observation() throws { + let dbQueue = try makeDatabaseQueue() + try dbQueue.write { db in + try db.create(table: "test") { t in + t.autoIncrementedPrimaryKey("id") } - - let counts = try await task.value - XCTAssertTrue(counts.contains(0)) - XCTAssertTrue(counts.contains(where: { $0 >= 2 })) - XCTAssertEqual(counts.sorted(), counts) - - // Observation was ended -#if compiler(>=5.8) - await fulfillment(of: [cancellationExpectation], timeout: 2) -#else - wait(for: [cancellationExpectation], timeout: 2) -#endif } - try await AsyncTest(test).run { try DatabaseQueue() } - try await AsyncTest(test).runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) } - try await AsyncTest(test).runAtTemporaryDatabasePath { try DatabasePool(path: $0) } + let observation = ValueObservation.tracking { + try Table("test").fetchCount($0) + } + + var value = 0 // No mutex necessary! + let expectation = self.expectation(description: "completion") + let cancellable = observation.start( + in: dbQueue, + onError: { error in XCTFail("Unexpected error: \(error)") }, + onChange: { + value = $0 + if value == 2 { + expectation.fulfill() + } + }) + + try dbQueue.write { db in + try db.execute(sql: "INSERT INTO test DEFAULT VALUES") + try db.execute(sql: "INSERT INTO test DEFAULT VALUES") + } + withExtendedLifetime(cancellable) { _ in + wait(for: [expectation], timeout: 2) + } } + + // MARK: - Async Await @available(iOS 13, macOS 10.15, tvOS 13, *) - func testAsyncAwait_values_prefix_immediate_scheduling() async throws { + func testAsyncAwait_values_prefix() async throws { func test(_ writer: some DatabaseWriter) async throws { // We need something to change try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } let cancellationExpectation = expectation(description: "cancelled") - let task = Task { @MainActor () -> [Int] in + let task = Task { () -> [Int] in var counts: [Int] = [] let observation = ValueObservation .trackingConstantRegion(Table("t").fetchCount) .handleEvents(didCancel: { cancellationExpectation.fulfill() }) - for try await count in try observation.values(in: writer, scheduling: .immediate).prefix(while: { $0 <= 3 }) { + for try await count in try observation.values(in: writer).prefix(while: { $0 <= 3 }) { counts.append(count) try await writer.write { try $0.execute(sql: "INSERT INTO t DEFAULT VALUES") } } @@ -1012,45 +1019,6 @@ class ValueObservationTests: GRDBTestCase { try await AsyncTest(test).runAtTemporaryDatabasePath { try DatabasePool(path: $0) } } - @available(iOS 13, macOS 10.15, tvOS 13, *) - func testAsyncAwait_values_immediate_break() async throws { - func test(_ writer: some DatabaseWriter) async throws { - // We need something to change - try await writer.write { try $0.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)") } - - let cancellationExpectation = expectation(description: "cancelled") - - let task = Task { @MainActor () -> [Int] in - var counts: [Int] = [] - let observation = ValueObservation - .trackingConstantRegion(Table("t").fetchCount) - .handleEvents(didCancel: { cancellationExpectation.fulfill() }) - - for try await count in observation.values(in: writer, scheduling: .immediate) { - counts.append(count) - break - } - return counts - } - - let counts = try await task.value - - // A single value was published - assertValueObservationRecordingMatch(recorded: counts, expected: [0]) - - // Observation was ended -#if compiler(>=5.8) - await fulfillment(of: [cancellationExpectation], timeout: 2) -#else - wait(for: [cancellationExpectation], timeout: 2) -#endif - } - - try await AsyncTest(test).run { try DatabaseQueue() } - try await AsyncTest(test).runAtTemporaryDatabasePath { try DatabaseQueue(path: $0) } - try await AsyncTest(test).runAtTemporaryDatabasePath { try DatabasePool(path: $0) } - } - @available(iOS 13, macOS 10.15, tvOS 13, *) func testAsyncAwait_values_cancelled() async throws { func test(_ writer: some DatabaseWriter) async throws { @@ -1233,6 +1201,7 @@ class ValueObservationTests: GRDBTestCase { } // Regression test for + @available(iOS 13, macOS 10.15, tvOS 13, *) func testIssue1362() throws { func test(_ writer: some DatabaseWriter) throws { try writer.write { try $0.execute(sql: "CREATE TABLE s(id INTEGER PRIMARY KEY AUTOINCREMENT)") } @@ -1323,6 +1292,7 @@ class ValueObservationTests: GRDBTestCase { } // Regression test for + @available(iOS 13, macOS 10.15, tvOS 13, *) func testIssue1383_async() throws { do { let dbPool = try makeDatabasePool(filename: "test")