Skip to content

Commit

Permalink
rollup merge of rust-lang#20615: aturon/stab-2-thread
Browse files Browse the repository at this point in the history
This commit takes a first pass at stabilizing `std::thread`:

* It removes the `detach` method in favor of two constructors -- `spawn`
  for detached threads, `scoped` for "scoped" (i.e., must-join)
  threads. This addresses some of the surprise/frustrating debug
  sessions with the previous API, in which `spawn` produced a guard that
  on destruction joined the thread (unless `detach` was called).

  The reason to have the division in part is that `Send` will soon not
  imply `'static`, which means that `scoped` thread creation can take a
  closure over *shared stack data* of the parent thread. On the other
  hand, this means that the parent must not pop the relevant stack
  frames while the child thread is running. The `JoinGuard` is used to
  prevent this from happening by joining on drop (if you have not
  already explicitly `join`ed.) The APIs around `scoped` are
  future-proofed for the `Send` changes by taking an additional lifetime
  parameter. With the current definition of `Send`, this is forced to be
  `'static`, but when `Send` changes these APIs will gain their full
  flexibility immediately.

  Threads that are `spawn`ed, on the other hand, are detached from the
  start and do not yield an RAII guard.

  The hope is that, by making `scoped` an explicit opt-in with a very
  suggestive name, it will be drastically less likely to be caught by a
  surprising deadlock due to an implicit join at the end of a scope.

* The module itself is marked stable.

* Existing methods other than `spawn` and `scoped` are marked stable.

The migration path is:

```rust
Thread::spawn(f).detached()
```

becomes

```rust
Thread::spawn(f)
```

while

```rust
let res = Thread::spawn(f);
res.join()
```

becomes

```rust
let res = Thread::scoped(f);
res.join()
```

[breaking-change]
  • Loading branch information
alexcrichton committed Jan 6, 2015
2 parents 0631b46 + caca9b2 commit 36f5d12
Show file tree
Hide file tree
Showing 97 changed files with 359 additions and 291 deletions.
2 changes: 1 addition & 1 deletion src/compiletest/runtest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ fn run_debuginfo_gdb_test(config: &Config, props: &TestProps, testfile: &Path) {
loop {
//waiting 1 second for gdbserver start
timer::sleep(Duration::milliseconds(1000));
let result = Thread::spawn(move || {
let result = Thread::scoped(move || {
tcp::TcpStream::connect("127.0.0.1:5039").unwrap();
}).join();
if result.is_err() {
Expand Down
9 changes: 4 additions & 5 deletions src/doc/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ fn main() {
for _ in range(0u, 10u) {
Thread::spawn(move || {
println!("Hello, world!");
}).detach();
});
}
}
```
Expand All @@ -405,8 +405,7 @@ This program creates ten threads, who all print `Hello, world!`. The
double bars `||`. (The `move` keyword indicates that the closure takes
ownership of any data it uses; we'll have more on the significance of
this shortly.) This closure is executed in a new thread created by
`spawn`. The `detach` method means that the child thread is allowed to
outlive its parent.
`spawn`.
One common form of problem in concurrent programs is a 'data race.'
This occurs when two different threads attempt to access the same
Expand All @@ -429,7 +428,7 @@ fn main() {
for i in range(0u, 3u) {
Thread::spawn(move || {
for j in range(0, 3) { numbers[j] += 1 }
}).detach();
});
}
}
```
Expand Down Expand Up @@ -488,7 +487,7 @@ fn main() {
(*array)[i] += 1;
println!("numbers[{}] is {}", i, (*array)[i]);
}).detach();
});
}
}
```
Expand Down
6 changes: 3 additions & 3 deletions src/liballoc/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
//!
//! Thread::spawn(move || {
//! println!("{}", five);
//! }).detach();
//! });
//! }
//! ```
//!
Expand All @@ -63,7 +63,7 @@
//! *number += 1;
//!
//! println!("{}", *number); // prints 6
//! }).detach();
//! });
//! }
//! ```
Expand Down Expand Up @@ -106,7 +106,7 @@ use heap::deallocate;
/// let local_numbers = child_numbers.as_slice();
///
/// // Work with the local numbers
/// }).detach();
/// });
/// }
/// }
/// ```
Expand Down
2 changes: 1 addition & 1 deletion src/libcollections/dlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ mod tests {
#[test]
fn test_send() {
let n = list_from(&[1i,2,3]);
Thread::spawn(move || {
Thread::scoped(move || {
check_links(&n);
let a: &[_] = &[&1,&2,&3];
assert_eq!(a, n.iter().collect::<Vec<&int>>());
Expand Down
2 changes: 1 addition & 1 deletion src/libcore/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
//! let spinlock_clone = spinlock.clone();
//! Thread::spawn(move|| {
//! spinlock_clone.store(0, Ordering::SeqCst);
//! }).detach();
//! });
//!
//! // Wait for the other task to release the lock
//! while spinlock.load(Ordering::SeqCst) != 0 {}
Expand Down
2 changes: 1 addition & 1 deletion src/librustc_driver/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ pub fn monitor<F:FnOnce()+Send>(f: F) {
cfg = cfg.stack_size(STACK_SIZE);
}

match cfg.spawn(move || { std::io::stdio::set_stderr(box w); f() }).join() {
match cfg.scoped(move || { std::io::stdio::set_stderr(box w); f() }).join() {
Ok(()) => { /* fallthrough */ }
Err(value) => {
// Thread panicked without emitting a fatal diagnostic
Expand Down
2 changes: 1 addition & 1 deletion src/librustc_trans/back/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,7 @@ fn run_work_multithreaded(sess: &Session,
}

tx.take().unwrap().send(()).unwrap();
}).detach();
});
}

let mut panicked = false;
Expand Down
4 changes: 2 additions & 2 deletions src/librustdoc/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ struct Output {

pub fn main() {
static STACK_SIZE: uint = 32000000; // 32MB
let res = std::thread::Builder::new().stack_size(STACK_SIZE).spawn(move || {
let res = std::thread::Builder::new().stack_size(STACK_SIZE).scoped(move || {
main_args(std::os::args().as_slice())
}).join();
std::os::set_exit_status(res.map_err(|_| ()).unwrap());
Expand Down Expand Up @@ -345,7 +345,7 @@ fn rust_input(cratefile: &str, externs: core::Externs, matches: &getopts::Matche
let cr = Path::new(cratefile);
info!("starting to run rustc");

let (mut krate, analysis) = std::thread::Thread::spawn(move |:| {
let (mut krate, analysis) = std::thread::Thread::scoped(move |:| {
let cr = cr;
core::run_core(paths, cfgs, externs, &cr, triple)
}).join().map_err(|_| "rustc failed").unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/librustdoc/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ fn runtest(test: &str, cratename: &str, libs: SearchPaths,
None => box io::stderr() as Box<Writer>,
};
io::util::copy(&mut p, &mut err).unwrap();
}).detach();
});
let emitter = diagnostic::EmitterWriter::new(box w2, None);

// Compile the code
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/io/comm_adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ mod test {
tx.send(vec![3u8, 4u8]).unwrap();
tx.send(vec![5u8, 6u8]).unwrap();
tx.send(vec![7u8, 8u8]).unwrap();
}).detach();
});

let mut reader = ChanReader::new(rx);
let mut buf = [0u8; 3];
Expand Down Expand Up @@ -216,7 +216,7 @@ mod test {
tx.send(b"rld\nhow ".to_vec()).unwrap();
tx.send(b"are you?".to_vec()).unwrap();
tx.send(b"".to_vec()).unwrap();
}).detach();
});

let mut reader = ChanReader::new(rx);

Expand All @@ -235,7 +235,7 @@ mod test {
writer.write_be_u32(42).unwrap();

let wanted = vec![0u8, 0u8, 0u8, 42u8];
let got = match Thread::spawn(move|| { rx.recv().unwrap() }).join() {
let got = match Thread::scoped(move|| { rx.recv().unwrap() }).join() {
Ok(got) => got,
Err(_) => panic!(),
};
Expand Down
10 changes: 6 additions & 4 deletions src/libstd/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,12 @@
//! for stream in acceptor.incoming() {
//! match stream {
//! Err(e) => { /* connection failed */ }
//! Ok(stream) => Thread::spawn(move|| {
//! // connection succeeded
//! handle_client(stream)
//! }).detach()
//! Ok(stream) => {
//! Thread::spawn(move|| {
//! // connection succeeded
//! handle_client(stream)
//! });
//! }
//! }
//! }
//!
Expand Down
12 changes: 6 additions & 6 deletions src/libstd/io/net/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ mod tests {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv();
}).detach();
});

let mut b = [0];
let mut s = UnixStream::connect(&addr).unwrap();
Expand Down Expand Up @@ -645,7 +645,7 @@ mod tests {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv();
}).detach();
});

let mut s = UnixStream::connect(&addr).unwrap();
let s2 = s.clone();
Expand All @@ -672,7 +672,7 @@ mod tests {
rx.recv().unwrap();
assert!(s.write(&[0]).is_ok());
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
s.set_timeout(Some(20));
Expand Down Expand Up @@ -716,7 +716,7 @@ mod tests {
}
}
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
s.set_read_timeout(Some(20));
Expand All @@ -739,7 +739,7 @@ mod tests {
rx.recv().unwrap();
assert!(s.write(&[0]).is_ok());
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
s.set_write_timeout(Some(20));
Expand All @@ -766,7 +766,7 @@ mod tests {
rx.recv().unwrap();
assert!(s.write(&[0]).is_ok());
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
let s2 = s.clone();
Expand Down
26 changes: 14 additions & 12 deletions src/libstd/io/net/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl TcpStream {
/// timer::sleep(Duration::seconds(1));
/// let mut stream = stream2;
/// stream.close_read();
/// }).detach();
/// });
///
/// // wait for some data, will get canceled after one second
/// let mut buf = [0];
Expand Down Expand Up @@ -295,10 +295,12 @@ impl sys_common::AsInner<TcpStreamImp> for TcpStream {
/// for stream in acceptor.incoming() {
/// match stream {
/// Err(e) => { /* connection failed */ }
/// Ok(stream) => Thread::spawn(move|| {
/// // connection succeeded
/// handle_client(stream)
/// }).detach()
/// Ok(stream) => {
/// Thread::spawn(move|| {
/// // connection succeeded
/// handle_client(stream)
/// });
/// }
/// }
/// }
///
Expand Down Expand Up @@ -432,7 +434,7 @@ impl TcpAcceptor {
/// Err(e) => panic!("unexpected error: {}", e),
/// }
/// }
/// }).detach();
/// });
///
/// # fn wait_for_sigint() {}
/// // Now that our accept loop is running, wait for the program to be
Expand Down Expand Up @@ -1186,7 +1188,7 @@ mod test {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv().unwrap();
}).detach();
});

let mut b = [0];
let mut s = TcpStream::connect(addr).unwrap();
Expand Down Expand Up @@ -1223,7 +1225,7 @@ mod test {
let mut a = a;
let _s = a.accept().unwrap();
let _ = rx.recv().unwrap();
}).detach();
});

let mut s = TcpStream::connect(addr).unwrap();
let s2 = s.clone();
Expand All @@ -1250,7 +1252,7 @@ mod test {
rx.recv().unwrap();
assert!(s.write(&[0]).is_ok());
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
s.set_timeout(Some(20));
Expand Down Expand Up @@ -1289,7 +1291,7 @@ mod test {
}
}
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
s.set_read_timeout(Some(20));
Expand All @@ -1312,7 +1314,7 @@ mod test {
rx.recv().unwrap();
assert!(s.write(&[0]).is_ok());
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
s.set_write_timeout(Some(20));
Expand Down Expand Up @@ -1340,7 +1342,7 @@ mod test {
rx.recv().unwrap();
assert_eq!(s.write(&[0]), Ok(()));
let _ = rx.recv();
}).detach();
});

let mut s = a.accept().unwrap();
let s2 = s.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/libstd/io/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ impl Process {
Thread::spawn(move |:| {
let mut stream = stream;
tx.send(stream.read_to_end()).unwrap();
}).detach();
});
}
None => tx.send(Ok(Vec::new())).unwrap()
}
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/io/timer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ mod test {

Thread::spawn(move|| {
let _ = timer_rx.recv();
}).detach();
});

// when we drop the TimerWatcher we're going to destroy the channel,
// which must wake up the task on the other end
Expand All @@ -372,7 +372,7 @@ mod test {

Thread::spawn(move|| {
let _ = timer_rx.recv();
}).detach();
});

timer.oneshot(Duration::milliseconds(1));
}
Expand All @@ -385,7 +385,7 @@ mod test {

Thread::spawn(move|| {
let _ = timer_rx.recv();
}).detach();
});

timer.sleep(Duration::milliseconds(1));
}
Expand Down
4 changes: 2 additions & 2 deletions src/libstd/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ macro_rules! try {
/// # fn long_running_task() {}
/// # fn calculate_the_answer() -> int { 42i }
///
/// Thread::spawn(move|| { long_running_task(); tx1.send(()) }).detach();
/// Thread::spawn(move|| { tx2.send(calculate_the_answer()) }).detach();
/// Thread::spawn(move|| { long_running_task(); tx1.send(()).unwrap(); });
/// Thread::spawn(move|| { tx2.send(calculate_the_answer()).unwrap(); });
///
/// select! (
/// _ = rx1.recv() => println!("the long running task finished first"),
Expand Down
6 changes: 3 additions & 3 deletions src/libstd/path/posix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,17 @@ mod tests {
#[test]
fn test_null_byte() {
use thread::Thread;
let result = Thread::spawn(move|| {
let result = Thread::scoped(move|| {
Path::new(b"foo/bar\0")
}).join();
assert!(result.is_err());

let result = Thread::spawn(move|| {
let result = Thread::scoped(move|| {
Path::new("test").set_filename(b"f\0o")
}).join();
assert!(result.is_err());

let result = Thread::spawn(move|| {
let result = Thread::scoped(move|| {
Path::new("test").push(b"f\0o");
}).join();
assert!(result.is_err());
Expand Down
Loading

0 comments on commit 36f5d12

Please sign in to comment.