Skip to content
This repository has been archived by the owner on Oct 18, 2023. It is now read-only.

wip: migrate from rusqlite to libsql and libsql_sys #556

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,4 @@ members = [
]

[workspace.dependencies]
rusqlite = { version = "0.29.0", git = "https://github.com/psarna/rusqlite", rev = "477264453b", default-features = false, features = [
"buildtime_bindgen",
"bundled-libsql-wasm-experimental",
"column_decltype",
"load_extension"
] }
libsql = { version = "0.1.6", default-features = false }
68 changes: 34 additions & 34 deletions bottomless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,18 @@ pub extern "C" fn xOpen(
let rc = unsafe {
(orig_methods.xOpen.unwrap())(vfs, db_file, wal_name, no_shm_mode, max_size, methods, wal)
};
if rc != ffi::SQLITE_OK {
if rc != ffi::SQLITE_OK as i32 {
return rc;
}

if !is_regular(vfs) {
tracing::error!("Bottomless WAL is currently only supported for regular VFS");
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}

if is_local() {
tracing::info!("Running in local-mode only, without any replication");
return ffi::SQLITE_OK;
return ffi::SQLITE_OK as i32;
}

let runtime = match tokio::runtime::Builder::new_current_thread()
Expand All @@ -78,7 +78,7 @@ pub extern "C" fn xOpen(
Ok(runtime) => runtime,
Err(e) => {
tracing::error!("Failed to initialize async runtime: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
};

Expand All @@ -88,7 +88,7 @@ pub extern "C" fn xOpen(
Ok(path) => path,
Err(e) => {
tracing::error!("Failed to parse the main database path: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
}
};
Expand All @@ -98,12 +98,12 @@ pub extern "C" fn xOpen(
Ok(repl) => repl,
Err(e) => {
tracing::error!("Failed to initialize replicator: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
};

let rc = block_on!(runtime, try_restore(&mut replicator));
if rc != ffi::SQLITE_OK {
if rc != ffi::SQLITE_OK as i32 {
return rc;
}

Expand All @@ -114,7 +114,7 @@ pub extern "C" fn xOpen(
let context_ptr = Box::into_raw(Box::new(context)) as *mut c_void;
unsafe { (*(*wal)).pMethodsData = context_ptr };

ffi::SQLITE_OK
ffi::SQLITE_OK as i32
}

fn get_orig_methods(wal: *mut Wal) -> &'static libsql_wal_methods {
Expand All @@ -138,7 +138,7 @@ pub extern "C" fn xClose(
let orig_methods = get_orig_methods(wal);
let methods_data = unsafe { (*wal).pMethodsData as *mut replicator::Context };
let rc = unsafe { (orig_methods.xClose.unwrap())(wal, db, sync_flags, n_buf, z_buf) };
if rc != ffi::SQLITE_OK {
if rc != ffi::SQLITE_OK as i32 {
return rc;
}
if !is_local() && !methods_data.is_null() {
Expand Down Expand Up @@ -194,7 +194,7 @@ pub extern "C" fn xUndo(
) -> i32 {
let orig_methods = get_orig_methods(wal);
let rc = unsafe { (orig_methods.xUndo.unwrap())(wal, func, ctx) };
if is_local() || rc != ffi::SQLITE_OK {
if is_local() || rc != ffi::SQLITE_OK as i32 {
return rc;
}

Expand All @@ -207,7 +207,7 @@ pub extern "C" fn xUndo(
);
ctx.replicator.rollback_to_frame(last_valid_frame);

ffi::SQLITE_OK
ffi::SQLITE_OK as i32
}

pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) {
Expand All @@ -218,7 +218,7 @@ pub extern "C" fn xSavepoint(wal: *mut Wal, wal_data: *mut u32) {
pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 {
let orig_methods = get_orig_methods(wal);
let rc = unsafe { (orig_methods.xSavepointUndo.unwrap())(wal, wal_data) };
if is_local() || rc != ffi::SQLITE_OK {
if is_local() || rc != ffi::SQLITE_OK as i32 {
return rc;
}

Expand All @@ -231,7 +231,7 @@ pub extern "C" fn xSavepointUndo(wal: *mut Wal, wal_data: *mut u32) -> i32 {
);
ctx.replicator.rollback_to_frame(last_valid_frame);

ffi::SQLITE_OK
ffi::SQLITE_OK as i32
}

pub extern "C" fn xFrames(
Expand All @@ -253,7 +253,7 @@ pub extern "C" fn xFrames(
// supported by bottomless storage.
if let Err(e) = ctx.replicator.set_page_size(page_size as usize) {
tracing::error!("{}", e);
return ffi::SQLITE_IOERR_WRITE;
return ffi::SQLITE_IOERR_WRITE as i32;
}
let frame_count = ffi::PageHdrIter::new(page_headers, page_size as usize).count();
if size_after != 0 {
Expand All @@ -273,11 +273,11 @@ pub extern "C" fn xFrames(
sync_flags,
)
};
if is_local() || rc != ffi::SQLITE_OK {
if is_local() || rc != ffi::SQLITE_OK as i32 {
return rc;
}

ffi::SQLITE_OK
ffi::SQLITE_OK as i32
}

extern "C" fn always_wait(_busy_param: *mut c_void) -> i32 {
Expand Down Expand Up @@ -307,9 +307,9 @@ pub extern "C" fn xCheckpoint(
** In order to avoid autocheckpoint on close (that's too often),
** checkpoint attempts weaker than TRUNCATE are ignored.
*/
if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE {
if emode < ffi::SQLITE_CHECKPOINT_TRUNCATE as i32 {
tracing::trace!("Ignoring a checkpoint request weaker than TRUNCATE");
return ffi::SQLITE_OK;
return ffi::SQLITE_OK as i32;
}
/* If there's no busy handler, let's provide a default one,
** since we auto-upgrade the passive checkpoint
Expand All @@ -335,14 +335,14 @@ pub extern "C" fn xCheckpoint(
)
};

if is_local() || rc != ffi::SQLITE_OK {
if is_local() || rc != ffi::SQLITE_OK as i32 {
return rc;
}

let ctx = get_replicator_context(wal);
if ctx.replicator.commits_in_current_generation() == 0 {
tracing::debug!("No commits happened in this generation, not snapshotting");
return ffi::SQLITE_OK;
return ffi::SQLITE_OK as i32;
}

let last_known_frame = ctx.replicator.last_known_frame();
Expand All @@ -352,7 +352,7 @@ pub extern "C" fn xCheckpoint(
ctx.replicator.wait_until_committed(last_known_frame)
) {
tracing::error!("Failed to finalize replication: {}", e);
return ffi::SQLITE_IOERR_WRITE;
return ffi::SQLITE_IOERR_WRITE as i32;
}

ctx.replicator.new_generation();
Expand All @@ -363,10 +363,10 @@ pub extern "C" fn xCheckpoint(
"Failed to snapshot the main db file during checkpoint: {}",
e
);
return ffi::SQLITE_IOERR_WRITE;
return ffi::SQLITE_IOERR_WRITE as i32;
}

ffi::SQLITE_OK
ffi::SQLITE_OK as i32
}

pub extern "C" fn xCallback(wal: *mut Wal) -> i32 {
Expand Down Expand Up @@ -416,42 +416,42 @@ async fn try_restore(replicator: &mut replicator::Replicator) -> i32 {
replicator.new_generation();
if let Err(e) = replicator.snapshot_main_db_file().await {
tracing::error!("Failed to snapshot the main db file: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
// Restoration process only leaves the local WAL file if it was
// detected to be newer than its remote counterpart.
if let Err(e) = replicator.maybe_replicate_wal().await {
tracing::error!("Failed to replicate local WAL: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
}
Ok(replicator::RestoreAction::ReuseGeneration(gen)) => {
replicator.set_generation(gen);
}
Err(e) => {
tracing::error!("Failed to restore the database: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
}

ffi::SQLITE_OK
ffi::SQLITE_OK as i32
}

pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const c_char) -> i32 {
if is_local() {
tracing::info!("Running in local-mode only, without any replication");
return ffi::SQLITE_OK;
return ffi::SQLITE_OK as i32;
}

if path.is_null() {
return ffi::SQLITE_OK;
return ffi::SQLITE_OK as i32;
}
let path = unsafe {
match std::ffi::CStr::from_ptr(path).to_str() {
Ok(path) => path,
Err(e) => {
tracing::error!("Failed to parse the main database path: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
}
};
Expand All @@ -464,23 +464,23 @@ pub extern "C" fn xPreMainDbOpen(_methods: *mut libsql_wal_methods, path: *const
Ok(runtime) => runtime,
Err(e) => {
tracing::error!("Failed to initialize async runtime: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
};

let options = match replicator::Options::from_env() {
Ok(options) => options,
Err(e) => {
tracing::error!("Failed to parse replicator options: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
};
let replicator = block_on!(runtime, replicator::Replicator::with_options(path, options));
let mut replicator = match replicator {
Ok(repl) => repl,
Err(e) => {
tracing::error!("Failed to initialize replicator: {}", e);
return ffi::SQLITE_CANTOPEN;
return ffi::SQLITE_CANTOPEN as i32;
}
};
block_on!(runtime, try_restore(&mut replicator))
Expand Down Expand Up @@ -561,7 +561,7 @@ pub mod static_init {
if orig_methods.is_null() {}
let methods = crate::bottomless_methods(orig_methods);
let rc = unsafe { libsql_wal_methods_register(methods) };
if rc != crate::ffi::SQLITE_OK {
if rc != crate::ffi::SQLITE_OK as i32 {
let _box = unsafe { Box::from_raw(methods as *mut libsql_wal_methods) };
tracing::warn!("Failed to instantiate bottomless WAL methods");
}
Expand Down
2 changes: 1 addition & 1 deletion sqld-libsql-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ edition = "2021"

[dependencies]
anyhow = "1.0.66"
rusqlite = { workspace = true }
tracing = "0.1.37"
once_cell = "1.17.1"
libsql = { workspace = true }

[features]
unix-excl-vfs = []
13 changes: 3 additions & 10 deletions sqld-libsql-bindings/src/ffi/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,9 @@

pub mod types;

pub use rusqlite::ffi::{
libsql_wal_methods, libsql_wal_methods_find, libsql_wal_methods_register,
libsql_wal_methods_unregister, sqlite3, sqlite3_file, sqlite3_hard_heap_limit64,
sqlite3_io_methods, sqlite3_soft_heap_limit64, sqlite3_vfs, WalIndexHdr, SQLITE_CANTOPEN,
SQLITE_CHECKPOINT_FULL, SQLITE_CHECKPOINT_TRUNCATE, SQLITE_IOERR_WRITE, SQLITE_OK,
};

pub use rusqlite::ffi::libsql_pghdr as PgHdr;
pub use rusqlite::ffi::libsql_wal as Wal;
pub use rusqlite::ffi::*;
pub use libsql::ffi::libsql_pghdr as PgHdr;
pub use libsql::ffi::libsql_wal as Wal;
pub use libsql::ffi::*;

pub struct PageHdrIter {
current_ptr: *const PgHdr,
Expand Down
6 changes: 3 additions & 3 deletions sqld-libsql-bindings/src/ffi/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::ffi::{c_char, c_int, c_uint, c_void};

use super::{libsql_wal_methods, sqlite3_file, sqlite3_vfs, PgHdr, Wal};
use rusqlite::ffi::sqlite3;
use libsql::ffi::sqlite3;

// WAL methods
pub type XWalLimitFn = extern "C" fn(wal: *mut Wal, limit: i64);
Expand All @@ -18,7 +18,7 @@ pub type XWalSavepointFn = extern "C" fn(wal: *mut Wal, wal_data: *mut u32);
pub type XWalSavePointUndoFn = unsafe extern "C" fn(wal: *mut Wal, wal_data: *mut u32) -> c_int;
pub type XWalCheckpointFn = unsafe extern "C" fn(
wal: *mut Wal,
db: *mut rusqlite::ffi::sqlite3,
db: *mut libsql::ffi::sqlite3,
emode: c_int,
busy_handler: Option<unsafe extern "C" fn(busy_param: *mut c_void) -> c_int>,
busy_arg: *mut c_void,
Expand All @@ -32,7 +32,7 @@ pub type XWalCallbackFn = extern "C" fn(wal: *mut Wal) -> c_int;
pub type XWalExclusiveModeFn = extern "C" fn(wal: *mut Wal, op: c_int) -> c_int;
pub type XWalHeapMemoryFn = extern "C" fn(wal: *mut Wal) -> c_int;
pub type XWalFileFn = extern "C" fn(wal: *mut Wal) -> *mut sqlite3_file;
pub type XWalDbFn = extern "C" fn(wal: *mut Wal, db: *mut rusqlite::ffi::sqlite3);
pub type XWalDbFn = extern "C" fn(wal: *mut Wal, db: *mut libsql::ffi::sqlite3);
pub type XWalPathNameLenFn = extern "C" fn(orig_len: c_int) -> c_int;
pub type XWalGetPathNameFn = extern "C" fn(buf: *mut c_char, orig: *const c_char, orig_len: c_int);
pub type XWalPreMainDbOpen =
Expand Down
Loading