Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move from protobuf to bincode for a wire format #85

Merged
merged 2 commits into from
Mar 27, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
388 changes: 245 additions & 143 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 11 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ repository = "https://github.com/mozilla/sccache/"

[dependencies]
app_dirs = "1.1.1"
bincode = { git = 'https://github.com/TyOverby/bincode' }
byteorder = "1.0"
chrono = "0.2.25"
clap = "2.3.0"
env_logger = "0.3.3"
error-chain = { version = "0.7.2", default-features = false }
fern = "0.3.5"
filetime = "0.1"
futures = "0.1"
futures = "0.1.11"
futures-cpupool = "0.1"
hyper = { git = "https://github.com/hyperium/hyper" }
hyper-tls = { git = "https://github.com/hyperium/hyper-tls" }
Expand All @@ -24,19 +26,23 @@ local-encoding = "0.2.0"
log = "0.3.6"
lru-disk-cache = { path = "lru-disk-cache" }
number_prefix = "0.2.5"
protobuf = "1.0.18"
regex = "0.1.65"
retry = "0.4.0"
rust-crypto = "0.2.36"
rustc-serialize = "0.3"
serde_json = "0.8.0"
serde = "0.9"
serde_derive = "0.9"
serde_json = "0.9.0"
sha1 = "0.2.0"
tempdir = "0.3.4"
time = "0.1.35"
tokio-core = "0.1"
tokio-core = "0.1.6"
tokio-io = "0.1"
tokio-process = "0.1"
tokio-proto = "0.1"
tokio-serde-bincode = { git = "https://github.com/alexcrichton/tokio-serde-bincode" }
tokio-service = "0.1"
tokio-process = "0.1"
tokio-tls = "0.1"
uuid = { version = "0.3.1", features = ["v4"] }
which = "0.2.1"
zip = { version = "0.2", default-features = false }
Expand Down
10 changes: 0 additions & 10 deletions generate-protocol.sh

This file was deleted.

46 changes: 20 additions & 26 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use protobuf::{
CodedInputStream,
Message,
ProtobufError,
ProtobufResult,
parse_from_bytes,
};
use protocol::{
ClientRequest,
ServerResponse,
};
use byteorder::{ByteOrder, BigEndian};
use protocol::{Request, Response};
use retry::retry;
use bincode;
use errors::*;
use std::io::{
self,
BufReader,
Expand Down Expand Up @@ -51,29 +44,30 @@ impl ServerConnection {
})
}

/// Send `request` to the server, read and return a `ServerResponse`.
pub fn request(&mut self, request : ClientRequest)
-> ProtobufResult<ServerResponse> {
/// Send `request` to the server, read and return a `Response`.
pub fn request(&mut self, request: Request) -> Result<Response> {
trace!("ServerConnection::request");
try!(request.write_length_delimited_to_writer(&mut self.writer));
try!(self.writer.flush().or_else(|e| Err(ProtobufError::IoError(e))));
let bytes = bincode::serialize(&request, bincode::Infinite)?;
let mut len = [0; 4];
BigEndian::write_u32(&mut len, bytes.len() as u32);
self.writer.write_all(&len)?;
self.writer.write_all(&bytes)?;
self.writer.flush()?;
trace!("ServerConnection::request: sent request");
self.read_one_response()
}

/// Read a single `ServerResponse` from the server.
pub fn read_one_response(&mut self) -> ProtobufResult<ServerResponse> {
/// Read a single `Response` from the server.
pub fn read_one_response(&mut self) -> Result<Response> {
trace!("ServerConnection::read_one_response");
//FIXME: wish `parse_length_delimited_from` worked here!
let len = try!({
let mut is = CodedInputStream::from_buffered_reader(&mut self.reader);
is.read_raw_varint32()
});
let mut bytes = [0; 4];
self.reader.read_exact(&mut bytes)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would things look nicer overall if we used tokio here instead of blocking IO? Originally I used blocking IO in the client because the mio stuff was so complicated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah I think they'd basically be equivalent. As soon as we'd want to do anything else, though, such as timeouts, then we'd definitely want to use Tokio as it'd look quite a bit better.

The tokio version would basically just do the futures equivalent of this, using tokio_io combinators, and a little less ergonomically.

let len = BigEndian::read_u32(&bytes);
trace!("Should read {} more bytes", len);
let mut buf = vec![0; len as usize];
try!(self.reader.read_exact(&mut buf).or_else(|e| Err(ProtobufError::IoError(e))));
let mut data = vec![0; len as usize];
self.reader.read_exact(&mut data)?;
trace!("Done reading");
parse_from_bytes::<ServerResponse>(&buf)
Ok(bincode::deserialize(&data)?)
}
}

Expand Down
163 changes: 58 additions & 105 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,8 @@ use number_prefix::{
Prefixed,
Standalone,
};
use protobuf::{RepeatedField, ProtobufError};
use protocol::{
CacheStats,
ClientRequest,
Compile,
CompileFinished,
CompileStarted,
GetStats,
Shutdown,
UnhandledCompile,
ZeroStats,
};
use protocol::{Request, Response, CacheStats, CompileResponse, CompileFinished};
use protocol::{Compile, CacheStat};
use server;
use std::env;
use std::ffi::{OsStr,OsString};
Expand All @@ -69,14 +59,6 @@ pub const DEFAULT_PORT: u16 = 4226;
/// The number of milliseconds to wait for server startup.
const SERVER_STARTUP_TIMEOUT_MS: u32 = 5000;

/// Possible responses from the server for a `Compile` request.
enum CompileResponse {
/// The compilation was started.
CompileStarted(CompileStarted),
/// The server could not handle this compilation request.
UnhandledCompile(UnhandledCompile),
}

// Should this just be a Result?
/// Result of background server startup.
enum ServerStartup {
Expand Down Expand Up @@ -105,7 +87,7 @@ fn run_server_process() -> Result<ServerStartup> {
use futures::{Future, Stream};
use std::time::Duration;
use tempdir::TempDir;
use tokio_core::io::read_exact;
use tokio_io::io::read_exact;
use tokio_core::reactor::Timeout;

trace!("run_server_process");
Expand Down Expand Up @@ -384,14 +366,12 @@ fn connect_or_start_server(port: u16) -> Result<ServerConnection> {
/// Send a `ZeroStats` request to the server, and return the `CacheStats` request if successful.
pub fn request_zero_stats(mut conn : ServerConnection) -> Result<CacheStats> {
debug!("request_stats");
let mut req = ClientRequest::new();
req.set_zero_stats(ZeroStats::new());
//TODO: better error mapping
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(Request::ZeroStats).chain_err(|| {
"failed to send zero statistics command to server or failed to receive respone"
})?;
if response.has_stats() {
Ok(response.take_stats())
if let Response::Stats(stats) = response {
Ok(stats)
} else {
bail!("Unexpected server response!")
}
Expand All @@ -400,14 +380,12 @@ pub fn request_zero_stats(mut conn : ServerConnection) -> Result<CacheStats> {
/// Send a `GetStats` request to the server, and return the `CacheStats` request if successful.
pub fn request_stats(mut conn : ServerConnection) -> Result<CacheStats> {
debug!("request_stats");
let mut req = ClientRequest::new();
req.set_get_stats(GetStats::new());
//TODO: better error mapping
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(Request::GetStats).chain_err(|| {
"Failed to send data to or receive data from server"
})?;
if response.has_stats() {
Ok(response.take_stats())
if let Response::Stats(stats) = response {
Ok(stats)
} else {
bail!("Unexpected server response!")
}
Expand All @@ -416,36 +394,34 @@ pub fn request_stats(mut conn : ServerConnection) -> Result<CacheStats> {
/// Send a `Shutdown` request to the server, and return the `CacheStats` contained within the response if successful.
pub fn request_shutdown(mut conn : ServerConnection) -> Result<CacheStats> {
debug!("request_shutdown");
let mut req = ClientRequest::new();
req.set_shutdown(Shutdown::new());
//TODO: better error mapping
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(Request::Shutdown).chain_err(|| {
"Failed to send data to or receive data from server"
})?;
if response.has_shutting_down() {
Ok(response.take_shutting_down().take_stats())
if let Response::ShuttingDown(stats) = response {
Ok(stats)
} else {
bail!("Unexpected server response!")
}
}

/// Print `stats` to stdout.
fn print_stats(stats: CacheStats) -> Result<()> {
let formatted = stats.get_stats().iter()
.map(|s| (s.get_name(), if s.has_count() {
format!("{}", s.get_count())
} else if s.has_str() {
s.get_str().to_owned()
} else if s.has_size() {
match binary_prefix(s.get_size() as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
} else {
String::from("???")
}))
let formatted = stats.stats.into_iter()
.map(|s| {
(s.name, match s.value {
CacheStat::Count(c) => c.to_string(),
CacheStat::String(s) => s,
CacheStat::Size(size) => {
match binary_prefix(size as f64) {
Standalone(bytes) => format!("{} bytes", bytes),
Prefixed(prefix, n) => format!("{:.0} {}B", n, prefix),
}
}
})
})
.collect::<Vec<_>>();
let name_width = formatted.iter().map(|&(n, _)| n.len()).max().unwrap();
let name_width = formatted.iter().map(|&(ref n, _)| n.len()).max().unwrap();
let stat_width = formatted.iter().map(|&(_, ref s)| s.len()).max().unwrap();
for (name, stat) in formatted {
println!("{:<name_width$} {:>stat_width$}", name, stat, name_width=name_width, stat_width=stat_width);
Expand All @@ -455,28 +431,20 @@ fn print_stats(stats: CacheStats) -> Result<()> {

/// Send a `Compile` request to the server, and return the server response if successful.
fn request_compile<W: AsRef<Path>, X: AsRef<OsStr>, Y: AsRef<Path>>(conn: &mut ServerConnection, exe: W, args: &Vec<X>, cwd: Y) -> Result<CompileResponse> {
//TODO: It'd be nicer to send these over as raw bytes.
let exe = exe.as_ref().to_str().ok_or("bad exe")?;
let cwd = cwd.as_ref().to_str().ok_or("bad cwd")?;
let args = args.iter().filter_map(|a| a.as_ref().to_str().map(|s| s.to_owned())).collect::<Vec<_>>();
if args.is_empty() {
bail!("bad commandline")
}
let mut req = ClientRequest::new();
let mut compile = Compile::new();
compile.set_exe(exe.to_owned());
compile.set_cwd(cwd.to_owned());
compile.set_command(RepeatedField::from_vec(args));
trace!("request_compile: {:?}", compile);
req.set_compile(compile);
let req = Request::Compile(Compile {
exe: exe.as_ref().to_str().ok_or("bad exe")?.to_owned(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a follow-up, we could now just use OsStr or OsString all the way for the values from the client commandline.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I tried to do that but Serialize/Deserialize wasn't implemented for one of them :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... I'll send a PR!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok this is now tracked at serde-rs/serde#823

cwd: cwd.as_ref().to_str().ok_or("bad cwd")?.to_owned(),
args: args.iter().map(|a| {
a.as_ref().to_str().ok_or("bad cwd".into()).map(|s| s.to_owned())
}).collect::<Result<_>>()?,
});
trace!("request_compile: {:?}", req);
//TODO: better error mapping?
let mut response = conn.request(req).chain_err(|| {
let response = conn.request(req).chain_err(|| {
"Failed to send data to or receive data from server"
})?;
if response.has_compile_started() {
Ok(CompileResponse::CompileStarted(response.take_compile_started()))
} else if response.has_unhandled_compile() {
Ok(CompileResponse::UnhandledCompile(response.take_unhandled_compile()))
if let Response::Compile(response) = response {
Ok(response)
} else {
bail!("Unexpected response from server")
}
Expand Down Expand Up @@ -504,18 +472,14 @@ fn handle_compile_finished(response: CompileFinished,
// It might be nice if the server sent stdout/stderr as the process
// ran, but then it would have to also save them in the cache as
// interleaved streams to really make it work.
if response.has_stdout() {
try!(stdout.write_all(response.get_stdout()));
}
if response.has_stderr() {
try!(stderr.write_all(response.get_stderr()));
}
if response.has_retcode() {
let ret = response.get_retcode();
stdout.write_all(&response.stdout)?;
stderr.write_all(&response.stderr)?;

if let Some(ret) = response.retcode {
trace!("compiler exited with status {}", ret);
Ok(ret)
} else if response.has_signal() {
println!("Compiler killed by signal {}", response.get_signal());
} else if let Some(signal) = response.signal {
println!("Compiler killed by signal {}", signal);
Ok(-2)
} else {
println!("Missing compiler exit status!");
Expand All @@ -542,39 +506,28 @@ fn handle_compile_response<T>(mut creator: T,
where T : CommandCreatorSync,
{
match response {
CompileResponse::CompileStarted(_) => {
CompileResponse::CompileStarted => {
debug!("Server sent CompileStarted");
// Wait for CompileFinished.
match conn.read_one_response() {
Ok(mut res) => {
if res.has_compile_finished() {
trace!("Server sent CompileFinished");
return handle_compile_finished(res.take_compile_finished(),
stdout, stderr)
} else {
bail!("unexpected response from server")
}
Ok(Response::CompileFinished(result)) => {
return handle_compile_finished(result, stdout, stderr)
}

// Currently the shutdown behavior of the remote sccache server
// is to wait at most N seconds for all active connections to
// finish and then close everything. If we get unlucky and don't
// get a response then we just forge ahead locally and run the
// compilation ourselves.
Err(ProtobufError::IoError(ref e))
if e.kind() == io::ErrorKind::UnexpectedEof => {
writeln!(io::stderr(),
"warning: sccache server looks like it shut down \
Ok(_) => bail!("unexpected response from server"),
Err(Error(ErrorKind::Io(ref e), _))
if e.kind() == io::ErrorKind::UnexpectedEof =>
{
writeln!(io::stderr(),
"warning: sccache server looks like it shut down \
unexpectedly, compiling locally instead").unwrap();
}

Err(e) => return Err(e).chain_err(|| {
Err(e) => return Err(e).chain_err(|| {
//TODO: something better here?
"error reading compile response from server"
})
}),
}
}
CompileResponse::UnhandledCompile(_) => {
CompileResponse::UnhandledCompile => {
debug!("Server sent UnhandledCompile");
}
};
Expand All @@ -586,12 +539,12 @@ fn handle_compile_response<T>(mut creator: T,
if log_enabled!(Trace) {
trace!("running command: {:?}", cmd);
}
let output = try!(core.run(run_input_output(cmd, None)));
let output = core.run(run_input_output(cmd, None))?;
if !output.stdout.is_empty() {
try!(stdout.write_all(&output.stdout));
stdout.write_all(&output.stdout)?;
}
if !output.stderr.is_empty() {
try!(stderr.write_all(&output.stderr));
stderr.write_all(&output.stderr)?;
}
Ok(output.status.code().unwrap_or_else(|| {
if let Some(sig) = status_signal(output.status) {
Expand Down
Loading