From 0c87751a251922bc05c7371b09aff36ed30881df Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Fri, 29 Sep 2023 11:32:57 +0200 Subject: [PATCH 01/12] feat: Do not preemptively drop implicit job token. --- src/lib.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6257fa0df..384b2b3ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1293,7 +1293,12 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::{mpsc, Once}; + use std::sync::{ + mpsc::{self, Receiver, Sender}, + Once, + }; + + use jobserver::{Acquired, Client}; if objs.len() <= 1 { for obj in objs { @@ -1309,10 +1314,54 @@ impl Build { // easier to write loop below. If this fails, though, then we're likely // on Windows with the main implicit token, so we just have a bit extra // parallelism for a bit and don't reacquire later. - let server = jobserver(); + let server = jobserver().clone(); // Reacquire our process's token on drop - let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server)); - + //let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server)); + struct JobToken { + owned: Option, + pool: Sender>, + should_return_to_queue: bool, + } + impl Drop for JobToken { + fn drop(&mut self) { + if self.should_return_to_queue { + let _ = self.pool.send(self.owned.take()); + } + } + } + struct JobTokenServer { + helper: jobserver::HelperThread, + tx: Sender>, + rx: Receiver>, + } + impl JobTokenServer { + fn new(client: Client) -> Result { + let (tx, rx) = std::sync::mpsc::channel(); + tx.send(None).unwrap(); + let pool = tx.clone(); + let helper = client.into_helper_thread(move |acq| { + let _ = pool.send(Some(acq.unwrap())); + })?; + Ok(Self { helper, tx, rx }) + } + fn acquire(&mut self) -> JobToken { + if let Ok(token) = self.rx.try_recv() { + JobToken { + owned: token, + pool: self.tx.clone(), + should_return_to_queue: true, + } + } else { + self.helper.request_token(); + let token = self.rx.recv().unwrap(); + JobToken { + owned: token, + pool: self.tx.clone(), + should_return_to_queue: true, + } + } + } + } // When compiling objects in parallel we do a few dirty tricks to speed // things up: // @@ -1332,7 +1381,7 @@ impl Build { // acquire the appropriate tokens, Once all objects have been compiled // we wait on all the processes and propagate the results of compilation. - let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>(); + let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, JobToken)>(); // Since jobserver::Client::acquire can block, waiting // must be done in parallel so that acquire won't block forever. @@ -1345,7 +1394,7 @@ impl Build { loop { let mut has_made_progress = false; - + let mut is_disconnected = false; // Reading new pending tasks loop { match rx.try_recv() { @@ -1361,14 +1410,21 @@ impl Build { Ok(()) }; } + Err(mpsc::TryRecvError::Disconnected) => { + is_disconnected = true; + break; + } _ => break, } } // Try waiting on them. - pendings.retain_mut(|(cmd, program, child, _)| { + pendings.retain_mut(|(cmd, program, child, token)| { match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { Ok(Some(())) => { + if is_disconnected { + token.should_return_to_queue = false; + } // Task done, remove the entry has_made_progress = true; false @@ -1377,7 +1433,9 @@ impl Build { Err(err) => { // Task fail, remove the entry. has_made_progress = true; - + if is_disconnected { + token.should_return_to_queue = false; + } // Since we can only return one error, log the error to make // sure users always see all the compilation failures. let _ = writeln!(stdout, "cargo:warning={}", err); @@ -1415,11 +1473,10 @@ impl Build { }; } })?; - + let mut tokens = JobTokenServer::new(server)?; for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = server.acquire()?; - + let token = tokens.acquire(); let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; tx.send((cmd, program, KillOnDrop(child), token)) From d0a8a49fa8f5ece55d2685e7d06cb6ad479019c6 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Sat, 30 Sep 2023 02:29:00 +0200 Subject: [PATCH 02/12] Move implementation to a separate module and document a bunch --- src/job_token.rs | 66 ++++++++++++++++++++++++++++++ src/lib.rs | 104 +++++++---------------------------------------- 2 files changed, 81 insertions(+), 89 deletions(-) create mode 100644 src/job_token.rs diff --git a/src/job_token.rs b/src/job_token.rs new file mode 100644 index 000000000..4c3a51e3e --- /dev/null +++ b/src/job_token.rs @@ -0,0 +1,66 @@ +use jobserver::{Acquired, Client, HelperThread}; +use std::sync::mpsc::{self, Receiver, Sender}; + +pub(crate) struct JobToken { + /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. + /// Both are valid values to put into queue. + token: Option, + pool: Sender>, + should_return_to_queue: bool, +} + +impl Drop for JobToken { + fn drop(&mut self) { + if self.should_return_to_queue { + let _ = self.pool.send(self.token.take()); + } + } +} + +impl JobToken { + /// Ensure that this token is not put back into queue once it's dropped. + /// This also leads to releasing it sooner for other processes to use, which is a good thing to do once you know that + /// you're never going to request a token in this process again. + pub(crate) fn forget(&mut self) { + self.should_return_to_queue = false; + } +} + +/// A thin wrapper around jobserver's Client. +/// It would be perfectly fine to just use that, but we also want to reuse our own implicit token assigned for this build script. +/// This struct manages that and gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. +/// Furthermore, instead of giving up job tokens, it keeps them around for reuse if we know we're going to request another token after freeing the current one. +pub(crate) struct JobTokenServer { + helper: HelperThread, + tx: Sender>, + rx: Receiver>, +} + +impl JobTokenServer { + pub(crate) fn new(client: Client) -> Result { + let (tx, rx) = mpsc::channel(); + // Initialize the + tx.send(None).unwrap(); + let pool = tx.clone(); + let helper = client.into_helper_thread(move |acq| { + let _ = pool.send(Some(acq.unwrap())); + })?; + Ok(Self { helper, tx, rx }) + } + + pub(crate) fn acquire(&mut self) -> JobToken { + let token = if let Ok(token) = self.rx.try_recv() { + // Opportunistically check if we already have a token for our own reuse. + token + } else { + // Cold path, request a token and block + self.helper.request_token(); + self.rx.recv().unwrap() + }; + JobToken { + token, + pool: self.tx.clone(), + should_return_to_queue: true, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 384b2b3ba..7ffdfcd6c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; +#[cfg(feature = "parallel")] +mod job_token; mod os_pipe; - // These modules are all glue to support reading the MSVC version from // the registry and from COM interfaces #[cfg(windows)] @@ -1293,12 +1294,7 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::{ - mpsc::{self, Receiver, Sender}, - Once, - }; - - use jobserver::{Acquired, Client}; + use std::sync::mpsc; if objs.len() <= 1 { for obj in objs { @@ -1309,59 +1305,10 @@ impl Build { return Ok(()); } - // Limit our parallelism globally with a jobserver. Start off by - // releasing our own token for this process so we can have a bit of an - // easier to write loop below. If this fails, though, then we're likely - // on Windows with the main implicit token, so we just have a bit extra - // parallelism for a bit and don't reacquire later. - let server = jobserver().clone(); + // Limit our parallelism globally with a jobserver. + let server = unsafe { default_jobserver() }; // Reacquire our process's token on drop - //let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server)); - struct JobToken { - owned: Option, - pool: Sender>, - should_return_to_queue: bool, - } - impl Drop for JobToken { - fn drop(&mut self) { - if self.should_return_to_queue { - let _ = self.pool.send(self.owned.take()); - } - } - } - struct JobTokenServer { - helper: jobserver::HelperThread, - tx: Sender>, - rx: Receiver>, - } - impl JobTokenServer { - fn new(client: Client) -> Result { - let (tx, rx) = std::sync::mpsc::channel(); - tx.send(None).unwrap(); - let pool = tx.clone(); - let helper = client.into_helper_thread(move |acq| { - let _ = pool.send(Some(acq.unwrap())); - })?; - Ok(Self { helper, tx, rx }) - } - fn acquire(&mut self) -> JobToken { - if let Ok(token) = self.rx.try_recv() { - JobToken { - owned: token, - pool: self.tx.clone(), - should_return_to_queue: true, - } - } else { - self.helper.request_token(); - let token = self.rx.recv().unwrap(); - JobToken { - owned: token, - pool: self.tx.clone(), - should_return_to_queue: true, - } - } - } - } + // When compiling objects in parallel we do a few dirty tricks to speed // things up: // @@ -1381,7 +1328,7 @@ impl Build { // acquire the appropriate tokens, Once all objects have been compiled // we wait on all the processes and propagate the results of compilation. - let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, JobToken)>(); + let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>(); // Since jobserver::Client::acquire can block, waiting // must be done in parallel so that acquire won't block forever. @@ -1394,6 +1341,10 @@ impl Build { loop { let mut has_made_progress = false; + // If the other end of the pipe is already disconnected, then we're not gonna get any new jobs, + // so it doesn't make sense to reuse the tokens; in fact, releasing them as soon as possible (once we know that the other end is disconnected) is beneficial. + // Imagine that the last file built takes an hour to finish; in this scenario, by not releasing the tokens before other builds are done we'd effectively block other processes from + // starting sooner - even though we only need one token, not however many we've acquired. let mut is_disconnected = false; // Reading new pending tasks loop { @@ -1422,10 +1373,10 @@ impl Build { pendings.retain_mut(|(cmd, program, child, token)| { match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) { Ok(Some(())) => { + // Task done, remove the entry if is_disconnected { - token.should_return_to_queue = false; + token.forget(); } - // Task done, remove the entry has_made_progress = true; false } @@ -1434,7 +1385,7 @@ impl Build { // Task fail, remove the entry. has_made_progress = true; if is_disconnected { - token.should_return_to_queue = false; + token.forget(); } // Since we can only return one error, log the error to make // sure users always see all the compilation failures. @@ -1473,7 +1424,7 @@ impl Build { }; } })?; - let mut tokens = JobTokenServer::new(server)?; + let mut tokens = crate::job_token::JobTokenServer::new(server)?; for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; let token = tokens.acquire(); @@ -1487,24 +1438,6 @@ impl Build { return wait_thread.join().expect("wait_thread panics"); - /// Returns a suitable `jobserver::Client` used to coordinate - /// parallelism between build scripts. - fn jobserver() -> &'static jobserver::Client { - static INIT: Once = Once::new(); - static mut JOBSERVER: Option = None; - - fn _assert_sync() {} - _assert_sync::(); - - unsafe { - INIT.call_once(|| { - let server = default_jobserver(); - JOBSERVER = Some(server); - }); - JOBSERVER.as_ref().unwrap() - } - } - unsafe fn default_jobserver() -> jobserver::Client { // Try to use the environmental jobserver which Cargo typically // initializes for us... @@ -1541,13 +1474,6 @@ impl Build { child.kill().ok(); } } - - struct JobserverToken(&'static jobserver::Client); - impl Drop for JobserverToken { - fn drop(&mut self) { - let _ = self.0.acquire_raw(); - } - } } #[cfg(not(feature = "parallel"))] From ee5dee70fe58966abc45364a101c03d95ecc5881 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Sat, 30 Sep 2023 02:37:52 +0200 Subject: [PATCH 03/12] Readd reusable global jobserver connection --- src/lib.rs | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7ffdfcd6c..cf180d134 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1294,7 +1294,7 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::mpsc; + use std::sync::{mpsc, Once}; if objs.len() <= 1 { for obj in objs { @@ -1306,7 +1306,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let server = unsafe { default_jobserver() }; + let server = jobserver(); // Reacquire our process's token on drop // When compiling objects in parallel we do a few dirty tricks to speed @@ -1438,6 +1438,24 @@ impl Build { return wait_thread.join().expect("wait_thread panics"); + /// Returns a suitable `jobserver::Client` used to coordinate + /// parallelism between build scripts. + fn jobserver() -> jobserver::Client { + static INIT: Once = Once::new(); + static mut JOBSERVER: Option = None; + + fn _assert_sync() {} + _assert_sync::(); + + unsafe { + INIT.call_once(|| { + let server = default_jobserver(); + JOBSERVER = Some(server); + }); + JOBSERVER.clone().unwrap() + } + } + unsafe fn default_jobserver() -> jobserver::Client { // Try to use the environmental jobserver which Cargo typically // initializes for us... From 36371277d24d7a8baecd77f8bb951eda8aee5479 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Sat, 30 Sep 2023 02:43:04 +0200 Subject: [PATCH 04/12] Fix up a comment --- src/job_token.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/job_token.rs b/src/job_token.rs index 4c3a51e3e..afb97cde4 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -39,7 +39,8 @@ pub(crate) struct JobTokenServer { impl JobTokenServer { pub(crate) fn new(client: Client) -> Result { let (tx, rx) = mpsc::channel(); - // Initialize the + // Push the implicit token. Since JobTokens only give back what they got, + // there should be at most one global implicit token in the wild. tx.send(None).unwrap(); let pool = tx.clone(); let helper = client.into_helper_thread(move |acq| { From 4ca780838a8f20f5e954f70f2be7a626276754e9 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Sat, 30 Sep 2023 03:02:49 +0200 Subject: [PATCH 05/12] Further documentation refinements --- src/job_token.rs | 15 +++++++++------ src/lib.rs | 8 +++++--- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index afb97cde4..28ec43f8a 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -19,17 +19,20 @@ impl Drop for JobToken { impl JobToken { /// Ensure that this token is not put back into queue once it's dropped. - /// This also leads to releasing it sooner for other processes to use, which is a good thing to do once you know that - /// you're never going to request a token in this process again. + /// This also leads to releasing it sooner for other processes to use, + /// which is a correct thing to do once it is known that there won't be + /// any more token acquisitions. pub(crate) fn forget(&mut self) { self.should_return_to_queue = false; } } /// A thin wrapper around jobserver's Client. -/// It would be perfectly fine to just use that, but we also want to reuse our own implicit token assigned for this build script. -/// This struct manages that and gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. -/// Furthermore, instead of giving up job tokens, it keeps them around for reuse if we know we're going to request another token after freeing the current one. +/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse +/// our own implicit token assigned for this build script. This struct manages that and +/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. +/// Furthermore, instead of giving up job tokens, it keeps them around +/// for reuse if we know we're going to request another token after freeing the current one. pub(crate) struct JobTokenServer { helper: HelperThread, tx: Sender>, @@ -51,7 +54,7 @@ impl JobTokenServer { pub(crate) fn acquire(&mut self) -> JobToken { let token = if let Ok(token) = self.rx.try_recv() { - // Opportunistically check if we already have a token for our own reuse. + // Opportunistically check if there's a token that can be reused. token } else { // Cold path, request a token and block diff --git a/src/lib.rs b/src/lib.rs index cf180d134..ed7d1bf66 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1342,9 +1342,11 @@ impl Build { loop { let mut has_made_progress = false; // If the other end of the pipe is already disconnected, then we're not gonna get any new jobs, - // so it doesn't make sense to reuse the tokens; in fact, releasing them as soon as possible (once we know that the other end is disconnected) is beneficial. - // Imagine that the last file built takes an hour to finish; in this scenario, by not releasing the tokens before other builds are done we'd effectively block other processes from - // starting sooner - even though we only need one token, not however many we've acquired. + // so it doesn't make sense to reuse the tokens; in fact, + // releasing them as soon as possible (once we know that the other end is disconnected) is beneficial. + // Imagine that the last file built takes an hour to finish; in this scenario, + // by not releasing the tokens before that last file is done we would effectively block other processes from + // starting sooner - even though we only need one token for that last file, not N others that were acquired. let mut is_disconnected = false; // Reading new pending tasks loop { From 1b025d810d87258fa2af1f9043b9bf99ab7ae326 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Sun, 1 Oct 2023 18:33:01 +0200 Subject: [PATCH 06/12] Move jobserver func into job_token module --- src/job_token.rs | 53 +++++++++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 49 ++------------------------------------------ 2 files changed, 54 insertions(+), 48 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 28ec43f8a..9f0518f41 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,5 +1,11 @@ use jobserver::{Acquired, Client, HelperThread}; -use std::sync::mpsc::{self, Receiver, Sender}; +use std::{ + env, + sync::{ + mpsc::{self, Receiver, Sender}, + Once, + }, +}; pub(crate) struct JobToken { /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. @@ -68,3 +74,48 @@ impl JobTokenServer { } } } + +/// Returns a suitable `jobserver::Client` used to coordinate +/// parallelism between build scripts. +pub(super) fn jobserver() -> jobserver::Client { + static INIT: Once = Once::new(); + static mut JOBSERVER: Option = None; + + fn _assert_sync() {} + _assert_sync::(); + + unsafe { + INIT.call_once(|| { + let server = default_jobserver(); + JOBSERVER = Some(server); + }); + JOBSERVER.clone().unwrap() + } +} + +unsafe fn default_jobserver() -> jobserver::Client { + // Try to use the environmental jobserver which Cargo typically + // initializes for us... + if let Some(client) = jobserver::Client::from_env() { + return client; + } + + // ... but if that fails for whatever reason select something + // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's + // configured by Cargo) and otherwise just fall back to a + // semi-reasonable number. Note that we could use `num_cpus` here + // but it's an extra dependency that will almost never be used, so + // it's generally not too worth it. + let mut parallelism = 4; + if let Ok(amt) = env::var("NUM_JOBS") { + if let Ok(amt) = amt.parse() { + parallelism = amt; + } + } + + // If we create our own jobserver then be sure to reserve one token + // for ourselves. + let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); + client.acquire_raw().expect("failed to acquire initial"); + return client; +} diff --git a/src/lib.rs b/src/lib.rs index ed7d1bf66..4be136757 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1294,7 +1294,7 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::sync::{mpsc, Once}; + use std::sync::mpsc; if objs.len() <= 1 { for obj in objs { @@ -1306,7 +1306,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let server = jobserver(); + let server = job_token::jobserver(); // Reacquire our process's token on drop // When compiling objects in parallel we do a few dirty tricks to speed @@ -1440,51 +1440,6 @@ impl Build { return wait_thread.join().expect("wait_thread panics"); - /// Returns a suitable `jobserver::Client` used to coordinate - /// parallelism between build scripts. - fn jobserver() -> jobserver::Client { - static INIT: Once = Once::new(); - static mut JOBSERVER: Option = None; - - fn _assert_sync() {} - _assert_sync::(); - - unsafe { - INIT.call_once(|| { - let server = default_jobserver(); - JOBSERVER = Some(server); - }); - JOBSERVER.clone().unwrap() - } - } - - unsafe fn default_jobserver() -> jobserver::Client { - // Try to use the environmental jobserver which Cargo typically - // initializes for us... - if let Some(client) = jobserver::Client::from_env() { - return client; - } - - // ... but if that fails for whatever reason select something - // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's - // configured by Cargo) and otherwise just fall back to a - // semi-reasonable number. Note that we could use `num_cpus` here - // but it's an extra dependency that will almost never be used, so - // it's generally not too worth it. - let mut parallelism = 4; - if let Ok(amt) = env::var("NUM_JOBS") { - if let Ok(amt) = amt.parse() { - parallelism = amt; - } - } - - // If we create our own jobserver then be sure to reserve one token - // for ourselves. - let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); - client.acquire_raw().expect("failed to acquire initial"); - return client; - } - struct KillOnDrop(Child); impl Drop for KillOnDrop { From 52afaf1080f1032ad0e44c07501f7517f7403ebf Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Sun, 1 Oct 2023 18:39:10 +0200 Subject: [PATCH 07/12] Remove should_return_to_queue member in favor of wrapping pool in an Option --- src/job_token.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 9f0518f41..c485f8bed 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -11,14 +11,15 @@ pub(crate) struct JobToken { /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. /// Both are valid values to put into queue. token: Option, - pool: Sender>, - should_return_to_queue: bool, + /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead + /// of storing it back in the pool - see [`Self::forget()`] function for that. + pool: Option>>, } impl Drop for JobToken { fn drop(&mut self) { - if self.should_return_to_queue { - let _ = self.pool.send(self.token.take()); + if let Some(pool) = &self.pool { + let _ = pool.send(self.token.take()); } } } @@ -29,7 +30,7 @@ impl JobToken { /// which is a correct thing to do once it is known that there won't be /// any more token acquisitions. pub(crate) fn forget(&mut self) { - self.should_return_to_queue = false; + self.pool.take(); } } @@ -69,8 +70,7 @@ impl JobTokenServer { }; JobToken { token, - pool: self.tx.clone(), - should_return_to_queue: true, + pool: Some(self.tx.clone()), } } } From 3d458416426402d403b1ff41d61b53625cfc39f8 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Mon, 2 Oct 2023 12:28:03 +0200 Subject: [PATCH 08/12] Make jobserver initialization private in job_token mod --- src/job_token.rs | 7 ++++--- src/lib.rs | 4 +--- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index c485f8bed..f5e69b8f7 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,4 +1,4 @@ -use jobserver::{Acquired, Client, HelperThread}; +use jobserver::{Acquired, HelperThread}; use std::{ env, sync::{ @@ -47,7 +47,8 @@ pub(crate) struct JobTokenServer { } impl JobTokenServer { - pub(crate) fn new(client: Client) -> Result { + pub(crate) fn new() -> Result { + let client = jobserver(); let (tx, rx) = mpsc::channel(); // Push the implicit token. Since JobTokens only give back what they got, // there should be at most one global implicit token in the wild. @@ -77,7 +78,7 @@ impl JobTokenServer { /// Returns a suitable `jobserver::Client` used to coordinate /// parallelism between build scripts. -pub(super) fn jobserver() -> jobserver::Client { +fn jobserver() -> jobserver::Client { static INIT: Once = Once::new(); static mut JOBSERVER: Option = None; diff --git a/src/lib.rs b/src/lib.rs index 4be136757..d3ed7dc8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1306,8 +1306,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let server = job_token::jobserver(); - // Reacquire our process's token on drop + let mut tokens = crate::job_token::JobTokenServer::new()?; // When compiling objects in parallel we do a few dirty tricks to speed // things up: @@ -1426,7 +1425,6 @@ impl Build { }; } })?; - let mut tokens = crate::job_token::JobTokenServer::new(server)?; for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; let token = tokens.acquire(); From 280c673b1e0d31bfe381a93e4af4f25dba061686 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Mon, 2 Oct 2023 20:01:05 +0200 Subject: [PATCH 09/12] Remove unnecessary mut on acquire fn --- src/job_token.rs | 4 ++-- src/lib.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index f5e69b8f7..9deaf82e8 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -40,7 +40,7 @@ impl JobToken { /// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. /// Furthermore, instead of giving up job tokens, it keeps them around /// for reuse if we know we're going to request another token after freeing the current one. -pub(crate) struct JobTokenServer { +pub(crate) struct GlobalJobTokenServer { helper: HelperThread, tx: Sender>, rx: Receiver>, @@ -60,7 +60,7 @@ impl JobTokenServer { Ok(Self { helper, tx, rx }) } - pub(crate) fn acquire(&mut self) -> JobToken { + pub(crate) fn acquire(&self) -> JobToken { let token = if let Ok(token) = self.rx.try_recv() { // Opportunistically check if there's a token that can be reused. token diff --git a/src/lib.rs b/src/lib.rs index d3ed7dc8f..d2cac31e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1306,7 +1306,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let mut tokens = crate::job_token::JobTokenServer::new()?; + let tokens = crate::job_token::JobTokenServer::new()?; // When compiling objects in parallel we do a few dirty tricks to speed // things up: From d55c382da734a0198f8e92bbeddec0314fb1b0b9 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Mon, 2 Oct 2023 20:15:06 +0200 Subject: [PATCH 10/12] Use shared global JobTokenServer --- src/job_token.rs | 28 ++++++++++++++++++---------- src/lib.rs | 2 +- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 9deaf82e8..f1c218875 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,4 +1,4 @@ -use jobserver::{Acquired, HelperThread}; +use jobserver::{Acquired, Client, HelperThread}; use std::{ env, sync::{ @@ -40,15 +40,17 @@ impl JobToken { /// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. /// Furthermore, instead of giving up job tokens, it keeps them around /// for reuse if we know we're going to request another token after freeing the current one. -pub(crate) struct GlobalJobTokenServer { +pub(crate) struct JobTokenServer { helper: HelperThread, tx: Sender>, rx: Receiver>, } impl JobTokenServer { - pub(crate) fn new() -> Result { - let client = jobserver(); + pub(crate) fn new() -> &'static Self { + jobserver() + } + fn new_inner(client: Client) -> Result { let (tx, rx) = mpsc::channel(); // Push the implicit token. Since JobTokens only give back what they got, // there should be at most one global implicit token in the wild. @@ -76,11 +78,16 @@ impl JobTokenServer { } } -/// Returns a suitable `jobserver::Client` used to coordinate -/// parallelism between build scripts. -fn jobserver() -> jobserver::Client { +/// Returns a suitable `JobTokenServer` used to coordinate +/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures +/// that only one implicit job token is used in the wild. +/// Having multiple separate job token servers would lead to each of them assuming that they have control +/// over the implicit job token. +/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most +/// one implicit job token in the wild. +fn jobserver() -> &'static JobTokenServer { static INIT: Once = Once::new(); - static mut JOBSERVER: Option = None; + static mut JOBSERVER: Option = None; fn _assert_sync() {} _assert_sync::(); @@ -88,9 +95,10 @@ fn jobserver() -> jobserver::Client { unsafe { INIT.call_once(|| { let server = default_jobserver(); - JOBSERVER = Some(server); + JOBSERVER = + Some(JobTokenServer::new_inner(server).expect("Job server initialization failed")); }); - JOBSERVER.clone().unwrap() + JOBSERVER.as_ref().unwrap() } } diff --git a/src/lib.rs b/src/lib.rs index d2cac31e6..93915f6eb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1306,7 +1306,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let tokens = crate::job_token::JobTokenServer::new()?; + let tokens = crate::job_token::JobTokenServer::new(); // When compiling objects in parallel we do a few dirty tricks to speed // things up: From 985dbaeb928093c47402133b9be3402554372fe4 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Tue, 3 Oct 2023 15:00:33 +0200 Subject: [PATCH 11/12] Change Option to MaybeUninit --- src/job_token.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index f1c218875..006586bf3 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -1,6 +1,7 @@ use jobserver::{Acquired, Client, HelperThread}; use std::{ env, + mem::MaybeUninit, sync::{ mpsc::{self, Receiver, Sender}, Once, @@ -87,7 +88,7 @@ impl JobTokenServer { /// one implicit job token in the wild. fn jobserver() -> &'static JobTokenServer { static INIT: Once = Once::new(); - static mut JOBSERVER: Option = None; + static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); fn _assert_sync() {} _assert_sync::(); @@ -95,10 +96,12 @@ fn jobserver() -> &'static JobTokenServer { unsafe { INIT.call_once(|| { let server = default_jobserver(); - JOBSERVER = - Some(JobTokenServer::new_inner(server).expect("Job server initialization failed")); + JOBSERVER = MaybeUninit::new( + JobTokenServer::new_inner(server).expect("Job server initialization failed"), + ); }); - JOBSERVER.as_ref().unwrap() + // Poor man's assume_init_ref, as that'd require a MSRV of 1.55. + &*JOBSERVER.as_ptr() } } From 65ab3711cffba13dd5188ae4fbcc1bd3c37ef5d5 Mon Sep 17 00:00:00 2001 From: Piotr Osiewicz <24362066+osiewicz@users.noreply.github.com> Date: Tue, 3 Oct 2023 15:10:40 +0200 Subject: [PATCH 12/12] Convert internal channel to use Result and do not panic in helper thread --- src/job_token.rs | 22 ++++++++++++++-------- src/lib.rs | 2 +- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/job_token.rs b/src/job_token.rs index 006586bf3..818917c8d 100644 --- a/src/job_token.rs +++ b/src/job_token.rs @@ -14,13 +14,14 @@ pub(crate) struct JobToken { token: Option, /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead /// of storing it back in the pool - see [`Self::forget()`] function for that. - pool: Option>>, + pool: Option>>>, } impl Drop for JobToken { fn drop(&mut self) { if let Some(pool) = &self.pool { - let _ = pool.send(self.token.take()); + // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. + let _ = pool.send(self.token.take().map(|token| Ok(token))); } } } @@ -43,8 +44,8 @@ impl JobToken { /// for reuse if we know we're going to request another token after freeing the current one. pub(crate) struct JobTokenServer { helper: HelperThread, - tx: Sender>, - rx: Receiver>, + tx: Sender>>, + rx: Receiver>>, } impl JobTokenServer { @@ -58,12 +59,12 @@ impl JobTokenServer { tx.send(None).unwrap(); let pool = tx.clone(); let helper = client.into_helper_thread(move |acq| { - let _ = pool.send(Some(acq.unwrap())); + let _ = pool.send(Some(acq.map_err(|e| e.into()))); })?; Ok(Self { helper, tx, rx }) } - pub(crate) fn acquire(&self) -> JobToken { + pub(crate) fn acquire(&self) -> Result { let token = if let Ok(token) = self.rx.try_recv() { // Opportunistically check if there's a token that can be reused. token @@ -72,10 +73,15 @@ impl JobTokenServer { self.helper.request_token(); self.rx.recv().unwrap() }; - JobToken { + let token = if let Some(token) = token { + Some(token?) + } else { + None + }; + Ok(JobToken { token, pool: Some(self.tx.clone()), - } + }) } } diff --git a/src/lib.rs b/src/lib.rs index 93915f6eb..21d4e0e45 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1427,7 +1427,7 @@ impl Build { })?; for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; - let token = tokens.acquire(); + let token = tokens.acquire()?; let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?; tx.send((cmd, program, KillOnDrop(child), token))