Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid memory leaks #46

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
- Replace dependency on threadpool crate with a custom solution built on the
standard library only, and only using scoped threads
-> fixes memory leaks observed when running under valgrind
- up MSRV to 1.63 for scoped threads

## [0.7.3] - 2024-05-10
- Default to single-threaded tests for WebAssembly (thanks @alexcrichton) in [#41](https://github.com/LukasKalbertodt/libtest-mimic/pull/41)
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "libtest-mimic"
version = "0.7.3"
authors = ["Lukas Kalbertodt <[email protected]>"]
edition = "2021"
rust-version = "1.60"
rust-version = "1.63"

description = """
Write your own test harness that looks and behaves like the built-in test \
Expand All @@ -20,7 +20,6 @@ exclude = [".github"]

[dependencies]
clap = { version = "4.0.8", features = ["derive"] }
threadpool = "1.8.1"
termcolor = "1.0.5"
escape8259 = "0.5.2"

Expand Down
52 changes: 32 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,22 @@

#![forbid(unsafe_code)]

use std::{borrow::Cow, fmt, process::{self, ExitCode}, sync::mpsc, time::Instant};
use std::{
borrow::Cow,
fmt,
process::{self, ExitCode},
sync::mpsc,
time::Instant,
};

mod args;
mod pool;
mod printer;

use printer::Printer;
use threadpool::ThreadPool;

pub use crate::args::{Arguments, ColorSetting, FormatSetting};



/// A single test or benchmark.
///
/// The original `libtest` often calls benchmarks "tests", which is a bit
Expand Down Expand Up @@ -143,8 +147,9 @@ impl Trial {
Err(failed) => Outcome::Failed(failed),
Ok(_) if test_mode => Outcome::Passed,
Ok(Some(measurement)) => Outcome::Measured(measurement),
Ok(None)
=> Outcome::Failed("bench runner returned `Ok(None)` in bench mode".into()),
Ok(None) => {
Outcome::Failed("bench runner returned `Ok(None)` in bench mode".into())
}
}),
info: TestInfo {
name: name.into(),
Expand Down Expand Up @@ -284,13 +289,11 @@ impl Failed {
impl<M: std::fmt::Display> From<M> for Failed {
fn from(msg: M) -> Self {
Self {
msg: Some(msg.to_string())
msg: Some(msg.to_string()),
}
}
}



/// The outcome of performing a test/benchmark.
#[derive(Debug, Clone)]
enum Outcome {
Expand Down Expand Up @@ -473,15 +476,22 @@ pub fn run(args: &Arguments, mut tests: Vec<Trial>) -> Conclusion {
Outcome::Failed(failed) => {
failed_tests.push((test, failed.msg));
conclusion.num_failed += 1;
},
}
Outcome::Ignored => conclusion.num_ignored += 1,
Outcome::Measured(_) => conclusion.num_measured += 1,
}
};

// Execute all tests.
let test_mode = !args.bench;
if platform_defaults_to_one_thread() || args.test_threads == Some(1) {

let num_threads = platform_defaults_to_one_thread()
.then_some(1)
.or(args.test_threads)
.or_else(|| std::thread::available_parallelism().ok().map(Into::into))
.unwrap_or(1);

if num_threads == 1 {
// Run test sequentially in main thread
for test in tests {
// Print `test foo ...`, run the test, then print the outcome in
Expand All @@ -496,28 +506,29 @@ pub fn run(args: &Arguments, mut tests: Vec<Trial>) -> Conclusion {
}
} else {
// Run test in thread pool.
let pool = match args.test_threads {
Some(num_threads) => ThreadPool::new(num_threads),
None => ThreadPool::default()
};
let num_tests = tests.len();
let (sender, receiver) = mpsc::channel();

let num_tests = tests.len();
Comment on lines +509 to -505
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move the let num_tests line to its previous position (right above the loop) to reduce the diff size?

for test in tests {
let mut tasks: Vec<pool::BoxedTask> = Default::default();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this work?

Suggested change
let mut tasks: Vec<pool::BoxedTask> = Default::default();
let mut tasks = Vec::new();


for test in tests.into_iter() {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for test in tests.into_iter() {
for test in tests {

Should work?

if args.is_ignored(&test) {
sender.send((Outcome::Ignored, test.info)).unwrap();
} else {
let sender = sender.clone();
pool.execute(move || {

tasks.push(Box::new(move || {
// It's fine to ignore the result of sending. If the
// receiver has hung up, everything will wind down soon
// anyway.
let outcome = run_single(test.runner, test_mode);
let _ = sender.send((outcome, test.info));
});
}));
}
}

pool::scoped_run_tasks(tasks, num_threads);

Comment on lines +530 to +531
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function is pretty short and only used in a single place (here), so why not inline it? Similar to what @hanna-kruppe said above, I think in this case we lose more by obscuring what actually happens compared to what we gain by black box abstraction.

Further, using thread::scope is actually problematic when looking at the next loop: that expects to run while the worker threads are doing something, in order to print something immediately after a test is finished. thread::scope waits for all threads to be done before returning. So in your code, all finished tests would fill up the channel, and only after all tests are done does the following loop run and print all tests.
This can be fixed by moving the printing loop below into the thread::scope closure, which also requires inlining the function.

Aaand in that case we can also get rid of the first loop, since we already have a vector of "tasks" (the trials) we can iterate over.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the way you inlined it in d194ff9 has a bug: it spawns num_threads scoped threads, each of which only takes one trial from iter and then terminates. So if you have fewer threads than trials, not all tests are executed.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooof, that's awkward. Thanks for catching it! Should be fixed in d6ac84f. Would you mind reviewing that, to avoid me publishing another avoidable bug?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a comment on that commit.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing. I removed the outdated comment in e17efb9 and also added additional tests in 41a1856 to catch this kind of bug in the future. Thanks again for catching this, will release momentarily.

for (outcome, test_info) in receiver.iter().take(num_tests) {
// In multithreaded mode, we do only print the start of the line
// after the test ran, as otherwise it would lead to terribly
Expand Down Expand Up @@ -552,7 +563,8 @@ fn run_single(runner: Box<dyn FnOnce(bool) -> Outcome + Send>, test_mode: bool)
// The `panic` information is just an `Any` object representing the
// value the panic was invoked with. For most panics (which use
// `panic!` like `println!`), this is either `&str` or `String`.
let payload = e.downcast_ref::<String>()
let payload = e
.downcast_ref::<String>()
.map(|s| s.as_str())
.or(e.downcast_ref::<&str>().map(|s| *s));

Expand Down
27 changes: 27 additions & 0 deletions src/pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::{sync, thread};

pub(crate) type Task = dyn FnOnce() + Send;
pub(crate) type BoxedTask = Box<Task>;
Comment on lines +3 to +4
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these type aliases pull their weight. With my suggestion from a previous comment, they are only used in one spot. So please remove the aliases and just write out the type in the scoped_run_tasks function.


pub(crate) fn scoped_run_tasks(
tasks: Vec<BoxedTask>,
num_threads: usize,
) {
if num_threads < 2 {
// There is another code path for num_threads == 1 running entirely in the main thread.
panic!("`run_on_scoped_pool` may not be called with `num_threads` less than 2");
}

let sync_iter = sync::Mutex::new(tasks.into_iter());
let next_task = || sync_iter.lock().unwrap().next();

thread::scope(|scope| {
for _ in 0..num_threads {
scope.spawn(|| {
while let Some(task) = next_task() {
task();
}
});
}
});
}