From 4a1737b237093d4e1c2871c5a14c48446a6fc547 Mon Sep 17 00:00:00 2001 From: Jonathan Cammisuli Date: Wed, 23 Oct 2024 15:33:33 -0400 Subject: [PATCH] fix(core): handle concurrent db connections better (#28544) --- Cargo.lock | 50 ++++++ packages/nx/Cargo.toml | 2 + packages/nx/src/native/cache/cache.rs | 51 ++++--- packages/nx/src/native/db/connection.rs | 75 +++++++++ packages/nx/src/native/db/initialize.rs | 144 ++++++++++++++++++ packages/nx/src/native/db/mod.rs | 84 ++-------- packages/nx/src/native/index.d.ts | 8 +- packages/nx/src/native/tasks/details.rs | 12 +- packages/nx/src/native/tasks/task_history.rs | 38 ++--- packages/nx/src/native/tests/cache.spec.ts | 17 ++- .../nx/src/native/tests/task_history.spec.ts | 17 ++- packages/nx/src/tasks-runner/cache.ts | 2 + 12 files changed, 367 insertions(+), 133 deletions(-) create mode 100644 packages/nx/src/native/db/connection.rs create mode 100644 packages/nx/src/native/db/initialize.rs diff --git a/Cargo.lock b/Cargo.lock index 7cf314466715d..a4da77495d984 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -290,6 +290,15 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "ci_info" +version = "0.14.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "840dbb7bdd1f2c4d434d6b08420ef204e0bfad0ab31a07a80a1248d24cc6e38b" +dependencies = [ + "envmnt", +] + [[package]] name = "clang-sys" version = "1.8.1" @@ -460,6 +469,16 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" +[[package]] +name = "envmnt" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d73999a2b8871e74c8b8bc23759ee9f3d85011b24fafc91a4b3b5c8cc8185501" +dependencies = [ + "fsio", + "indexmap", +] + [[package]] name = "errno" version = "0.3.8" @@ -545,6 +564,16 @@ dependencies = [ "syn 2.0.53", ] +[[package]] +name = "fs4" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec6fcfb3c0c1d71612528825042261419d5dade9678c39a781e05b63677d9b32" +dependencies = [ + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -560,6 +589,15 @@ dependencies = [ "libc", ] +[[package]] +name = "fsio" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dad0ce30be0cc441b325c5d705c8b613a0ca0d92b6a8953d41bd236dc09a36d0" +dependencies = [ + "dunce", +] + [[package]] name = "funty" version = "2.0.0" @@ -1030,6 +1068,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "inotify" version = "0.9.6" @@ -1467,11 +1515,13 @@ version = "0.1.0" dependencies = [ "anyhow", "assert_fs", + "ci_info", "colored", "crossbeam-channel", "crossterm", "dashmap", "dunce", + "fs4", "fs_extra", "globset", "hashbrown 0.14.5", diff --git a/packages/nx/Cargo.toml b/packages/nx/Cargo.toml index d58a8ed28adb1..9780242dbb297 100644 --- a/packages/nx/Cargo.toml +++ b/packages/nx/Cargo.toml @@ -13,6 +13,7 @@ strip = "none" [dependencies] anyhow = "1.0.71" +ci_info = "0.14.14" colored = "2" crossbeam-channel = '0.5' dashmap = { version = "5.5.3", features = ["rayon"] } @@ -55,6 +56,7 @@ mio = "0.8" portable-pty = { git = "https://github.com/cammisuli/wezterm", rev = "b538ee29e1e89eeb4832fb35ae095564dce34c29" } crossterm = "0.27.0" ignore-files = "2.1.0" +fs4 = "0.10.0" rusqlite = { version = "0.32.1", features = ["bundled", "array", "vtab"] } watchexec = "3.0.1" watchexec-events = "2.0.1" diff --git a/packages/nx/src/native/cache/cache.rs b/packages/nx/src/native/cache/cache.rs index 16ce0daf30308..013eedcfb852e 100644 --- a/packages/nx/src/native/cache/cache.rs +++ b/packages/nx/src/native/cache/cache.rs @@ -5,11 +5,12 @@ use std::time::Instant; use fs_extra::remove_items; use napi::bindgen_prelude::*; use regex::Regex; -use rusqlite::{params, Connection, OptionalExtension}; +use rusqlite::params; use tracing::trace; use crate::native::cache::expand_outputs::_expand_outputs; use crate::native::cache::file_ops::_copy; +use crate::native::db::connection::NxDbConnection; use crate::native::utils::Normalize; #[napi(object)] @@ -25,7 +26,7 @@ pub struct NxCache { pub cache_directory: String, workspace_root: PathBuf, cache_path: PathBuf, - db: External, + db: External, link_task_details: bool, } @@ -35,7 +36,7 @@ impl NxCache { pub fn new( workspace_root: String, cache_path: String, - db_connection: External, + db_connection: External, link_task_details: Option, ) -> anyhow::Result { let cache_path = PathBuf::from(&cache_path); @@ -58,29 +59,26 @@ impl NxCache { fn setup(&self) -> anyhow::Result<()> { let query = if self.link_task_details { - "BEGIN; - CREATE TABLE IF NOT EXISTS cache_outputs ( + "CREATE TABLE IF NOT EXISTS cache_outputs ( hash TEXT PRIMARY KEY NOT NULL, code INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (hash) REFERENCES task_details (hash) - ); - COMMIT; + ); " } else { - "BEGIN; - CREATE TABLE IF NOT EXISTS cache_outputs ( + "CREATE TABLE IF NOT EXISTS cache_outputs ( hash TEXT PRIMARY KEY NOT NULL, code INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, accessed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); - COMMIT; - " + " }; - self.db.execute_batch(query).map_err(anyhow::Error::from) + self.db.execute(query, []).map_err(anyhow::Error::from)?; + Ok(()) } #[napi] @@ -114,8 +112,7 @@ impl NxCache { }) }, ) - .optional() - .map_err(anyhow::Error::new)?; + .map_err(|e| anyhow::anyhow!("Unable to get {}: {:?}", &hash, e))?; trace!("GET {} {:?}", &hash, start.elapsed()); Ok(r) } @@ -157,7 +154,6 @@ impl NxCache { } } - trace!("Recording to cache: {:?}", &hash); self.record_to_cache(hash, code)?; Ok(()) } @@ -168,6 +164,11 @@ impl NxCache { hash: String, result: CachedResult, ) -> anyhow::Result<()> { + trace!( + "applying remote cache results: {:?} ({})", + &hash, + &result.outputs_path + ); let terminal_output = result.terminal_output; write(self.get_task_outputs_path(hash.clone()), terminal_output)?; @@ -187,6 +188,7 @@ impl NxCache { } fn record_to_cache(&self, hash: String, code: i16) -> anyhow::Result<()> { + trace!("Recording to cache: {}, {}", &hash, code); self.db.execute( "INSERT OR REPLACE INTO cache_outputs (hash, code) VALUES (?1, ?2)", params![hash, code], @@ -235,7 +237,7 @@ impl NxCache { Ok(vec![ self.cache_path.join(&hash), - self.get_task_outputs_path_internal(&hash).into(), + self.get_task_outputs_path_internal(&hash), ]) })? .filter_map(anyhow::Result::ok) @@ -252,14 +254,15 @@ impl NxCache { // Checks that the number of cache records in the database // matches the number of cache directories on the filesystem. // If they don't match, it means that the cache is out of sync. - let cache_records_exist = - self.db - .query_row("SELECT EXISTS (SELECT 1 FROM cache_outputs)", [], |row| { - let exists: bool = row.get(0)?; - Ok(exists) - })?; - - if !cache_records_exist { + let cache_records_exist = self + .db + .query_row("SELECT EXISTS (SELECT 1 FROM cache_outputs)", [], |row| { + let exists: bool = row.get(0)?; + Ok(exists) + })? + .unwrap_or(false); + + if cache_records_exist { let hash_regex = Regex::new(r"^\d+$").expect("Hash regex is invalid"); let fs_entries = std::fs::read_dir(&self.cache_path).map_err(anyhow::Error::from)?; diff --git a/packages/nx/src/native/db/connection.rs b/packages/nx/src/native/db/connection.rs new file mode 100644 index 0000000000000..3d7de42530874 --- /dev/null +++ b/packages/nx/src/native/db/connection.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use rusqlite::{Connection, Error, OptionalExtension, Params, Row, Statement}; +use std::thread; +use std::time::{Duration, Instant}; +use tracing::trace; + +pub struct NxDbConnection { + pub conn: Connection, +} + +impl NxDbConnection { + pub fn new(connection: Connection) -> Self { + Self { conn: connection } + } + + pub fn execute(&self, sql: &str, params: P) -> Result { + self.retry_on_busy(|conn| conn.execute(sql, params.clone())) + .map_err(|e| anyhow::anyhow!("DB execute error: \"{}\", {:?}", sql, e)) + } + + pub fn execute_batch(&self, sql: &str) -> Result<()> { + self.retry_on_busy(|conn| conn.execute_batch(sql)) + .map_err(|e| anyhow::anyhow!("DB execute batch error: \"{}\", {:?}", sql, e)) + } + + pub fn prepare(&self, sql: &str) -> Result { + self.retry_on_busy(|conn| conn.prepare(sql)) + .map_err(|e| anyhow::anyhow!("DB prepare error: \"{}\", {:?}", sql, e)) + } + + pub fn query_row(&self, sql: &str, params: P, f: F) -> Result> + where + P: Params + Clone, + F: FnOnce(&Row<'_>) -> rusqlite::Result + Clone, + { + self.retry_on_busy(|conn| conn.query_row(sql, params.clone(), f.clone()).optional()) + .map_err(|e| anyhow::anyhow!("DB query error: \"{}\", {:?}", sql, e)) + } + + pub fn close(self) -> rusqlite::Result<(), (Connection, Error)> { + self.conn + .close() + .inspect_err(|e| trace!("Error in close: {:?}", e)) + } + + #[allow(clippy::needless_lifetimes)] + fn retry_on_busy<'a, F, T>(&'a self, operation: F) -> rusqlite::Result + where + F: Fn(&'a Connection) -> rusqlite::Result, + { + let start = Instant::now(); + let max_retries: u64 = 5; + let retry_delay = Duration::from_millis(25); + + for i in 0..max_retries { + match operation(&self.conn) { + Ok(result) => return Ok(result), + Err(Error::SqliteFailure(err, _)) + if err.code == rusqlite::ErrorCode::DatabaseBusy => + { + trace!("Database busy. Retrying{}", ".".repeat(i as usize)); + if start.elapsed() + >= Duration::from_millis(max_retries * retry_delay.as_millis() as u64) + { + break; + } + thread::sleep(retry_delay); + } + err @ Err(_) => return err, + } + } + + operation(&self.conn) + } +} diff --git a/packages/nx/src/native/db/initialize.rs b/packages/nx/src/native/db/initialize.rs new file mode 100644 index 0000000000000..bacc2b5c0e67c --- /dev/null +++ b/packages/nx/src/native/db/initialize.rs @@ -0,0 +1,144 @@ +use std::fs::{remove_file, File}; +use std::path::{Path, PathBuf}; +use tracing::{debug, trace}; +use rusqlite::{Connection, OpenFlags}; +use fs4::fs_std::FileExt; +use crate::native::db::connection::NxDbConnection; + +pub(super) struct LockFile { + file: File, + path: PathBuf, +} + +pub(super) fn unlock_file(lock_file: &LockFile) { + if lock_file.path.exists() { + lock_file + .file + .unlock() + .and_then(|_| remove_file(&lock_file.path)) + .ok(); + } +} + +pub(super) fn create_lock_file(db_path: &Path) -> anyhow::Result { + let lock_file_path = db_path.with_extension("lock"); + let lock_file = File::create(&lock_file_path) + .map_err(|e| anyhow::anyhow!("Unable to create db lock file: {:?}", e))?; + + trace!("Getting lock on db lock file"); + lock_file + .lock_exclusive() + .inspect(|_| trace!("Got lock on db lock file")) + .map_err(|e| anyhow::anyhow!("Unable to lock the db lock file: {:?}", e))?; + Ok(LockFile { + file: lock_file, + path: lock_file_path, + }) +} + +pub(super) fn initialize_db(nx_version: String, db_path: &PathBuf) -> anyhow::Result { + let c = create_connection(db_path)?; + + trace!( + "Checking if current existing database is compatible with Nx {}", + nx_version + ); + let db_version = c.query_row( + "SELECT value FROM metadata WHERE key='NX_VERSION'", + [], + |row| { + let r: String = row.get(0)?; + Ok(r) + }, + ); + let c = match db_version { + Ok(Some(version)) if version == nx_version => { + trace!("Database is compatible with Nx {}", nx_version); + c + } + // If there is no version, it means that this database is new + Ok(None) => { + trace!("Recording Nx Version: {}", nx_version); + c.execute( + "INSERT OR REPLACE INTO metadata (key, value) VALUES ('NX_VERSION', ?)", + [nx_version], + )?; + c + } + _ => { + trace!("Disconnecting from existing incompatible database"); + c.close().map_err(|(_, error)| anyhow::Error::from(error))?; + trace!("Removing existing incompatible database"); + remove_file(db_path)?; + + trace!("Creating a new connection to a new database"); + create_connection(db_path)? + } + }; + + Ok(c) +} + +fn create_connection(db_path: &PathBuf) -> anyhow::Result { + match open_database_connection(db_path) { + Ok(connection) => { + configure_database(&connection)?; + create_metadata_table(&connection)?; + Ok(connection) + } + err @ Err(_) => err, + } +} + +fn create_metadata_table(c: &NxDbConnection) -> anyhow::Result<()> { + debug!("Creating table for metadata if it does not exist"); + c.execute( + "CREATE TABLE IF NOT EXISTS metadata ( + key TEXT NOT NULL PRIMARY KEY, + value TEXT NOT NULL + )", + [], + )?; + Ok(()) +} + +fn open_database_connection(db_path: &PathBuf) -> anyhow::Result { + let conn = if cfg!(target_family = "unix") && ci_info::is_ci() { + trace!("Opening connection with unix-dotfile"); + Connection::open_with_flags_and_vfs( + db_path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_NO_MUTEX, + "unix-dotfile", + ) + } else { + Connection::open_with_flags( + db_path, + OpenFlags::SQLITE_OPEN_READ_WRITE + | OpenFlags::SQLITE_OPEN_CREATE + | OpenFlags::SQLITE_OPEN_URI + | OpenFlags::SQLITE_OPEN_FULL_MUTEX, + ) + }; + + conn.map_err(|e| anyhow::anyhow!("Error creating connection {:?}", e)) + .map(NxDbConnection::new) +} + +fn configure_database(connection: &NxDbConnection) -> anyhow::Result<()> { + connection + .conn + .pragma_update(None, "journal_mode", "WAL") + .map_err(|e| anyhow::anyhow!("Unable to set journal_mode: {:?}", e))?; + connection + .conn + .pragma_update(None, "synchronous", "NORMAL") + .map_err(|e| anyhow::anyhow!("Unable to set synchronous: {:?}", e))?; + connection + .conn + .busy_handler(Some(|tries| tries < 6)) + .map_err(|e| anyhow::anyhow!("Unable to set busy handler: {:?}", e))?; + Ok(()) +} diff --git a/packages/nx/src/native/db/mod.rs b/packages/nx/src/native/db/mod.rs index 0c4838ee99756..5438a42cfe414 100644 --- a/packages/nx/src/native/db/mod.rs +++ b/packages/nx/src/native/db/mod.rs @@ -1,88 +1,32 @@ -use rusqlite::OpenFlags; -use std::fs::{create_dir_all, remove_file}; -use std::path::PathBuf; -use std::time::Duration; +pub mod connection; +mod initialize; +use crate::native::db::connection::NxDbConnection; use crate::native::machine_id::get_machine_id; use napi::bindgen_prelude::External; -use rusqlite::Connection; -use tracing::debug; +use std::fs::create_dir_all; +use std::path::PathBuf; +use std::process; +use tracing::{trace, trace_span}; #[napi] pub fn connect_to_nx_db( cache_dir: String, nx_version: String, db_name: Option, -) -> anyhow::Result> { +) -> anyhow::Result> { let cache_dir_buf = PathBuf::from(cache_dir); let db_path = cache_dir_buf.join(format!("{}.db", db_name.unwrap_or_else(get_machine_id))); create_dir_all(cache_dir_buf)?; - let c = create_connection(&db_path)?; - - debug!( - "Checking if current existing database is compatible with Nx {}", - nx_version - ); - let db_version = c.query_row( - "SELECT value FROM metadata WHERE key='NX_VERSION'", - [], - |row| { - let r: String = row.get(0)?; - Ok(r) - }, - ); + let _ = trace_span!("process", id = process::id()).entered(); + trace!("Creating connection to {:?}", db_path); + let lock_file = initialize::create_lock_file(&db_path)?; - let c = match db_version { - Ok(version) if version == nx_version => c, - _ => { - debug!("Disconnecting from existing incompatible database"); - c.close().map_err(|(_, error)| anyhow::Error::from(error))?; - debug!("Removing existing incompatible database"); - remove_file(&db_path)?; + let c = initialize::initialize_db(nx_version, &db_path) + .inspect_err(|_| initialize::unlock_file(&lock_file))?; - debug!("Creating a new connection to a new database"); - create_connection(&db_path)? - } - }; - - debug!("Creating table for metadata"); - c.execute( - "CREATE TABLE IF NOT EXISTS metadata ( - key TEXT NOT NULL PRIMARY KEY, - value TEXT NOT NULL - )", - [], - )?; - - debug!("Recording Nx Version: {}", nx_version); - c.execute( - "INSERT OR REPLACE INTO metadata (key, value) VALUES ('NX_VERSION', ?)", - [nx_version], - )?; + initialize::unlock_file(&lock_file); Ok(External::new(c)) } - -fn create_connection(db_path: &PathBuf) -> anyhow::Result { - debug!("Creating connection to {:?}", db_path); - let c = Connection::open_with_flags( - db_path, - OpenFlags::SQLITE_OPEN_READ_WRITE - | OpenFlags::SQLITE_OPEN_CREATE - | OpenFlags::SQLITE_OPEN_URI - | OpenFlags::SQLITE_OPEN_FULL_MUTEX, - ) - .map_err(anyhow::Error::from)?; - - // This allows writes at the same time as reads - c.pragma_update(None, "journal_mode", "WAL")?; - - // This makes things less synchronous than default - c.pragma_update(None, "synchronous", "NORMAL")?; - - c.busy_timeout(Duration::from_millis(25))?; - c.busy_handler(Some(|tries| tries < 6))?; - - Ok(c) -} diff --git a/packages/nx/src/native/index.d.ts b/packages/nx/src/native/index.d.ts index 0b378e8302835..cc8c9af88e789 100644 --- a/packages/nx/src/native/index.d.ts +++ b/packages/nx/src/native/index.d.ts @@ -28,7 +28,7 @@ export declare class ImportResult { export declare class NxCache { cacheDirectory: string - constructor(workspaceRoot: string, cachePath: string, dbConnection: ExternalObject, linkTaskDetails?: boolean | undefined | null) + constructor(workspaceRoot: string, cachePath: string, dbConnection: ExternalObject, linkTaskDetails?: boolean | undefined | null) get(hash: string): CachedResult | null put(hash: string, terminalOutput: string, outputs: Array, code: number): void applyRemoteCacheResults(hash: string, result: CachedResult): void @@ -39,7 +39,7 @@ export declare class NxCache { } export declare class NxTaskHistory { - constructor(db: ExternalObject) + constructor(db: ExternalObject) recordTaskRuns(taskRuns: Array): void getFlakyTasks(hashes: Array): Array getEstimatedTaskTimings(targets: Array): Record @@ -56,7 +56,7 @@ export declare class RustPseudoTerminal { } export declare class TaskDetails { - constructor(db: ExternalObject) + constructor(db: ExternalObject) recordTaskDetails(tasks: Array): void } @@ -97,7 +97,7 @@ export interface CachedResult { outputsPath: string } -export declare export function connectToNxDb(cacheDir: string, nxVersion: string, dbName?: string | undefined | null): ExternalObject +export declare export function connectToNxDb(cacheDir: string, nxVersion: string, dbName?: string | undefined | null): ExternalObject export declare export function copy(src: string, dest: string): void diff --git a/packages/nx/src/native/tasks/details.rs b/packages/nx/src/native/tasks/details.rs index e6ceaefb01439..70d0c05919867 100644 --- a/packages/nx/src/native/tasks/details.rs +++ b/packages/nx/src/native/tasks/details.rs @@ -1,5 +1,6 @@ +use crate::native::db::connection::NxDbConnection; use napi::bindgen_prelude::*; -use rusqlite::{params, Connection}; +use rusqlite::params; #[napi(object)] #[derive(Default, Clone)] @@ -11,14 +12,14 @@ pub struct HashedTask { } #[napi] -struct TaskDetails { - db: External, +pub struct TaskDetails { + db: External, } #[napi] impl TaskDetails { #[napi(constructor)] - pub fn new(db: External) -> anyhow::Result { + pub fn new(db: External) -> anyhow::Result { let r = Self { db }; r.setup()?; @@ -28,8 +29,7 @@ impl TaskDetails { fn setup(&self) -> anyhow::Result<()> { self.db.execute( - " - CREATE TABLE IF NOT EXISTS task_details ( + "CREATE TABLE IF NOT EXISTS task_details ( hash TEXT PRIMARY KEY NOT NULL, project TEXT NOT NULL, target TEXT NOT NULL, diff --git a/packages/nx/src/native/tasks/task_history.rs b/packages/nx/src/native/tasks/task_history.rs index a610df290f8f9..5ee0c53904e16 100644 --- a/packages/nx/src/native/tasks/task_history.rs +++ b/packages/nx/src/native/tasks/task_history.rs @@ -1,11 +1,10 @@ -use std::rc::Rc; -use std::collections::HashMap; -use itertools::Itertools; +use crate::native::db::connection::NxDbConnection; +use crate::native::tasks::types::TaskTarget; use napi::bindgen_prelude::*; use rusqlite::vtab::array; -use rusqlite::{params, types::Value, Connection}; - -use crate::native::tasks::types::TaskTarget; +use rusqlite::{params, types::Value}; +use std::collections::HashMap; +use std::rc::Rc; #[napi(object)] pub struct TaskRun { @@ -18,13 +17,13 @@ pub struct TaskRun { #[napi] pub struct NxTaskHistory { - db: External, + db: External, } #[napi] impl NxTaskHistory { #[napi(constructor)] - pub fn new(db: External) -> anyhow::Result { + pub fn new(db: External) -> anyhow::Result { let s = Self { db }; s.setup()?; @@ -33,11 +32,11 @@ impl NxTaskHistory { } fn setup(&self) -> anyhow::Result<()> { - array::load_module(&self.db)?; + array::load_module(&self.db.conn)?; self.db .execute_batch( " - BEGIN; + BEGIN IMMEDIATE; CREATE TABLE IF NOT EXISTS task_history ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, hash TEXT NOT NULL, @@ -99,16 +98,21 @@ impl NxTaskHistory { } #[napi] - pub fn get_estimated_task_timings(&self, targets: Vec) -> anyhow::Result> { + pub fn get_estimated_task_timings( + &self, + targets: Vec, + ) -> anyhow::Result> { let values = Rc::new( targets .iter() - .map(|t| Value::from( - match &t.configuration { - Some(configuration) => format!("{}:{}:{}", t.project, t.target, configuration), - _ => format!("{}:{}", t.project, t.target) - } - )) + .map(|t| { + Value::from(match &t.configuration { + Some(configuration) => { + format!("{}:{}:{}", t.project, t.target, configuration) + } + _ => format!("{}:{}", t.project, t.target), + }) + }) .collect::>(), ); diff --git a/packages/nx/src/native/tests/cache.spec.ts b/packages/nx/src/native/tests/cache.spec.ts index b465ceba8078b..d577cea4eae9f 100644 --- a/packages/nx/src/native/tests/cache.spec.ts +++ b/packages/nx/src/native/tests/cache.spec.ts @@ -3,23 +3,21 @@ import { join } from 'path'; import { TempFs } from '../../internal-testing-utils/temp-fs'; import { rmSync } from 'fs'; import { getDbConnection } from '../../utils/db-connection'; +import { randomBytes } from 'crypto'; describe('Cache', () => { let cache: NxCache; let tempFs: TempFs; let taskDetails: TaskDetails; + const dbOutputFolder = 'temp-db-cache'; beforeEach(() => { tempFs = new TempFs('cache'); - rmSync(join(__dirname, 'temp-db'), { - recursive: true, - force: true, - }); const dbConnection = getDbConnection({ - directory: join(__dirname, 'temp-db'), + directory: join(__dirname, dbOutputFolder), + dbName: `temp-db-${randomBytes(4).toString('hex')}`, }); - taskDetails = new TaskDetails(dbConnection); cache = new NxCache( @@ -38,6 +36,13 @@ describe('Cache', () => { ]); }); + afterAll(() => { + rmSync(join(__dirname, dbOutputFolder), { + recursive: true, + force: true, + }); + }); + it('should store results into cache', async () => { const result = cache.get('123'); diff --git a/packages/nx/src/native/tests/task_history.spec.ts b/packages/nx/src/native/tests/task_history.spec.ts index 2e891de940459..991663459f784 100644 --- a/packages/nx/src/native/tests/task_history.spec.ts +++ b/packages/nx/src/native/tests/task_history.spec.ts @@ -3,7 +3,9 @@ import { join } from 'path'; import { TempFs } from '../../internal-testing-utils/temp-fs'; import { rmSync } from 'fs'; import { getDbConnection } from '../../utils/db-connection'; +import { randomBytes } from 'crypto'; +const dbOutputFolder = 'temp-db-task'; describe('NxTaskHistory', () => { let taskHistory: NxTaskHistory; let tempFs: TempFs; @@ -12,13 +14,9 @@ describe('NxTaskHistory', () => { beforeEach(() => { tempFs = new TempFs('task-history'); - rmSync(join(__dirname, 'temp-db'), { - recursive: true, - force: true, - }); - const dbConnection = getDbConnection({ - directory: join(__dirname, 'temp-db'), + directory: join(__dirname, dbOutputFolder), + dbName: `temp-db-${randomBytes(4).toString('hex')}`, }); taskHistory = new NxTaskHistory(dbConnection); taskDetails = new TaskDetails(dbConnection); @@ -40,6 +38,13 @@ describe('NxTaskHistory', () => { ]); }); + afterAll(() => { + rmSync(join(__dirname, dbOutputFolder), { + recursive: true, + force: true, + }); + }); + it('should record task history', () => { taskHistory.recordTaskRuns([ { diff --git a/packages/nx/src/tasks-runner/cache.ts b/packages/nx/src/tasks-runner/cache.ts index 3730916ae0f62..e24d61b135291 100644 --- a/packages/nx/src/tasks-runner/cache.ts +++ b/packages/nx/src/tasks-runner/cache.ts @@ -54,6 +54,8 @@ export class DbCache { private remoteCache: RemoteCacheV2 | null; private remoteCachePromise: Promise; + private isVerbose = process.env.NX_VERBOSE_LOGGING === 'true'; + constructor(private readonly options: { nxCloudRemoteCache: RemoteCache }) {} async init() {