Skip to content

Commit

Permalink
fix(core): handle concurrent db connections better (#28544)
Browse files Browse the repository at this point in the history
  • Loading branch information
Cammisuli authored and jaysoo committed Oct 23, 2024
1 parent 5da14f4 commit 4a1737b
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 133 deletions.
50 changes: 50 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/nx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ strip = "none"

[dependencies]
anyhow = "1.0.71"
ci_info = "0.14.14"
colored = "2"
crossbeam-channel = '0.5'
dashmap = { version = "5.5.3", features = ["rayon"] }
Expand Down Expand Up @@ -55,6 +56,7 @@ mio = "0.8"
portable-pty = { git = "https://github.com/cammisuli/wezterm", rev = "b538ee29e1e89eeb4832fb35ae095564dce34c29" }
crossterm = "0.27.0"
ignore-files = "2.1.0"
fs4 = "0.10.0"
rusqlite = { version = "0.32.1", features = ["bundled", "array", "vtab"] }
watchexec = "3.0.1"
watchexec-events = "2.0.1"
Expand Down
51 changes: 27 additions & 24 deletions packages/nx/src/native/cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use std::time::Instant;
use fs_extra::remove_items;
use napi::bindgen_prelude::*;
use regex::Regex;
use rusqlite::{params, Connection, OptionalExtension};
use rusqlite::params;
use tracing::trace;

use crate::native::cache::expand_outputs::_expand_outputs;
use crate::native::cache::file_ops::_copy;
use crate::native::db::connection::NxDbConnection;
use crate::native::utils::Normalize;

#[napi(object)]
Expand All @@ -25,7 +26,7 @@ pub struct NxCache {
pub cache_directory: String,
workspace_root: PathBuf,
cache_path: PathBuf,
db: External<Connection>,
db: External<NxDbConnection>,
link_task_details: bool,
}

Expand All @@ -35,7 +36,7 @@ impl NxCache {
pub fn new(
workspace_root: String,
cache_path: String,
db_connection: External<Connection>,
db_connection: External<NxDbConnection>,
link_task_details: Option<bool>,
) -> anyhow::Result<Self> {
let cache_path = PathBuf::from(&cache_path);
Expand All @@ -58,29 +59,26 @@ impl NxCache {

fn setup(&self) -> anyhow::Result<()> {
let query = if self.link_task_details {
"BEGIN;
CREATE TABLE IF NOT EXISTS cache_outputs (
"CREATE TABLE IF NOT EXISTS cache_outputs (
hash TEXT PRIMARY KEY NOT NULL,
code INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (hash) REFERENCES task_details (hash)
);
COMMIT;
);
"
} else {
"BEGIN;
CREATE TABLE IF NOT EXISTS cache_outputs (
"CREATE TABLE IF NOT EXISTS cache_outputs (
hash TEXT PRIMARY KEY NOT NULL,
code INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
COMMIT;
"
"
};

self.db.execute_batch(query).map_err(anyhow::Error::from)
self.db.execute(query, []).map_err(anyhow::Error::from)?;
Ok(())
}

#[napi]
Expand Down Expand Up @@ -114,8 +112,7 @@ impl NxCache {
})
},
)
.optional()
.map_err(anyhow::Error::new)?;
.map_err(|e| anyhow::anyhow!("Unable to get {}: {:?}", &hash, e))?;
trace!("GET {} {:?}", &hash, start.elapsed());
Ok(r)
}
Expand Down Expand Up @@ -157,7 +154,6 @@ impl NxCache {
}
}

trace!("Recording to cache: {:?}", &hash);
self.record_to_cache(hash, code)?;
Ok(())
}
Expand All @@ -168,6 +164,11 @@ impl NxCache {
hash: String,
result: CachedResult,
) -> anyhow::Result<()> {
trace!(
"applying remote cache results: {:?} ({})",
&hash,
&result.outputs_path
);
let terminal_output = result.terminal_output;
write(self.get_task_outputs_path(hash.clone()), terminal_output)?;

Expand All @@ -187,6 +188,7 @@ impl NxCache {
}

fn record_to_cache(&self, hash: String, code: i16) -> anyhow::Result<()> {
trace!("Recording to cache: {}, {}", &hash, code);
self.db.execute(
"INSERT OR REPLACE INTO cache_outputs (hash, code) VALUES (?1, ?2)",
params![hash, code],
Expand Down Expand Up @@ -235,7 +237,7 @@ impl NxCache {

Ok(vec![
self.cache_path.join(&hash),
self.get_task_outputs_path_internal(&hash).into(),
self.get_task_outputs_path_internal(&hash),
])
})?
.filter_map(anyhow::Result::ok)
Expand All @@ -252,14 +254,15 @@ impl NxCache {
// Checks that the number of cache records in the database
// matches the number of cache directories on the filesystem.
// If they don't match, it means that the cache is out of sync.
let cache_records_exist =
self.db
.query_row("SELECT EXISTS (SELECT 1 FROM cache_outputs)", [], |row| {
let exists: bool = row.get(0)?;
Ok(exists)
})?;

if !cache_records_exist {
let cache_records_exist = self
.db
.query_row("SELECT EXISTS (SELECT 1 FROM cache_outputs)", [], |row| {
let exists: bool = row.get(0)?;
Ok(exists)
})?
.unwrap_or(false);

if cache_records_exist {
let hash_regex = Regex::new(r"^\d+$").expect("Hash regex is invalid");
let fs_entries = std::fs::read_dir(&self.cache_path).map_err(anyhow::Error::from)?;

Expand Down
75 changes: 75 additions & 0 deletions packages/nx/src/native/db/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use anyhow::Result;
use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement};
use std::thread;
use std::time::{Duration, Instant};
use tracing::trace;

/// Wrapper around a rusqlite [`Connection`].
///
/// The methods implemented on this type forward to the wrapped connection and
/// retry the call a few times when SQLite reports the database as busy.
pub struct NxDbConnection {
/// The wrapped SQLite connection. Public, so callers can bypass the retrying
/// wrappers and use the raw connection directly.
pub conn: Connection,
}

impl NxDbConnection {
pub fn new(connection: Connection) -> Self {
Self { conn: connection }
}

pub fn execute<P: Params + Clone>(&self, sql: &str, params: P) -> Result<usize> {
self.retry_on_busy(|conn| conn.execute(sql, params.clone()))
.map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e))
}

pub fn execute_batch(&self, sql: &str) -> Result<()> {
self.retry_on_busy(|conn| conn.execute_batch(sql))
.map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e))
}

pub fn prepare(&self, sql: &str) -> Result<Statement> {
self.retry_on_busy(|conn| conn.prepare(sql))
.map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e))
}

pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<Option<T>>
where
P: Params + Clone,
F: FnOnce(&Row<'_>) -> rusqlite::Result<T> + Clone,
{
self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional())
.map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e))
}

pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> {
self.conn
.close()
.inspect_err(|e| trace!("Error in close: {:?}", e))
}

#[allow(clippy::needless_lifetimes)]
fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result<T>
where
F: Fn(&'a Connection) -> rusqlite::Result<T>,
{
let start = Instant::now();
let max_retries: u64 = 5;
let retry_delay = Duration::from_millis(25);

for i in 0..max_retries {
match operation(&self.conn) {
Ok(result) => return Ok(result),
Err(Error::SqliteFailure(err, _))
if err.code == rusqlite::ErrorCode::DatabaseBusy =>
{
trace!("Database busy. Retrying{}", ".".repeat(i as usize));
if start.elapsed()
>= Duration::from_millis(max_retries * retry_delay.as_millis() as u64)
{
break;
}
thread::sleep(retry_delay);
}
err @ Err(_) => return err,
}
}

operation(&self.conn)
}
}
Loading

0 comments on commit 4a1737b

Please sign in to comment.