Skip to content

Commit

Permalink
Move from protobuf to bincode for a wire format
Browse files Browse the repository at this point in the history
This commit migrates away from the `protobuf` crate to instead just working with
bincode on the wire as a serialization format. This is done by leveraging a few
different crates:

* The `bincode` and `serde_derive` crates are used to define serialization
  for Rust structures as well as provide a bincode implementation.
* The `tokio_io::codec::length_delimited` module implements framing via length
  prefixes to transform an asynchronous stream of bytes into a literal `Stream`
  of `BytesMut`.
* The `tokio_serde_bincode` crate is then used to tie it all together, parsing
  these `BytesMut` as the request/response types of sccache.

Most of the changes here are related to moving away from the protobuf API
throughout the codebase (e.g. `has_foo` and `take_foo`) towards a more
rustic-ish API that just uses enums/structs. Overall it felt quite natural (as
one would expect) to just use the raw enum/struct values.

This may not be quite as performant as before but that doesn't really apply to
sccache's use case where perf is hugely dominated by actually compiling and
hashing, so I'm not too too worried about that.

My personal motivation for this is twofold:

1. Using `protobuf` was a little clunky throughout the codebase and definitely
   had some sharp edges that felt good to smooth out.
2. There's currently what I believe some mysterious segfault and/or stray write
   happening in sccache and I'm not sure where. The `protobuf` crate had a lot
   of `unsafe` code and in lieu of actually auditing it I figured it'd be good
   to kill two birds with one stone. I have no idea if this fixes my segfault
   problem (I never could reproduce it) but I figured it's worth a shot.
  • Loading branch information
alexcrichton committed Mar 25, 2017
1 parent 76982ee commit 07c64aa
Show file tree
Hide file tree
Showing 13 changed files with 372 additions and 3,744 deletions.
173 changes: 128 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 10 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ repository = "https://github.com/mozilla/sccache/"

[dependencies]
app_dirs = "1.1.1"
bytes = "0.4"
bincode = { git = 'https://github.com/TyOverby/bincode' }
byteorder = "1.0"
chrono = "0.2.25"
clap = "2.3.0"
env_logger = "0.3.3"
Expand All @@ -18,28 +19,30 @@ fern = "0.3.5"
filetime = "0.1"
futures = "0.1.11"
futures-cpupool = "0.1"
hyper = { git = "https://github.com/alexcrichton/hyper", branch = "tio" }
hyper-tls = { git = "https://github.com/alexcrichton/hyper-tls", branch = "tio" }
hyper = { git = "https://github.com/hyperium/hyper" }
hyper-tls = { git = "https://github.com/hyperium/hyper-tls" }
libc = "0.2.10"
local-encoding = "0.2.0"
log = "0.3.6"
lru-disk-cache = { path = "lru-disk-cache" }
number_prefix = "0.2.5"
protobuf = "1.0.18"
regex = "0.1.65"
retry = "0.4.0"
rust-crypto = "0.2.36"
rustc-serialize = "0.3"
serde_json = "0.8.0"
serde = "0.9"
serde_derive = "0.9"
serde_json = "0.9.0"
sha1 = "0.2.0"
tempdir = "0.3.4"
time = "0.1.35"
tokio-core = "0.1.6"
tokio-proto = "0.1"
tokio-io = "0.1"
tokio-process = "0.1"
tokio-proto = "0.1"
tokio-serde-bincode = { git = "https://github.com/alexcrichton/tokio-serde-bincode" }
tokio-service = "0.1"
tokio-tls = "0.1"
tokio-process = "0.1"
uuid = { version = "0.3.1", features = ["v4"] }
which = "0.2.1"
zip = { version = "0.2", default-features = false }
Expand Down
10 changes: 0 additions & 10 deletions generate-protocol.sh

This file was deleted.

42 changes: 18 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use protobuf::{
CodedInputStream,
Message,
ProtobufError,
ProtobufResult,
parse_from_bytes,
};
use protocol::{
ClientRequest,
ServerResponse,
};
use byteorder::{ByteOrder, BigEndian};
use protocol::{Request, Response};
use retry::retry;
use bincode;
use errors::*;
use std::io::{
self,
BufReader,
Expand Down Expand Up @@ -52,28 +45,29 @@ impl ServerConnection {
}

/// Send `request` to the server, read and return a `ServerResponse`.
pub fn request(&mut self, request : ClientRequest)
-> ProtobufResult<ServerResponse> {
pub fn request(&mut self, request: Request) -> Result<Response> {
trace!("ServerConnection::request");
try!(request.write_length_delimited_to_writer(&mut self.writer));
try!(self.writer.flush().or_else(|e| Err(ProtobufError::IoError(e))));
let bytes = bincode::serialize(&request, bincode::Infinite)?;
let mut len = [0; 4];
BigEndian::write_u32(&mut len, bytes.len() as u32);
self.writer.write_all(&len)?;
self.writer.write_all(&bytes)?;
self.writer.flush()?;
trace!("ServerConnection::request: sent request");
self.read_one_response()
}

/// Read a single `ServerResponse` from the server.
pub fn read_one_response(&mut self) -> ProtobufResult<ServerResponse> {
pub fn read_one_response(&mut self) -> Result<Response> {
trace!("ServerConnection::read_one_response");
//FIXME: wish `parse_length_delimited_from` worked here!
let len = try!({
let mut is = CodedInputStream::from_buffered_reader(&mut self.reader);
is.read_raw_varint32()
});
let mut bytes = [0; 4];
self.reader.read_exact(&mut bytes)?;
let len = BigEndian::read_u32(&bytes);
trace!("Should read {} more bytes", len);
let mut buf = vec![0; len as usize];
try!(self.reader.read_exact(&mut buf).or_else(|e| Err(ProtobufError::IoError(e))));
let mut data = vec![0; len as usize];
self.reader.read_exact(&mut data)?;
trace!("Done reading");
parse_from_bytes::<ServerResponse>(&buf)
Ok(bincode::deserialize(&data)?)
}
}

Expand Down
160 changes: 53 additions & 107 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,8 @@ use number_prefix::{
Prefixed,
Standalone,
};
use protobuf::{RepeatedField, ProtobufError};
use protocol::{
CacheStats,
ClientRequest,
Compile,
CompileFinished,
CompileStarted,
GetStats,
Shutdown,
UnhandledCompile,
ZeroStats,
};
use protocol::{Request, Response, CacheStats, CompileResponse, CompileFinished};
use protocol::{Compile, CacheStat};
use server;
use std::env;
use std::ffi::{OsStr,OsString};
Expand All @@ -69,14 +59,6 @@ pub const DEFAULT_PORT: u16 = 4226;
/// The number of milliseconds to wait for server startup.
const SERVER_STARTUP_TIMEOUT_MS: u32 = 5000;

/// Possible responses from the server for a `Compile` request.
enum CompileResponse {
/// The compilation was started.
CompileStarted(CompileStarted),
/// The server could not handle this compilation request.
UnhandledCompile(UnhandledCompile),
}

// Should this just be a Result?
/// Result of background server startup.
enum ServerStartup {
Expand Down Expand Up @@ -384,14 +366,12 @@ fn connect_or_start_server(port: u16) -> Result<ServerConnection> {
/// Send a `ZeroStats` request to the server, and return the `CacheStats` request if successful.
pub fn request_zero_stats(mut conn : ServerConnection) -> Result<CacheStats> {
debug!("request_stats");
let mut req = ClientRequest::new();
req.set_zero_stats(ZeroStats::new());
//TODO: better error mapping
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(Request::ZeroStats).chain_err(|| {
"failed to send zero statistics command to server or failed to receive respone"
})?;
if response.has_stats() {
Ok(response.take_stats())
if let Response::Stats(stats) = response {
Ok(stats)
} else {
bail!("Unexpected server response!")
}
Expand All @@ -400,14 +380,12 @@ pub fn request_zero_stats(mut conn : ServerConnection) -> Result<CacheStats> {
/// Send a `GetStats` request to the server, and return the `CacheStats` request if successful.
pub fn request_stats(mut conn : ServerConnection) -> Result<CacheStats> {
debug!("request_stats");
let mut req = ClientRequest::new();
req.set_get_stats(GetStats::new());
//TODO: better error mapping
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(Request::GetStats).chain_err(|| {
"Failed to send data to or receive data from server"
})?;
if response.has_stats() {
Ok(response.take_stats())
if let Response::Stats(stats) = response {
Ok(stats)
} else {
bail!("Unexpected server response!")
}
Expand All @@ -416,36 +394,34 @@ pub fn request_stats(mut conn : ServerConnection) -> Result<CacheStats> {
/// Send a `Shutdown` request to the server, and return the `CacheStats` contained within the response if successful.
pub fn request_shutdown(mut conn : ServerConnection) -> Result<CacheStats> {
debug!("request_shutdown");
let mut req = ClientRequest::new();
req.set_shutdown(Shutdown::new());
//TODO: better error mapping
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(Request::Shutdown).chain_err(|| {
"Failed to send data to or receive data from server"
})?;
if response.has_shutting_down() {
Ok(response.take_shutting_down().take_stats())
if let Response::ShuttingDown(stats) = response {
Ok(stats)
} else {
bail!("Unexpected server response!")
}
}

/// Print `stats` to stdout.
fn print_stats(stats: CacheStats) -> Result<()> {
let formatted = stats.get_stats().iter()
.map(|s| (s.get_name(), if s.has_count() {
format!("{}", s.get_count())
} else if s.has_str() {
s.get_str().to_owned()
} else if s.has_size() {
match binary_prefix(s.get_size() as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
} else {
String::from("???")
}))
let formatted = stats.stats.into_iter()
.map(|s| {
(s.name, match s.value {
CacheStat::Count(c) => c.to_string(),
CacheStat::String(s) => s,
CacheStat::Size(size) => {
match binary_prefix(size as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
}
})
})
.collect::<Vec<_>>();
let name_width = formatted.iter().map(|&(n, _)| n.len()).max().unwrap();
let name_width = formatted.iter().map(|&(ref n, _)| n.len()).max().unwrap();
let stat_width = formatted.iter().map(|&(_, ref s)| s.len()).max().unwrap();
for (name, stat) in formatted {
println!("{:<name_width$} {:>stat_width$}", name, stat, name_width=name_width, stat_width=stat_width);
Expand All @@ -455,28 +431,20 @@ fn print_stats(stats: CacheStats) -> Result<()> {

/// Send a `Compile` request to the server, and return the server response if successful.
fn request_compile<W: AsRef<Path>, X: AsRef<OsStr>, Y: AsRef<Path>>(conn: &mut ServerConnection, exe: W, args: &Vec<X>, cwd: Y) -> Result<CompileResponse> {
//TODO: It'd be nicer to send these over as raw bytes.
let exe = exe.as_ref().to_str().ok_or("bad exe")?;
let cwd = cwd.as_ref().to_str().ok_or("bad cwd")?;
let args = args.iter().filter_map(|a| a.as_ref().to_str().map(|s| s.to_owned())).collect::<Vec<_>>();
if args.is_empty() {
bail!("bad commandline")
}
let mut req = ClientRequest::new();
let mut compile = Compile::new();
compile.set_exe(exe.to_owned());
compile.set_cwd(cwd.to_owned());
compile.set_command(RepeatedField::from_vec(args));
trace!("request_compile: {:?}", compile);
req.set_compile(compile);
let req = Request::Compile(Compile {
exe: exe.as_ref().to_str().ok_or("bad exe")?.to_owned(),
cwd: cwd.as_ref().to_str().ok_or("bad cwd")?.to_owned(),
args: args.iter().map(|a| {
a.as_ref().to_str().ok_or("bad cwd".into()).map(|s| s.to_owned())
}).collect::<Result<_>>()?,
});
trace!("request_compile: {:?}", req);
//TODO: better error mapping?
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(req).chain_err(|| {
"Failed to send data to or receive data from server"
})?;
if response.has_compile_started() {
Ok(CompileResponse::CompileStarted(response.take_compile_started()))
} else if response.has_unhandled_compile() {
Ok(CompileResponse::UnhandledCompile(response.take_unhandled_compile()))
if let Response::Compile(response) = response {
Ok(response)
} else {
bail!("Unexpected response from server")
}
Expand Down Expand Up @@ -504,18 +472,14 @@ fn handle_compile_finished(response: CompileFinished,
// It might be nice if the server sent stdout/stderr as the process
// ran, but then it would have to also save them in the cache as
// interleaved streams to really make it work.
if response.has_stdout() {
try!(stdout.write_all(response.get_stdout()));
}
if response.has_stderr() {
try!(stderr.write_all(response.get_stderr()));
}
if response.has_retcode() {
let ret = response.get_retcode();
try!(stdout.write_all(&response.stdout));
try!(stderr.write_all(&response.stderr));

if let Some(ret) = response.retcode {
trace!("compiler exited with status {}", ret);
Ok(ret)
} else if response.has_signal() {
println!("Compiler killed by signal {}", response.get_signal());
} else if let Some(signal) = response.signal {
println!("Compiler killed by signal {}", signal);
Ok(-2)
} else {
println!("Missing compiler exit status!");
Expand All @@ -542,39 +506,21 @@ fn handle_compile_response<T>(mut creator: T,
where T : CommandCreatorSync,
{
match response {
CompileResponse::CompileStarted(_) => {
CompileResponse::CompileStarted => {
debug!("Server sent CompileStarted");
// Wait for CompileFinished.
match conn.read_one_response() {
Ok(mut res) => {
if res.has_compile_finished() {
trace!("Server sent CompileFinished");
return handle_compile_finished(res.take_compile_finished(),
stdout, stderr)
} else {
bail!("unexpected response from server")
}
}

// Currently the shutdown behavior of the remote sccache server
// is to wait at most N seconds for all active connections to
// finish and then close everything. If we get unlucky and don't
// get a response then we just forge ahead locally and run the
// compilation ourselves.
Err(ProtobufError::IoError(ref e))
if e.kind() == io::ErrorKind::UnexpectedEof => {
writeln!(io::stderr(),
"warning: sccache server looks like it shut down \
unexpectedly, compiling locally instead").unwrap();
}

Err(e) => return Err(e).chain_err(|| {
//TODO: something better here?
"error reading compile response from server"
})
let res = conn.read_one_response().chain_err(|| {
//TODO: something better here?
"error reading compile response from server"
})?;
if let Response::CompileFinished(result) = res {
trace!("Server sent CompileFinished");
return handle_compile_finished(result, stdout, stderr)
} else {
bail!("unexpected response from server")
}
}
CompileResponse::UnhandledCompile(_) => {
CompileResponse::UnhandledCompile => {
debug!("Server sent UnhandledCompile");
}
};
Expand Down
4 changes: 4 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ use std::boxed::Box;
use std::error;
use std::io;

use bincode;
use futures::Future;
use futures::future;
use hyper;
use lru_disk_cache;
use serde_json;

error_chain! {
foreign_links {
Io(io::Error);
Hyper(hyper::Error);
Lru(lru_disk_cache::Error);
Json(serde_json::Error);
Bincode(bincode::Error);
}

errors {
Expand Down
Loading

0 comments on commit 07c64aa

Please sign in to comment.