Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Table change updates in write transactions #13

Merged
merged 10 commits into from
Jan 24, 2024
5 changes: 5 additions & 0 deletions .changeset/chilled-queens-explain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@journeyapps/react-native-quick-sqlite': minor
---

Added `registerTablesChangedHook` to DB connections which reports batched table updates once `writeTransaction`s and `writeLock`s have been committed. Maintained API compatibility with `registerUpdateHook` which reports table change events as they occur. Added listeners for when write transactions have been committed or rolled back.
28 changes: 27 additions & 1 deletion cpp/ConnectionPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ ConnectionPool::ConnectionPool(std::string dbName, std::string docPath,
: dbName(dbName), maxReads(numReadConnections),
writeConnection(dbName, docPath,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE |
SQLITE_OPEN_FULLMUTEX) {
SQLITE_OPEN_FULLMUTEX),
commitPayload(
{.dbName = &this->dbName, .event = TransactionEvent::COMMIT}),
rollbackPayload({
.dbName = &this->dbName,
.event = TransactionEvent::ROLLBACK,
}) {

onContextCallback = nullptr;
isConcurrencyEnabled = maxReads > 0;
Expand Down Expand Up @@ -127,6 +133,26 @@ void ConnectionPool::setTableUpdateHandler(
(void *)(dbName.c_str()));
}

/**
* The SQLite callback needs to return `0` in order for the commit to
* proceed correctly
*/
int onCommitIntermediate(ConnectionPool *pool) {
if (pool->onCommitCallback != NULL) {
pool->onCommitCallback(&(pool->commitPayload));
}
return 0;
}

void ConnectionPool::setTransactionFinalizerHandler(
void (*callback)(const TransactionCallbackPayload *)) {
this->onCommitCallback = callback;
sqlite3_commit_hook(writeConnection.connection,
(int (*)(void *))onCommitIntermediate, (void *)this);
sqlite3_rollback_hook(writeConnection.connection, (void (*)(void *))callback,
(void *)&rollbackPayload);
}

void ConnectionPool::closeContext(ConnectionLockId contextId) {
if (writeConnection.matchesLock(contextId)) {
if (writeQueue.size() > 0) {
Expand Down
22 changes: 22 additions & 0 deletions cpp/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
#ifndef ConnectionPool_h
#define ConnectionPool_h

enum TransactionEvent { COMMIT, ROLLBACK };

struct TransactionCallbackPayload {
std::string *dbName;
TransactionEvent event;
};

// The number of concurrent read connections to the database.
/**
* Concurrent connection pool class.
Expand Down Expand Up @@ -57,7 +64,12 @@ class ConnectionPool {
std::vector<ConnectionLockId> readQueue;
std::vector<ConnectionLockId> writeQueue;

// Cached constant payloads for c style commit/rollback callbacks
const TransactionCallbackPayload commitPayload;
const TransactionCallbackPayload rollbackPayload;

void (*onContextCallback)(std::string, ConnectionLockId);
void (*onCommitCallback)(const TransactionCallbackPayload *);

bool isConcurrencyEnabled;

Expand All @@ -66,6 +78,8 @@ class ConnectionPool {
unsigned int numReadConnections);
~ConnectionPool();

friend int onCommitIntermediate(ConnectionPool *pool);

/**
* Add a task to the read queue. If there are no available connections,
* the task will be queued.
Expand Down Expand Up @@ -94,6 +108,12 @@ class ConnectionPool {
void setTableUpdateHandler(void (*callback)(void *, int, const char *,
const char *, sqlite3_int64));

/**
* Set a callback function for transaction commits/rollbacks
*/
void setTransactionFinalizerHandler(
void (*callback)(const TransactionCallbackPayload *));

/**
* Close a context in order to progress queue
*/
Expand Down Expand Up @@ -124,4 +144,6 @@ class ConnectionPool {
sqlite3 **db, int sqlOpenFlags);
};

int onCommitIntermediate(ConnectionPool *pool);

#endif
32 changes: 29 additions & 3 deletions cpp/bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,32 @@ void updateTableHandler(void *voidDBName, int opType, char const *dbName,
});
}

/**
* Callback handler for SQLite transaction updates
*/
void transactionFinalizerHandler(const TransactionCallbackPayload *payload) {
/**
* No DB operations should occur when this callback is fired from SQLite.
* This function triggers an async invocation to call watch callbacks,
* avoiding holding SQLite up.
*/
invoker->invokeAsync([payload] {
try {
auto global = runtime->global();
jsi::Function handlerFunction = global.getPropertyAsFunction(
*runtime, "triggerTransactionFinalizerHook");

auto jsiDbName = jsi::String::createFromAscii(*runtime, *payload->dbName);
auto jsiEventType = jsi::Value((int)payload->event);
handlerFunction.call(*runtime, move(jsiDbName), move(jsiEventType));
} catch (jsi::JSINativeException e) {
std::cout << e.what() << std::endl;
} catch (...) {
std::cout << "Unknown error" << std::endl;
}
});
}

/**
* Callback handler for Concurrent context is available
*/
Expand Down Expand Up @@ -137,9 +163,9 @@ void osp::install(jsi::Runtime &rt,
}
}

auto result =
sqliteOpenDb(dbName, tempDocPath, &contextLockAvailableHandler,
&updateTableHandler, numReadConnections);
auto result = sqliteOpenDb(
dbName, tempDocPath, &contextLockAvailableHandler, &updateTableHandler,
&transactionFinalizerHandler, numReadConnections);
if (result.type == SQLiteError) {
throw jsi::JSError(rt, result.errorMessage.c_str());
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/sqliteBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ sqliteOpenDb(string const dbName, string const docPath,
void (*contextAvailableCallback)(std::string, ConnectionLockId),
void (*updateTableCallback)(void *, int, const char *,
const char *, sqlite3_int64),
void (*onTransactionFinalizedCallback)(
const TransactionCallbackPayload *event),
uint32_t numReadConnections) {
if (dbMap.count(dbName) == 1) {
return SQLiteOPResult{
Expand All @@ -51,6 +53,7 @@ sqliteOpenDb(string const dbName, string const docPath,
dbMap[dbName] = new ConnectionPool(dbName, docPath, numReadConnections);
dbMap[dbName]->setOnContextAvailable(contextAvailableCallback);
dbMap[dbName]->setTableUpdateHandler(updateTableCallback);
dbMap[dbName]->setTransactionFinalizerHandler(onTransactionFinalizedCallback);

return SQLiteOPResult{
.type = SQLiteOk,
Expand Down
2 changes: 2 additions & 0 deletions cpp/sqliteBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ sqliteOpenDb(std::string const dbName, std::string const docPath,
void (*contextAvailableCallback)(std::string, ConnectionLockId),
void (*updateTableCallback)(void *, int, const char *,
const char *, sqlite3_int64),
void (*onTransactionFinalizedCallback)(
const TransactionCallbackPayload *event),
uint32_t numReadConnections);

SQLiteOPResult sqliteCloseDb(string const dbName);
Expand Down
99 changes: 99 additions & 0 deletions src/DBListenerManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import _ from 'lodash';
import { registerTransactionHook, registerUpdateHook } from './table-updates';
import {
BatchedUpdateCallback,
BatchedUpdateNotification,
TransactionEvent,
UpdateCallback,
UpdateNotification
} from './types';
import { BaseListener, BaseObserver } from './utils/BaseObserver';

export interface DBListenerManagerOptions {
dbName: string;
}

export interface WriteTransactionEvent {
type: TransactionEvent;
}

export interface DBListener extends BaseListener {
/**
* Register a listener to be fired for any table change.
* Changes inside write locks and transactions are reported immediately.
*/
rawTableChange: UpdateCallback;

/**
* Register a listener for when table changes are persisted
* into the DB. Changes during write transactions which are
* rolled back are not reported.
* Any changes during write locks are buffered and reported
* after transaction commit and lock release.
* Table changes are reported individually for now in order to maintain
* API compatibility. These can be batched in future.
*/
tablesUpdated: BatchedUpdateCallback;

/**
* Listener event triggered whenever a write transaction
* is started, committed or rolled back.
*/
writeTransaction: (event: WriteTransactionEvent) => void;
}

export class DBListenerManager extends BaseObserver<DBListener> {}

export class DBListenerManagerInternal extends DBListenerManager {
private updateBuffer: UpdateNotification[];

constructor(protected options: DBListenerManagerOptions) {
super();
this.updateBuffer = [];
registerUpdateHook(this.options.dbName, (update) => this.handleTableUpdates(update));
registerTransactionHook(this.options.dbName, (eventType) => {
switch (eventType) {
case TransactionEvent.COMMIT:
this.flushUpdates();
break;
case TransactionEvent.ROLLBACK:
this.transactionReverted();
break;
}

this.iterateListeners((l) =>
l.writeTransaction?.({
type: eventType
})
);
});
}

flushUpdates() {
if (!this.updateBuffer.length) {
return;
}

const groupedUpdates = _.groupBy(this.updateBuffer, (update) => update.table);
const batchedUpdate: BatchedUpdateNotification = {
groupedUpdates,
rawUpdates: this.updateBuffer,
tables: _.keys(groupedUpdates)
};
this.updateBuffer = [];
this.iterateListeners((l) => l.tablesUpdated?.(batchedUpdate));
}

protected transactionReverted() {
// clear updates
this.updateBuffer = [];
}

handleTableUpdates(notification: UpdateNotification) {
// Fire updates for any change
this.iterateListeners((l) => l.rawTableChange?.({ ...notification }));

// Queue changes until they are flushed
this.updateBuffer.push(notification);
}
}
7 changes: 7 additions & 0 deletions src/lock-hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
* Hooks which can be triggered during the execution of read/write locks
*/
export interface LockHooks {
lockAcquired?: () => Promise<void>;
lockReleased?: () => Promise<void>;
}
Loading
Loading