Skip to content

Commit

Permalink
[Fix] Table change updates in write transactions (#13)
Browse files Browse the repository at this point in the history
* trigger update table callbacks only if changes are commited inside transactions
  • Loading branch information
stevensJourney authored Jan 24, 2024
1 parent 7262d47 commit 3bb0212
Show file tree
Hide file tree
Showing 15 changed files with 450 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/chilled-queens-explain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/react-native-quick-sqlite': minor
---

Added `registerTablesChangedHook` to DB connections which reports batched table updates once `writeTransaction`s and `writeLock`s have been committed. Maintained API compatibility with `registerUpdateHook` which reports table change events as they occur. Added listeners for when write transactions have been committed or rolled back.
12 changes: 6 additions & 6 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
name: Test
runs-on: macos-latest
env:
AVD_NAME: macOS-avd-x86_64-29
AVD_NAME: macOS-avd-x86_64-31
steps:
- uses: actions/checkout@v4
with:
Expand All @@ -24,7 +24,7 @@ jobs:
path: |
~/.android/avd/*
~/.android/adb*
key: avd-29
key: avd-31

- name: Setup NodeJS
uses: actions/setup-node@v2
Expand Down Expand Up @@ -54,9 +54,9 @@ jobs:
- name: create AVD and generate snapshot for caching
if: steps.avd-cache.outputs.cache-hit != 'true'
uses: reactivecircus/android-emulator-runner@v2
uses: reactivecircus/android-emulator-runner@v2.28.0
with:
api-level: 29
api-level: 31
force-avd-creation: false
target: google_apis
arch: x86_64
Expand All @@ -66,9 +66,9 @@ jobs:
script: echo "Generated AVD snapshot for caching."

- name: Run connected tests
uses: ReactiveCircus/android-emulator-runner@v2
uses: ReactiveCircus/android-emulator-runner@v2.28.0
with:
api-level: 29
api-level: 31
target: google_apis
arch: x86_64
avd-name: $AVD_NAME
Expand Down
28 changes: 27 additions & 1 deletion cpp/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
: dbName(dbName), maxReads(numReadConnections),
writeConnection(dbName, docPath,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
SQLITE_OPEN_FULLMUTEX) {
SQLITE_OPEN_FULLMUTEX),
commitPayload(
{.dbName = &this->dbName, .event = TransactionEvent::COMMIT}),
rollbackPayload({
.dbName = &this->dbName,
.event = TransactionEvent::ROLLBACK,
}) {

onContextCallback = nullptr;
isConcurrencyEnabled = maxReads > 0;
Expand Down Expand Up @@ -127,6 +133,26 @@ void ConnectionPool::setTableUpdateHandler(
(void *)(dbName.c_str()));
}

/**
* The SQLite callback needs to return `0` in order for the commit to
* proceed correctly
*/
int onCommitIntermediate(ConnectionPool *pool) {
if (pool->onCommitCallback != NULL) {
pool->onCommitCallback(&(pool->commitPayload));
}
return 0;
}

void ConnectionPool::setTransactionFinalizerHandler(
void (*callback)(const TransactionCallbackPayload *)) {
this->onCommitCallback = callback;
sqlite3_commit_hook(writeConnection.connection,
(int (*)(void *))onCommitIntermediate, (void *)this);
sqlite3_rollback_hook(writeConnection.connection, (void (*)(void *))callback,
(void *)&rollbackPayload);
}

void ConnectionPool::closeContext(ConnectionLockId contextId) {
if (writeConnection.matchesLock(contextId)) {
if (writeQueue.size() > 0) {
Expand Down
22 changes: 22 additions & 0 deletions cpp/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
#ifndef ConnectionPool_h
#define ConnectionPool_h

enum TransactionEvent { COMMIT, ROLLBACK };

struct TransactionCallbackPayload {
std::string *dbName;
TransactionEvent event;
};

// The number of concurrent read connections to the database.
/**
* Concurrent connection pool class.
Expand Down Expand Up @@ -57,7 +64,12 @@ class ConnectionPool {
std::vector<ConnectionLockId> readQueue;
std::vector<ConnectionLockId> writeQueue;

// Cached constant payloads for c style commit/rollback callbacks
const TransactionCallbackPayload commitPayload;
const TransactionCallbackPayload rollbackPayload;

void (*onContextCallback)(std::string, ConnectionLockId);
void (*onCommitCallback)(const TransactionCallbackPayload *);

bool isConcurrencyEnabled;

Expand All @@ -66,6 +78,8 @@ class ConnectionPool {
unsigned int numReadConnections);
~ConnectionPool();

friend int onCommitIntermediate(ConnectionPool *pool);

/**
* Add a task to the read queue. If there are no available connections,
* the task will be queued.
Expand Down Expand Up @@ -94,6 +108,12 @@ class ConnectionPool {
void setTableUpdateHandler(void (*callback)(void *, int, const char *,
const char *, sqlite3_int64));

/**
* Set a callback function for transaction commits/rollbacks
*/
void setTransactionFinalizerHandler(
void (*callback)(const TransactionCallbackPayload *));

/**
* Close a context in order to progress queue
*/
Expand Down Expand Up @@ -124,4 +144,6 @@ class ConnectionPool {
sqlite3 **db, int sqlOpenFlags);
};

int onCommitIntermediate(ConnectionPool *pool);

#endif
32 changes: 29 additions & 3 deletions cpp/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,32 @@ void updateTableHandler(void *voidDBName, int opType, char const *dbName,
});
}

/**
* Callback handler for SQLite transaction updates
*/
void transactionFinalizerHandler(const TransactionCallbackPayload *payload) {
/**
* No DB operations should occur when this callback is fired from SQLite.
* This function triggers an async invocation to call watch callbacks,
* avoiding holding SQLite up.
*/
invoker->invokeAsync([payload] {
try {
auto global = runtime->global();
jsi::Function handlerFunction = global.getPropertyAsFunction(
*runtime, "triggerTransactionFinalizerHook");

auto jsiDbName = jsi::String::createFromAscii(*runtime, *payload->dbName);
auto jsiEventType = jsi::Value((int)payload->event);
handlerFunction.call(*runtime, move(jsiDbName), move(jsiEventType));
} catch (jsi::JSINativeException e) {
std::cout << e.what() << std::endl;
} catch (...) {
std::cout << "Unknown error" << std::endl;
}
});
}

/**
* Callback handler for Concurrent context is available
*/
Expand Down Expand Up @@ -137,9 +163,9 @@ void osp::install(jsi::Runtime &rt,
}
}

auto result =
sqliteOpenDb(dbName, tempDocPath, &contextLockAvailableHandler,
&updateTableHandler, numReadConnections);
auto result = sqliteOpenDb(
dbName, tempDocPath, &contextLockAvailableHandler, &updateTableHandler,
&transactionFinalizerHandler, numReadConnections);
if (result.type == SQLiteError) {
throw jsi::JSError(rt, result.errorMessage.c_str());
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/sqliteBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ sqliteOpenDb(string const dbName, string const docPath,
void (*contextAvailableCallback)(std::string, ConnectionLockId),
void (*updateTableCallback)(void *, int, const char *,
const char *, sqlite3_int64),
void (*onTransactionFinalizedCallback)(
const TransactionCallbackPayload *event),
uint32_t numReadConnections) {
if (dbMap.count(dbName) == 1) {
return SQLiteOPResult{
Expand All @@ -51,6 +53,7 @@ sqliteOpenDb(string const dbName, string const docPath,
dbMap[dbName] = new ConnectionPool(dbName, docPath, numReadConnections);
dbMap[dbName]->setOnContextAvailable(contextAvailableCallback);
dbMap[dbName]->setTableUpdateHandler(updateTableCallback);
dbMap[dbName]->setTransactionFinalizerHandler(onTransactionFinalizedCallback);

return SQLiteOPResult{
.type = SQLiteOk,
Expand Down
2 changes: 2 additions & 0 deletions cpp/sqliteBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ sqliteOpenDb(std::string const dbName, std::string const docPath,
void (*contextAvailableCallback)(std::string, ConnectionLockId),
void (*updateTableCallback)(void *, int, const char *,
const char *, sqlite3_int64),
void (*onTransactionFinalizedCallback)(
const TransactionCallbackPayload *event),
uint32_t numReadConnections);

SQLiteOPResult sqliteCloseDb(string const dbName);
Expand Down
99 changes: 99 additions & 0 deletions src/DBListenerManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import _ from 'lodash';
import { registerTransactionHook, registerUpdateHook } from './table-updates';
import {
BatchedUpdateCallback,
BatchedUpdateNotification,
TransactionEvent,
UpdateCallback,
UpdateNotification
} from './types';
import { BaseListener, BaseObserver } from './utils/BaseObserver';

export interface DBListenerManagerOptions {
dbName: string;
}

export interface WriteTransactionEvent {
type: TransactionEvent;
}

export interface DBListener extends BaseListener {
/**
* Register a listener to be fired for any table change.
* Changes inside write locks and transactions are reported immediately.
*/
rawTableChange: UpdateCallback;

/**
* Register a listener for when table changes are persisted
* into the DB. Changes during write transactions which are
* rolled back are not reported.
* Any changes during write locks are buffered and reported
* after transaction commit and lock release.
* Table changes are reported individually for now in order to maintain
* API compatibility. These can be batched in future.
*/
tablesUpdated: BatchedUpdateCallback;

/**
* Listener event triggered whenever a write transaction
* is started, committed or rolled back.
*/
writeTransaction: (event: WriteTransactionEvent) => void;
}

export class DBListenerManager extends BaseObserver<DBListener> {}

export class DBListenerManagerInternal extends DBListenerManager {
private updateBuffer: UpdateNotification[];

constructor(protected options: DBListenerManagerOptions) {
super();
this.updateBuffer = [];
registerUpdateHook(this.options.dbName, (update) => this.handleTableUpdates(update));
registerTransactionHook(this.options.dbName, (eventType) => {
switch (eventType) {
case TransactionEvent.COMMIT:
this.flushUpdates();
break;
case TransactionEvent.ROLLBACK:
this.transactionReverted();
break;
}

this.iterateListeners((l) =>
l.writeTransaction?.({
type: eventType
})
);
});
}

flushUpdates() {
if (!this.updateBuffer.length) {
return;
}

const groupedUpdates = _.groupBy(this.updateBuffer, (update) => update.table);
const batchedUpdate: BatchedUpdateNotification = {
groupedUpdates,
rawUpdates: this.updateBuffer,
tables: _.keys(groupedUpdates)
};
this.updateBuffer = [];
this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate));
}

protected transactionReverted() {
// clear updates
this.updateBuffer = [];
}

handleTableUpdates(notification: UpdateNotification) {
// Fire updates for any change
this.iterateListeners((l) => l.rawTableChange?.({ ...notification }));

// Queue changes until they are flushed
this.updateBuffer.push(notification);
}
}
7 changes: 7 additions & 0 deletions src/lock-hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Hooks which can be triggered during the execution of read/write locks
*/
export interface LockHooks {
lockAcquired?: () => Promise<void>;
lockReleased?: () => Promise<void>;
}
Loading

0 comments on commit 3bb0212

Please sign in to comment.