Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent ValueObservation from notifying duplicate initial values when no concurrent write is observed #940

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions GRDB/Core/DatabasePool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -873,6 +873,13 @@ extension DatabasePool: DatabaseReader {
// tracked database region (we have no way to know).
//
// 3. Install the transaction observer.
//
// When `SQLITE_ENABLE_SNAPSHOT` is not available, we perform a similar
// but weaker check. We compare the number of transaction commits
// performed between this method, and the first access to the writer
// database. Whenever this number has changed, we perform the second
// fetch, even if the actual changes are unrelated to the tracked
// database region (we have no way to know).

#if SQLITE_ENABLE_SNAPSHOT
if scheduler.immediateInitialValue() {
Expand Down Expand Up @@ -906,11 +913,16 @@ extension DatabasePool: DatabaseReader {
}
}
#else
// Grab `attemptedCommitCount` *before* reader transaction has started,
// so that it describes a state of the database from before the
// beginning of snapshot isolation. We are guaranteed to read an initial
// value from this state, or from a later state.
let attemptedCommitCount = self.writer.attemptedCommitCount
if scheduler.immediateInitialValue() {
do {
let initialValue = try read(observer.fetchInitialValue)
onChange(initialValue)
addObserver(observer: observer)
addObserver(observer: observer, from: attemptedCommitCount)
} catch {
observer.complete()
observation.events.didFail?(error)
Expand All @@ -923,7 +935,7 @@ extension DatabasePool: DatabaseReader {
do {
let initialValue = try observer.fetchInitialValue(dbResult.get())
observer.notifyChange(initialValue)
self.addObserver(observer: observer)
self.addObserver(observer: observer, from: attemptedCommitCount)
} catch {
observer.notifyErrorAndComplete(error)
}
Expand All @@ -947,7 +959,7 @@ extension DatabasePool: DatabaseReader {
do {
// Transaction is needed for version snapshotting
try db.inTransaction(.deferred) {
// Keep DatabaseSnaphot alive until we have compared
// Keep `initialSnapshot` alive until we have compared
// database versions. It prevents database checkpointing,
// and keeps versions (`sqlite3_snapshot`) valid
// and comparable.
Expand Down Expand Up @@ -976,15 +988,25 @@ extension DatabasePool: DatabaseReader {
}
#else
// Support for _addConcurrent(observation:)
private func addObserver<Reducer: ValueReducer>(observer: ValueObserver<Reducer>) {
private func addObserver<Reducer: ValueReducer>(
observer: ValueObserver<Reducer>,
from attemptedCommitCount: Int)
{
_weakAsyncWriteWithoutTransaction { db in
guard let db = db else { return }
if observer.isCompleted { return }

do {
observer.events.databaseDidChange?()
if let value = try observer.fetchValue(db) {
observer.notifyChange(value)
// If no commit did happen since the observation was started,
// then we are sure the database was not modified, and we do not
// have to perform a second fetch. Otherwise, we have to suppose
// the commits may have modified the observed value.
let fetchNeeded = db.observationBroker.attemptedCommitCount > attemptedCommitCount
if fetchNeeded {
observer.events.databaseDidChange?()
if let value = try observer.fetchValue(db) {
observer.notifyChange(value)
}
}

// Now we can start observation
Expand Down
7 changes: 7 additions & 0 deletions GRDB/Core/SerializedDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ final class SerializedDatabase {
/// The path to the database file
var path: String

/// The number of attempted SQLite commits.
///
/// See `DatabaseObservationBroker.attemptedCommitCount`
var attemptedCommitCount: Int {
db.observationBroker.attemptedCommitCount
}

/// The dispatch queue
private let queue: DispatchQueue

Expand Down
10 changes: 10 additions & 0 deletions GRDB/Core/TransactionObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,13 @@ class DatabaseObservationBroker {
}
}
}
#if !SQLITE_ENABLE_SNAPSHOT
/// The number of attempted SQLite commits
///
/// This counter is incremented on each `sqlite3_commit_hook` callback: it
/// is allows DatabasePool to optimize its ValueObservation support.
@LockedBox private(set) var attemptedCommitCount = 0
#endif

init(_ database: Database) {
self.database = database
Expand Down Expand Up @@ -585,6 +592,9 @@ class DatabaseObservationBroker {
let broker = Unmanaged<DatabaseObservationBroker>.fromOpaque(brokerPointer!).takeUnretainedValue()
do {
try broker.databaseWillCommit()
#if !SQLITE_ENABLE_SNAPSHOT
broker.$attemptedCommitCount.increment()
#endif
broker.transactionState = .commit
// Next step: updateStatementDidExecute()
return 0
Expand Down
44 changes: 4 additions & 40 deletions Tests/GRDBTests/ValueObservationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class ValueObservationTests: GRDBTestCase {

// MARK: - Snapshot Optimization

func testDisallowedSnapshotOptimizationWithAsyncScheduler() throws {
func testDoubleInitialFetchWithAsyncScheduler() throws {
let dbPool = try makeDatabasePool()
try dbPool.write { db in
try db.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)")
Expand Down Expand Up @@ -183,7 +183,7 @@ class ValueObservationTests: GRDBTestCase {
}
}

func testDisallowedSnapshotOptimizationWithImmediateScheduler() throws {
func testDoubleInitialFetchWithImmediateScheduler() throws {
let dbPool = try makeDatabasePool()
try dbPool.write { db in
try db.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)")
Expand Down Expand Up @@ -223,7 +223,7 @@ class ValueObservationTests: GRDBTestCase {
}
}

func testAllowedSnapshotOptimizationWithAsyncScheduler() throws {
func testSingleInitialFetchWithAsyncScheduler() throws {
let dbPool = try makeDatabasePool()
try dbPool.write { db in
try db.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)")
Expand All @@ -247,7 +247,6 @@ class ValueObservationTests: GRDBTestCase {
return try Int.fetchOne(db, sql: "SELECT COUNT(*) FROM t")!
}

#if SQLITE_ENABLE_SNAPSHOT
let expectation = self.expectation(description: "")
expectation.expectedFulfillmentCount = 2
var observedCounts: [Int] = []
Expand All @@ -263,26 +262,9 @@ class ValueObservationTests: GRDBTestCase {
waitForExpectations(timeout: 2, handler: nil)
XCTAssertEqual(observedCounts, [0, 1])
}
#else
let expectation = self.expectation(description: "")
expectation.expectedFulfillmentCount = 3
var observedCounts: [Int] = []
let cancellable = observation.start(
in: dbPool,
scheduling: .async(onQueue: .main),
onError: { error in XCTFail("Unexpected error: \(error)") },
onChange: { count in
observedCounts.append(count)
expectation.fulfill()
})
withExtendedLifetime(cancellable) {
waitForExpectations(timeout: 2, handler: nil)
XCTAssertEqual(observedCounts, [0, 0, 1])
}
#endif
}

func testAllowedSnapshotOptimizationWithImmediateScheduler() throws {
func testSingleInitialFetchWithImmediateScheduler() throws {
let dbPool = try makeDatabasePool()
try dbPool.write { db in
try db.execute(sql: "CREATE TABLE t(id INTEGER PRIMARY KEY AUTOINCREMENT)")
Expand All @@ -306,7 +288,6 @@ class ValueObservationTests: GRDBTestCase {
return try Int.fetchOne(db, sql: "SELECT COUNT(*) FROM t")!
}

#if SQLITE_ENABLE_SNAPSHOT
let expectation = self.expectation(description: "")
expectation.expectedFulfillmentCount = 2
var observedCounts: [Int] = []
Expand All @@ -322,23 +303,6 @@ class ValueObservationTests: GRDBTestCase {
waitForExpectations(timeout: 2, handler: nil)
XCTAssertEqual(observedCounts, [0, 1])
}
#else
let expectation = self.expectation(description: "")
expectation.expectedFulfillmentCount = 3
var observedCounts: [Int] = []
let cancellable = observation.start(
in: dbPool,
scheduling: .immediate,
onError: { error in XCTFail("Unexpected error: \(error)") },
onChange: { count in
observedCounts.append(count)
expectation.fulfill()
})
withExtendedLifetime(cancellable) {
waitForExpectations(timeout: 2, handler: nil)
XCTAssertEqual(observedCounts, [0, 0, 1])
}
#endif
}

// MARK: - Cancellation
Expand Down