From ec164346916ca77eed137c615f2ddd96937fa4df Mon Sep 17 00:00:00 2001
From: d33d33
Date: Wed, 4 Jul 2018 16:08:25 +0200
Subject: [PATCH 1/3] feat(router): multi threaded router

---
 Cargo.lock           |   2 +-
 Cargo.toml           |   2 +-
 README.md            |   1 +
 src/config.rs        |  12 ++
 src/main.rs          |  17 +--
 src/router.rs        | 193 ------------------------------
 src/router/fs.rs     |  86 ++++++++++++++
 src/router/mod.rs    | 267 +++++++++++++++++++++++++++++++++++++++++++
 src/router/router.rs | 109 ++++++++++++++++++
 9 files changed, 481 insertions(+), 208 deletions(-)
 delete mode 100644 src/router.rs
 create mode 100644 src/router/fs.rs
 create mode 100644 src/router/mod.rs
 create mode 100644 src/router/router.rs

diff --git a/Cargo.lock b/Cargo.lock
index df5a60f..311ebe5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -43,7 +43,7 @@ dependencies = [
 
 [[package]]
 name = "beamium"
-version = "1.8.2"
+version = "1.9.0"
 dependencies = [
  "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
  "cast 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index 4de3bdf..2671e85 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "beamium"
-version = "1.8.2"
+version = "1.9.0"
 authors = [ "d33d33 " ]
 build = "build.rs"
 
diff --git a/README.md b/README.md
index 889ae06..9d9a48f 100644
--- a/README.md
+++ b/README.md
@@ -109,6 +109,7 @@ parameters: # Parameters definitions (Optional)
   log-file: beamium.log # Log file (Optional, default: beamium.log)
   log-level: 4 # Log level (Optional, default: info)
   timeout: 500 # Http timeout (seconds) (Optional, default: 500)
+  router-parallel: 1 # Routing threads (Optional, default: 1)
 ```
 
 ## Contributing
diff --git a/src/config.rs b/src/config.rs
index 8ddd483..be39404 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -74,6 +74,7 @@ pub struct Parameters {
     pub log_level: slog::Level,
     pub syslog: bool,
     pub timeout: u64,
+    pub router_parallel: u64,
 }
 
 #[derive(Debug)]
@@ -176,6 +177,7 @@ pub fn load_config(config_path: &str) -> Result<Config, ConfigError> {
             log_level: slog::Level::Info,
             syslog: false,
             timeout: 300,
+            router_parallel: 1,
         },
     };
 
@@ -544,6 +546,16 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), ConfigError> {
                     try!(cast::u64(timeout).map_err(|_| format!("parameters.timeout is invalid")));
                 config.parameters.timeout = timeout;
             }
+            if !doc["parameters"]["router-parallel"].is_badvalue() {
+                let router_parallel = try!(
+                    doc["parameters"]["router-parallel"]
+                        .as_i64()
+                        .ok_or("parameters.router-parallel should be a number")
+                );
+                let router_parallel =
+                    try!(cast::u64(router_parallel).map_err(|_| format!("parameters.router-parallel is invalid")));
+                config.parameters.router_parallel = router_parallel;
+            }
         }
     }
     Ok(())
diff --git a/src/main.rs b/src/main.rs
index 085fd8b..818bc67 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -138,19 +138,8 @@ fn main() {
 
     // Spawn router
     info!("spawning router");
-    {
-        let (sinks, labels, parameters, sigint) = (
-            config.sinks.clone(),
-            config.labels.clone(),
-            config.parameters.clone(),
-            sigint.clone(),
-        );
-        handles.push(thread::spawn(move || {
-            slog_scope::scope(&slog_scope::logger().new(o!()), || {
-                router::router(&sinks, &labels, &parameters, sigint)
-            });
-        }));
-    }
+    let mut router = router::Router::new(&config.sinks, &config.parameters, &config.labels); // FIXME
+    router.start();
 
     // Spawn sinks
     info!("spawning sinks");
@@ -178,6 +167,8 @@ fn main() {
         handle.join().unwrap();
     }
 
+    router.stop();
+
     for s in sinks {
         s.stop();
     }
diff --git a/src/router.rs b/src/router.rs
deleted file mode 100644
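The main.rs hunk above replaces the inline spawn with an owned handle that is constructed, started, and stopped explicitly (note the `// FIXME`: in this first patch `stop()` only fires the shutdown channel; the handles are joined from PATCH 2/3 onward). Below is a minimal, dependency-free sketch of that start/stop lifecycle. The `Worker` name is illustrative, and `std::sync::mpsc` stands in for the `futures` oneshot channel the patch actually shares between threads:

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Owns its worker threads plus a shutdown channel, mirroring the
/// Router::new / start / stop flow from the patch (names illustrative).
struct Worker {
    handles: Vec<JoinHandle<()>>,
    shutdown_tx: Sender<()>,
    shutdown_rx: Option<Receiver<()>>,
}

impl Worker {
    fn new() -> Worker {
        let (tx, rx) = mpsc::channel();
        Worker {
            handles: Vec::new(),
            shutdown_tx: tx,
            shutdown_rx: Some(rx),
        }
    }

    fn start(&mut self) {
        // Hand the receiving side to the worker thread.
        let rx = self.shutdown_rx.take().expect("start called twice");
        self.handles.push(thread::spawn(move || {
            // Poll for the shutdown signal between units of work.
            while let Err(TryRecvError::Empty) = rx.try_recv() {
                thread::sleep(Duration::from_millis(10));
            }
        }));
    }

    fn stop(self) {
        let _ = self.shutdown_tx.send(());
        for handle in self.handles {
            handle.join().unwrap();
        }
    }
}

fn main() {
    let mut worker = Worker::new();
    worker.start();
    worker.stop();
}
```

Taking `self` by value in `stop()` makes a stopped handle unusable at compile time, which is the same shape `Router::stop` has in the patch.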
index b724a97..0000000 --- a/src/router.rs +++ /dev/null @@ -1,193 +0,0 @@ -//! # Router module. -//! -//! The Router module forward sources to sinks. -use std::thread; -use std::time::Duration; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use time; -use std::cmp; -use std::collections::HashMap; -use std::io::prelude::*; -use std::fs; -use std::fs::File; -use std::error::Error; -use std::ffi::OsStr; -use std::path::{Path, PathBuf}; - -use config; -use lib; - -/// Thread sleeping time. -const REST_TIME: u64 = 10; - -/// Router loop. -pub fn router( - sinks: &Vec, - labels: &HashMap, - parameters: &config::Parameters, - sigint: Arc, -) { - let labels: String = labels.iter().fold(String::new(), |acc, (k, v)| { - let sep = if acc.is_empty() { "" } else { "," }; - acc + sep + k + "=" + v - }); - - loop { - let start = time::now_utc(); - - match route(sinks, parameters, &labels, sigint.clone()) { - Err(err) => error!("route fail: {}", err), - Ok(size) => if size > 0 { - info!("route success - {}", size) - }, - } - - let elapsed = (time::now_utc() - start).num_milliseconds() as u64; - let sleep_time = if elapsed > parameters.scan_period { - REST_TIME - } else { - cmp::max(parameters.scan_period - elapsed, REST_TIME) - }; - for _ in 0..sleep_time / REST_TIME { - thread::sleep(Duration::from_millis(REST_TIME)); - if sigint.load(Ordering::Relaxed) { - return; - } - } - } -} - -/// Route handle sources forwarding. -fn route( - sinks: &Vec, - parameters: &config::Parameters, - labels: &String, - sigint: Arc, -) -> Result> { - let mut proc_size = 0; - let mut batch_count = 0; - let start = time::now_utc().to_timespec(); - let run_id = format!("{}#{}", start.sec, start.nsec); - - loop { - if sigint.load(Ordering::Relaxed) { - return Ok(proc_size); - } - let entries = try!(fs::read_dir(¶meters.source_dir)); - let mut files = Vec::with_capacity(parameters.batch_count as usize); - let mut metrics: Vec = Vec::new(); - - // Load metrics - let mut batch_size = 0; - for (i, entry) in entries.enumerate() { - let entry = try!(entry); - // Look only for metrics files - if entry.path().extension() != Some(OsStr::new("metrics")) { - continue; - } - - // Split metrics in capped batch - if i > parameters.batch_count as usize || batch_size > parameters.batch_size as usize { - break; - } - - debug!("open source file {}", format!("{:?}", entry.path())); - let file = match read(entry.path()) { - Err(err) => { - warn!("{}", err); - continue; - } - Ok(v) => v, - }; - - for line in file.lines() { - match lib::add_labels(&line, labels) { - Ok(v) => metrics.push(v), - Err(_) => { - warn!("bad line {}", &line); - continue; - } - }; - } - - files.push(entry.path()); - batch_size += file.len(); - } - - proc_size += metrics.len(); - batch_count += 1; - - // Nothing to do - if files.len() == 0 { - break; - } - - // Setup sinks files - let dir = Path::new(¶meters.sink_dir); - { - let mut sink_files = Vec::with_capacity(sinks.len() as usize); - // Open tmp files - for sink in sinks { - let sink_file = dir.join(format!("{}.tmp", sink.name)); - debug!("open tmp sink file {}", format!("{:?}", sink_file)); - sink_files.push(try!(File::create(sink_file))); - } - - // Write metrics - debug!("write sink files"); - for line in metrics { - if line.is_empty() { - continue; - } - - for (i, sink) in sinks.iter().enumerate() { - if sink.selector.is_some() { - let selector = sink.selector.as_ref().unwrap(); - if !line.split_whitespace() - .nth(1) - .map_or(false, |class| selector.is_match(class)) - { - continue; - } - } - 
try!(sink_files[i].write(line.as_bytes())); - try!(sink_files[i].write(b"\n")); - } - } - - // Flush - for i in 0..sinks.len() { - try!(sink_files[i].flush()); - } - } - - // Rotate - for sink in sinks { - let dest_file = dir.join(format!("{}-{}-{}.metrics", sink.name, run_id, batch_count)); - debug!("rotate tmp sink file to {}", format!("{:?}", dest_file)); - try!(fs::rename( - dir.join(format!("{}.tmp", sink.name)), - dest_file - )); - } - - // Delete forwarded data - for f in files { - debug!("delete source file {}", format!("{:?}", f)); - try!(fs::remove_file(f)); - } - } - - Ok(proc_size) -} - -/// Read a file as String -fn read(path: PathBuf) -> Result> { - let mut file = try!(File::open(path)); - - let mut content = String::new(); - try!(file.read_to_string(&mut content)); - - Ok(content) -} diff --git a/src/router/fs.rs b/src/router/fs.rs new file mode 100644 index 0000000..871e4d1 --- /dev/null +++ b/src/router/fs.rs @@ -0,0 +1,86 @@ +use std::cmp; +use std::collections::{HashSet, VecDeque}; +use std::error::Error; +use std::ffi::OsStr; +use std::fs; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Duration}; + +use time; + +use futures::future::Shared; +use futures::sync::oneshot; + + +use config; + +pub fn fs_thread( + dir: &str, + period: u64, + todo: Arc>>, + sigint: Shared>, +) { + let mut files: HashSet = HashSet::new(); + + loop { + let start = time::now_utc(); + match list(dir, &files) { + Err(err) => error!("list fail: {}", err), + Ok((new, deleted)) => { + if new.len() > 0 { + debug!("found {} files", new.len()); + } + + { + let mut todo = todo.lock().unwrap(); + for f in new { + files.insert(f.clone()); + todo.push_front(f); + } + } + for f in deleted { + files.remove(&f); + } + } + } + + let elapsed = (time::now_utc() - start).num_milliseconds() as u64; + let sleep_time = if elapsed > period { + config::REST_TIME + } else { + cmp::max(period - elapsed, config::REST_TIME) + }; + for _ in 0..sleep_time / config::REST_TIME { + thread::sleep(Duration::from_millis(config::REST_TIME)); + if sigint.peek().is_some() { + return; + } + } + } +} + +fn list( + dir: &str, + files: &HashSet, +) -> Result<(Vec, Vec), Box> { + let entries: HashSet = fs::read_dir(dir)? + .filter_map(|entry| { + if entry.is_err() { + return None; + } + let entry = entry.unwrap(); + if entry.path().extension() != Some(OsStr::new("metrics")) { + return None; + } + + Some(entry.path()) + }) + .collect(); + + let deleted = files.difference(&entries).cloned().collect(); + let new = entries.difference(&files).cloned().collect(); + + Ok((new, deleted)) +} diff --git a/src/router/mod.rs b/src/router/mod.rs new file mode 100644 index 0000000..36faea5 --- /dev/null +++ b/src/router/mod.rs @@ -0,0 +1,267 @@ +//! # Router module. +//! +//! The Router module forward sources to sinks. 
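The new src/router/fs.rs above is the watcher half of the design: a single thread lists `*.metrics` files, diffs the listing against the set it already knows, and pushes newcomers onto the shared queue that the router threads drain. Since files are pushed and popped at the front of the `VecDeque`, freshly discovered files are routed ahead of any backlog. A condensed sketch of one scan pass follows, leaving out the real thread's logging, sleep loop, and sigint check ("sources" is the default source-dir from config.rs):

```rust
use std::collections::{HashSet, VecDeque};
use std::ffi::OsStr;
use std::fs;
use std::io;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

/// One scan pass: list *.metrics files, diff against what we have already
/// seen, queue the new ones and forget the deleted ones.
fn scan_once(
    dir: &str,
    known: &mut HashSet<PathBuf>,
    todo: &Arc<Mutex<VecDeque<PathBuf>>>,
) -> io::Result<()> {
    let entries: HashSet<PathBuf> = fs::read_dir(dir)?
        .filter_map(|entry| entry.ok())
        .map(|entry| entry.path())
        .filter(|path| path.extension() == Some(OsStr::new("metrics")))
        .collect();

    let new: Vec<PathBuf> = entries.difference(known).cloned().collect();
    let deleted: Vec<PathBuf> = known.difference(&entries).cloned().collect();

    let mut todo = todo.lock().unwrap();
    for path in new {
        known.insert(path.clone());
        todo.push_front(path);
    }
    for path in deleted {
        known.remove(&path);
    }
    Ok(())
}

fn main() -> io::Result<()> {
    let mut known = HashSet::new();
    let todo = Arc::new(Mutex::new(VecDeque::new()));
    scan_once("sources", &mut known, &todo)?;
    println!("{} file(s) queued", todo.lock().unwrap().len());
    Ok(())
}
```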
+use std::thread; +use std::collections::HashMap; +use std::path::PathBuf; +use std::collections::VecDeque; + +use std::sync::{Arc, Mutex}; +use futures::future::Shared; +use futures::sync::oneshot; +use futures::Future; + +use slog_scope; +use config; + +mod fs; +mod router; + +pub struct Router<'a> { + todo: Arc>>, + handles: Vec>, + dir: &'a String, + watch_period: u64, + parallel: u64, + sinks: &'a Vec, + sink_dir: &'a String, + labels: &'a HashMap, + sigint: (oneshot::Sender<()>, Shared>), +} + +impl<'a> Router<'a> { + pub fn new(sinks: &'a Vec, parameters: &'a config::Parameters, labels: &'a HashMap) -> Router<'a> { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + Router { + todo: Arc::new(Mutex::new(VecDeque::new())), + handles: Vec::new(), + sigint: (shutdown_tx, shutdown_rx.shared()), + dir: ¶meters.source_dir, + parallel: parameters.router_parallel, + sinks: sinks, + sink_dir: ¶meters.sink_dir, + labels: labels, + watch_period: parameters.scan_period, + } + } + + pub fn start(&mut self) { + debug!("start router"); + // Build labels string + let labels: String = self.labels.iter().fold(String::new(), |acc, (k, v)| { + let sep = if acc.is_empty() { "" } else { "," }; + acc + sep + k + "=" + v + }); + + // spawn fs thread + let (dir, todo) = (self.dir.clone(), self.todo.clone()); + let (period, sigint) = (self.watch_period, self.sigint.1.clone()); + + self.handles.push(thread::spawn(move || { + slog_scope::scope( + &slog_scope::logger().new(o!()), + || fs::fs_thread(&dir, period, todo, sigint), + ); + })); + + // spawn router threads + for idx in 0..self.parallel { + let sinks = self.sinks.clone(); + let sink_dir = self.sink_dir.clone(); + let sigint = self.sigint.1.clone(); + let todo = self.todo.clone(); + let labels = labels.clone(); + + self.handles.push(thread::spawn(move || { + slog_scope::scope( + &slog_scope::logger().new(o!()), + || { + router::router_thread(sinks, sink_dir, todo, labels, sigint, idx); + }, + ) + })); + } + } + + pub fn stop(self) { + self.sigint.0.send(()).unwrap(); // FIXME + // for handle in self.handles { + // handle.join().unwrap(); + // } + } +} + +// /// Thread sleeping time. +// const REST_TIME: u64 = 10; + +// /// Router loop. +// pub fn router( +// sinks: &Vec, +// labels: &HashMap, +// parameters: &config::Parameters, +// sigint: Arc, +// ) { +// let labels: String = labels.iter().fold(String::new(), |acc, (k, v)| { +// let sep = if acc.is_empty() { "" } else { "," }; +// acc + sep + k + "=" + v +// }); + +// loop { +// let start = time::now_utc(); + +// match route(sinks, parameters, &labels, sigint.clone()) { +// Err(err) => error!("route fail: {}", err), +// Ok(size) => if size > 0 { +// info!("route success - {}", size) +// }, +// } + +// let elapsed = (time::now_utc() - start).num_milliseconds() as u64; +// let sleep_time = if elapsed > parameters.scan_period { +// REST_TIME +// } else { +// cmp::max(parameters.scan_period - elapsed, REST_TIME) +// }; +// for _ in 0..sleep_time / REST_TIME { +// thread::sleep(Duration::from_millis(REST_TIME)); +// if sigint.load(Ordering::Relaxed) { +// return; +// } +// } +// } +// } + +// /// Route handle sources forwarding. 
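`Router::start` above folds the label map into the single `k=v,k=v` string that `lib::add_labels` later splices into every metric line. A standalone demo of that fold with made-up labels; note that `HashMap` iteration order is unspecified, so the order of labels in the output can differ between runs:

```rust
use std::collections::HashMap;

fn main() {
    let mut labels = HashMap::new();
    labels.insert("dc".to_string(), "gra".to_string());
    labels.insert("host".to_string(), "web-1".to_string());

    // Same fold as Router::start: "" -> "dc=gra" -> "dc=gra,host=web-1".
    let folded: String = labels.iter().fold(String::new(), |acc, (k, v)| {
        let sep = if acc.is_empty() { "" } else { "," };
        acc + sep + k + "=" + v
    });

    println!("{}", folded); // e.g. dc=gra,host=web-1 (order may vary)
}
```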
+// fn route( +// sinks: &Vec, +// parameters: &config::Parameters, +// labels: &String, +// sigint: Arc, +// ) -> Result> { +// let mut proc_size = 0; +// let mut batch_count = 0; +// let start = time::now_utc().to_timespec(); +// let run_id = format!("{}#{}", start.sec, start.nsec); + +// loop { +// if sigint.load(Ordering::Relaxed) { +// return Ok(proc_size); +// } +// let entries = try!(fs::read_dir(¶meters.source_dir)); +// let mut files = Vec::with_capacity(parameters.batch_count as usize); +// let mut metrics: Vec = Vec::new(); + +// // Load metrics +// let mut batch_size = 0; +// for (i, entry) in entries.enumerate() { +// let entry = try!(entry); +// // Look only for metrics files +// if entry.path().extension() != Some(OsStr::new("metrics")) { +// continue; +// } + +// // Split metrics in capped batch +// if i > parameters.batch_count as usize || batch_size > parameters.batch_size as usize { +// break; +// } + +// debug!("open source file {}", format!("{:?}", entry.path())); +// let file = match read(entry.path()) { +// Err(err) => { +// warn!("{}", err); +// continue; +// } +// Ok(v) => v, +// }; + +// for line in file.lines() { +// match lib::add_labels(&line, labels) { +// Ok(v) => metrics.push(v), +// Err(_) => { +// warn!("bad line {}", &line); +// continue; +// } +// }; +// } + +// files.push(entry.path()); +// batch_size += file.len(); +// } + +// proc_size += metrics.len(); +// batch_count += 1; + +// // Nothing to do +// if files.len() == 0 { +// break; +// } + +// // Setup sinks files +// let dir = Path::new(¶meters.sink_dir); +// { +// let mut sink_files = Vec::with_capacity(sinks.len() as usize); +// // Open tmp files +// for sink in sinks { +// let sink_file = dir.join(format!("{}.tmp", sink.name)); +// debug!("open tmp sink file {}", format!("{:?}", sink_file)); +// sink_files.push(try!(File::create(sink_file))); +// } + +// // Write metrics +// debug!("write sink files"); +// for line in metrics { +// if line.is_empty() { +// continue; +// } + +// for (i, sink) in sinks.iter().enumerate() { +// if sink.selector.is_some() { +// let selector = sink.selector.as_ref().unwrap(); +// if !line.split_whitespace() +// .nth(1) +// .map_or(false, |class| selector.is_match(class)) +// { +// continue; +// } +// } +// try!(sink_files[i].write(line.as_bytes())); +// try!(sink_files[i].write(b"\n")); +// } +// } + +// // Flush +// for i in 0..sinks.len() { +// try!(sink_files[i].flush()); +// } +// } + +// // Rotate +// for sink in sinks { +// let dest_file = dir.join(format!("{}-{}-{}.metrics", sink.name, run_id, batch_count)); +// debug!("rotate tmp sink file to {}", format!("{:?}", dest_file)); +// try!(fs::rename( +// dir.join(format!("{}.tmp", sink.name)), +// dest_file +// )); +// } + +// // Delete forwarded data +// for f in files { +// debug!("delete source file {}", format!("{:?}", f)); +// try!(fs::remove_file(f)); +// } +// } + +// Ok(proc_size) +// } + +// /// Read a file as String +// fn read(path: PathBuf) -> Result> { +// let mut file = try!(File::open(path)); + +// let mut content = String::new(); +// try!(file.read_to_string(&mut content)); + +// Ok(content) +// } diff --git a/src/router/router.rs b/src/router/router.rs new file mode 100644 index 0000000..c632ccf --- /dev/null +++ b/src/router/router.rs @@ -0,0 +1,109 @@ +use std::thread; +use std::time::Duration; +use time; +use std::io::prelude::*; +use std::fs::File; +use std::error::Error; +use std::path::{Path, PathBuf}; +use std::collections::VecDeque; +use std::fs; +use std::io::BufWriter; + +use 
std::sync::{Arc, Mutex}; +use futures::future::Shared; +use futures::sync::oneshot; +use config; +use lib; + +pub fn router_thread( + sinks: Vec, + sink_dir: String, + todo: Arc>>, + labels: String, + sigint: Shared>, + id: u64 +) { + loop { + match todo.lock().unwrap().pop_front() { + Some(path) => { + if let Err(err) = route(path, &labels, &sinks, &sink_dir, id) { + warn!("{}", err); + } + } + None => { + thread::sleep(Duration::from_millis(config::REST_TIME)); + } + } + + if sigint.peek().is_some() { + return; + } + } +} + +fn route(path: PathBuf, labels: &String, sinks: &Vec, sink_dir: &String, id: u64) -> Result<(), Box> { + let start = time::now_utc().to_timespec(); + let run_id = format!("{}#{}", start.sec, start.nsec); + + // Load metrics + debug!("open {}", format!("{:?}", path)); + let mut content = String::new(); + File::open(&path)?.read_to_string(&mut content)?; + + let mut metrics: Vec = Vec::new(); + for line in content.lines() { + metrics.push(lib::add_labels(&line, labels)?); + } + + // Nothing to do + if metrics.len() == 0 { + // Delete source file + debug!("delete {}", format!("{:?}", path)); + fs::remove_file(&path)?; + + return Ok(()); + } + + // Setup sinks files + let dir = Path::new(&sink_dir); // FIXME + + // Open tmp files + for sink in sinks { + let sink_file_name = dir.join(format!("{}-{}.tmp", sink.name, id)); + debug!("open tmp sink file {}", format!("{:?}", sink_file_name)); + let mut sink_file = BufWriter::new(File::create(&sink_file_name)?); + + // Write metrics + for line in metrics.iter() { + if line.is_empty() { + continue; + } + + if sink.selector.is_some() { + let selector = sink.selector.as_ref().unwrap(); + if !line.split_whitespace() + .nth(1) + .map_or(false, |class| selector.is_match(class)) + { + continue; + } + } + sink_file.write(line.as_bytes())?; + sink_file.write(b"\n")?; + } + + // Write buffered datas + sink_file.flush()?; + + // Rotate + let dest_file = dir.join(format!("{}-{}-{}.metrics", sink.name, id, run_id)); + debug!("rename {:?} to {:?}", sink_file_name, dest_file); + fs::rename(sink_file_name, dest_file)?; + + // Delete source file + debug!("delete {}", format!("{:?}", path)); + fs::remove_file(&path)?; + } + + Ok(()) +} From d1e4dc04923ecb891bcefd3601501e5db3596daf Mon Sep 17 00:00:00 2001 From: d33d33 Date: Wed, 4 Jul 2018 19:00:44 +0200 Subject: [PATCH 2/3] chore: fix clippy --- build.rs | 4 +- src/config.rs | 390 +++++++++++++---------------- src/lib/mod.rs | 4 +- src/lib/transcompiler.rs | 22 +- src/log.rs | 23 +- src/main.rs | 16 +- src/router/fs.rs | 34 ++- src/router/mod.rs | 274 ++++---------------- src/router/{router.rs => route.rs} | 54 ++-- src/scraper.rs | 53 ++-- src/sink/fs.rs | 48 ++-- src/sink/mod.rs | 96 +++---- src/sink/send.rs | 47 ++-- 13 files changed, 417 insertions(+), 648 deletions(-) rename src/router/{router.rs => route.rs} (75%) diff --git a/build.rs b/build.rs index 92befd2..9a3293e 100644 --- a/build.rs +++ b/build.rs @@ -1,8 +1,8 @@ -use std::process::Command; -use std::path::Path; use std::env; use std::fs::File; use std::io::prelude::*; +use std::path::Path; +use std::process::Command; fn main() { let output = Command::new("git") diff --git a/src/config.rs b/src/config.rs index be39404..68208e9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,20 +3,20 @@ //! The Config module provides the beamium configuration. //! It set defaults and then load config from '/etc', local dir and provided path. 
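Stepping back to `route()` in the new src/router/router.rs above: each batch is written to a per-thread `{sink}-{id}.tmp` file and then renamed to its final `.metrics` name. `fs::rename` is atomic within one filesystem, so the sink scanner never observes a half-written batch. One thing that looks off: the `fs::remove_file(&path)` cleanup sits inside the `for sink in sinks` loop, so with more than one sink the second iteration would fail on an already-deleted source file. A reduced sketch of the write-then-rename step for a single sink (file names and the sample metric line are made up):

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Write one batch for one sink: fill <name>-<id>.tmp, flush, then
/// atomically rename it to <name>-<id>-<run_id>.metrics.
fn publish(dir: &Path, name: &str, id: u64, run_id: &str, lines: &[String]) -> io::Result<()> {
    let tmp = dir.join(format!("{}-{}.tmp", name, id));
    let mut out = BufWriter::new(File::create(&tmp)?);
    for line in lines {
        out.write_all(line.as_bytes())?;
        out.write_all(b"\n")?;
    }
    out.flush()?;

    let dest = dir.join(format!("{}-{}-{}.metrics", name, id, run_id));
    fs::rename(&tmp, &dest) // readers see the whole batch or nothing
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir();
    let lines = vec!["1530713305000000// http.requests{} 42".to_string()];
    publish(&dir, "warp", 0, "1530713305#0", &lines)
}
```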
+use cast; +use hyper; +use regex; +use slog; +use std::collections::HashMap; +use std::error; +use std::error::Error; +use std::fmt; use std::fs::File; -use std::io::Read; use std::io; -use std::fmt; -use std::string::String; +use std::io::Read; use std::path::Path; -use std::error; -use std::error::Error; +use std::string::String; use yaml_rust::{ScanError, YamlLoader}; -use cast; -use std::collections::HashMap; -use regex; -use slog; -use hyper; pub const REST_TIME: u64 = 10; @@ -171,7 +171,7 @@ pub fn load_config(config_path: &str) -> Result { scan_period: 1000, sink_dir: String::from("sinks"), source_dir: String::from("sources"), - batch_size: 200000, + batch_size: 200_000, batch_count: 250, log_file: String::from(env!("CARGO_PKG_NAME")) + ".log", log_level: slog::Level::Info, @@ -184,28 +184,29 @@ pub fn load_config(config_path: &str) -> Result { if config_path.is_empty() { // Load from etc if Path::new("/etc/beamium/config.yaml").exists() { - try!(load_path("/etc/beamium/config.yaml", &mut config)); + load_path("/etc/beamium/config.yaml", &mut config)?; } else if Path::new("config.yaml").exists() { // Load local - try!(load_path("config.yaml", &mut config)); + load_path("config.yaml", &mut config)?; } } else { // Load from provided path - try!(load_path(config_path, &mut config)); + load_path(config_path, &mut config)?; } Ok(config) } /// Extend config from file. +#[allow(unknown_lints, cyclomatic_complexity)] fn load_path>(file_path: P, config: &mut Config) -> Result<(), ConfigError> { - let mut file = try!(File::open(file_path)); + let mut file = File::open(file_path)?; let mut contents = String::new(); - try!(file.read_to_string(&mut contents)); - let docs = try!(YamlLoader::load_from_str(&contents)); + file.read_to_string(&mut contents)?; + let docs = YamlLoader::load_from_str(&contents)?; let config_scraper_keys = ["sources", "scrapers"]; for doc in &docs { - for config_scraper_key in config_scraper_keys.iter() { + for config_scraper_key in &config_scraper_keys { let key = *config_scraper_key; if !doc[key].is_badvalue() { if "sources" == key { @@ -215,31 +216,29 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co ) } - let scrapers = try!(doc[key].as_hash().ok_or(format!("{} should be a map", key))); + let scrapers = doc[key] + .as_hash() + .ok_or_else(|| format!("{} should be a map", key))?; for (k, v) in scrapers { - let name = try!(k.as_str().ok_or(format!("{} keys should be a string", key))); - let url = try!(v["url"].as_str().ok_or(format!( - "{}.{}.url is required and should be a string", - key, name - ))); - let url = try!(url.parse::()); - let period = try!(v["period"].as_i64().ok_or(format!( - "{}.{}.period is required and should be a number", - key, name - ))); - let period = try!( - cast::u64(period) - .map_err(|_| format!("scrapers.{}.period is invalid", name)) - ); + let name = k + .as_str() + .ok_or_else(|| format!("{} keys should be a string", key))?; + let url = v["url"].as_str().ok_or_else(|| { + format!("{}.{}.url is required and should be a string", key, name) + })?; + let url = url.parse::()?; + let period = v["period"].as_i64().ok_or_else(|| { + format!("{}.{}.period is required and should be a number", key, name) + })?; + let period = cast::u64(period) + .map_err(|_| format!("scrapers.{}.period is invalid", name))?; let format = if v["format"].is_badvalue() { ScraperFormat::Prometheus } else { - let f = try!( - v["format"] - .as_str() - .ok_or(format!("scrapers.{}.format should be a string", name)) - ); + let f = v["format"] + .as_str() + 
.ok_or_else(|| format!("scrapers.{}.format should be a string", name))?; if f == "prometheus" { ScraperFormat::Prometheus @@ -257,41 +256,38 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co None } else { let mut metrics = Vec::new(); - let values = try!( - v["metrics"] - .as_vec() - .ok_or(format!("scrapers.{}.metrics should be an array", name)) - ); + let values = v["metrics"].as_vec().ok_or_else(|| { + format!("scrapers.{}.metrics should be an array", name) + })?; for v in values { - let value = try!(regex::Regex::new(try!( - v.as_str() - .ok_or(format!("scrapers.{}.metrics is invalid", name)) - ))); + let value = + regex::Regex::new(v.as_str().ok_or_else(|| { + format!("scrapers.{}.metrics is invalid", name) + })?)?; metrics.push(String::from(r"^(\S*)\s") + value.as_str()); } - Some(try!(regex::RegexSet::new(&metrics))) + Some(regex::RegexSet::new(&metrics)?) }; // let headers = HashMap::new(); let headers = if v["headers"].is_badvalue() { HashMap::new() } else { - let heads = try!( - v["headers"] - .as_hash() - .ok_or(format!("scrapers.{}.headers should be a map", name)) - ); + let heads = v["headers"] + .as_hash() + .ok_or_else(|| format!("scrapers.{}.headers should be a map", name))?; let mut ret = HashMap::new(); for (k, v) in heads { - let hname = try!(k.as_str().ok_or(format!( - "scrapers.{}.headers keys should be a string", - name - ))); - let value = try!(v.as_str().ok_or(format!( - "scrapers.{}.headers.{} value should be a string", - hname, name - ))); + let hname = k.as_str().ok_or_else(|| { + format!("scrapers.{}.headers keys should be a string", name) + })?; + let value = v.as_str().ok_or_else(|| { + format!( + "scrapers.{}.headers.{} value should be a string", + hname, name + ) + })?; ret.insert(String::from(hname), String::from(value)); } @@ -300,146 +296,134 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let mut labels = HashMap::new(); if !v["labels"].is_badvalue() { - let slabels = try!(v["labels"].as_hash().ok_or("labels should be a map")); + let slabels = v["labels"] + .as_hash() + .ok_or_else(|| "labels should be a map")?; for (k, v) in slabels { - let lname = try!(k.as_str().ok_or(format!( - "scrapers.{}.labels keys should be a string", - name - ))); - let value = try!(v.as_str().ok_or(format!( - "scrapers.{}.labels.{} value should be a string", - name, lname - ))); + let lname = k.as_str().ok_or_else(|| { + format!("scrapers.{}.labels keys should be a string", name) + })?; + let value = v.as_str().ok_or_else(|| { + format!( + "scrapers.{}.labels.{} value should be a string", + name, lname + ) + })?; labels.insert(String::from(lname), String::from(value)); } } config.scrapers.push(Scraper { name: String::from(name), - url: url, - period: period, - format: format, - metrics: metrics, - headers: headers, - labels: labels, + url, + period, + format, + metrics, + headers, + labels, }) } } } if !doc["sinks"].is_badvalue() { - let sinks = try!(doc["sinks"].as_hash().ok_or("sinks should be a map")); + let sinks = doc["sinks"] + .as_hash() + .ok_or_else(|| "sinks should be a map")?; for (k, v) in sinks { - let name = try!(k.as_str().ok_or("sinks keys should be a string")); - let url = try!(v["url"].as_str().ok_or(format!( - "sinks.{}.url is required and should be a string", - name - ))); - let url = try!(url.parse::()); - let token = try!(v["token"].as_str().ok_or(format!( - "sinks.{}.token is required and should be a string", - name - ))); + let name = k.as_str().ok_or_else(|| "sinks keys should be a string")?; + let url = 
v["url"].as_str().ok_or_else(|| { + format!("sinks.{}.url is required and should be a string", name) + })?; + let url = url.parse::()?; + let token = v["token"].as_str().ok_or_else(|| { + format!("sinks.{}.token is required and should be a string", name) + })?; let token_header = if v["token-header"].is_badvalue() { "X-Warp10-Token" } else { - try!( - v["token-header"] - .as_str() - .ok_or(format!("sinks.{}.token-header should be a string", name)) - ) + v["token-header"] + .as_str() + .ok_or_else(|| format!("sinks.{}.token-header should be a string", name))? }; let selector = if v["selector"].is_badvalue() { None } else { - Some(try!(regex::Regex::new( + Some(regex::Regex::new( format!( "^{}", - try!(v["selector"].as_str().ok_or(format!( + v["selector"].as_str().ok_or_else(|| format!( "sinks.{}.selector \ is invalid", name - ))) - ).as_str() - ))) + ))? + ).as_str(), + )?) }; let ttl = if v["ttl"].is_badvalue() { 3600 } else { - let ttl = try!( - v["ttl"] - .as_i64() - .ok_or(format!("sinks.{}.ttl should be a number", name)) - ); - try!( - cast::u64(ttl) - .map_err(|_| format!("sinks.{}.ttl should be a positive number", name)) - ) + let ttl = v["ttl"] + .as_i64() + .ok_or_else(|| format!("sinks.{}.ttl should be a number", name))?; + cast::u64(ttl) + .map_err(|_| format!("sinks.{}.ttl should be a positive number", name))? }; - let size = - if v["size"].is_badvalue() { - 1073741824 - } else { - let size = try!( - v["size"] - .as_i64() - .ok_or(format!("sinks.{}.size should be a number", name)) - ); - try!(cast::u64(size).map_err(|_| format!( - "sinks.{}.size should be a positive number", - name - ))) - }; + let size = if v["size"].is_badvalue() { + 1_073_741_824 + } else { + let size = v["size"] + .as_i64() + .ok_or_else(|| format!("sinks.{}.size should be a number", name))?; + cast::u64(size) + .map_err(|_| format!("sinks.{}.size should be a positive number", name))? + }; let parallel = if v["parallel"].is_badvalue() { 1 } else { - let parallel = try!( - v["parallel"] - .as_i64() - .ok_or(format!("sinks.{}.parallel should be a number", name)) - ); - try!(cast::u64(parallel).map_err(|_| format!( - "sinks.{}.parallel should be a positive number", - name - ))) + let parallel = v["parallel"] + .as_i64() + .ok_or_else(|| format!("sinks.{}.parallel should be a number", name))?; + cast::u64(parallel).map_err(|_| { + format!("sinks.{}.parallel should be a positive number", name) + })? }; let keep_alive = if v["keep-alive"].is_badvalue() { true } else { - try!( - v["keep-alive"] - .as_bool() - .ok_or(format!("sinks.{}.keep-alive should be a boolean", name)) - ) + v["keep-alive"] + .as_bool() + .ok_or_else(|| format!("sinks.{}.keep-alive should be a boolean", name))? 
}; config.sinks.push(Sink { name: String::from(name), - url: url, + url, token: String::from(token), token_header: String::from(token_header), - selector: selector, - ttl: ttl, - size: size, - parallel: parallel, - keep_alive: keep_alive, + selector, + ttl, + size, + parallel, + keep_alive, }) } } if !doc["labels"].is_badvalue() { - let labels = try!(doc["labels"].as_hash().ok_or("labels should be a map")); + let labels = doc["labels"] + .as_hash() + .ok_or_else(|| "labels should be a map")?; for (k, v) in labels { - let name = try!(k.as_str().ok_or("labels keys should be a string")); - let value = try!( - v.as_str() - .ok_or(format!("labels.{} value should be a string", name)) - ); + let name = k.as_str().ok_or_else(|| "labels keys should be a string")?; + let value = v + .as_str() + .ok_or_else(|| format!("labels.{} value should be a string", name))?; config .labels .insert(String::from(name), String::from(value)); @@ -448,112 +432,84 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co if !doc["parameters"].is_badvalue() { if !doc["parameters"]["source-dir"].is_badvalue() { - let source_dir = try!( - doc["parameters"]["source-dir"] - .as_str() - .ok_or(format!("parameters.source-dir should be a string")) - ); + let source_dir = doc["parameters"]["source-dir"] + .as_str() + .ok_or_else(|| "parameters.source-dir should be a string".to_string())?; config.parameters.source_dir = String::from(source_dir); } if !doc["parameters"]["sink-dir"].is_badvalue() { - let sink_dir = try!( - doc["parameters"]["sink-dir"] - .as_str() - .ok_or(format!("parameters.sink-dir should be a string")) - ); + let sink_dir = doc["parameters"]["sink-dir"] + .as_str() + .ok_or_else(|| "parameters.sink-dir should be a string".to_string())?; config.parameters.sink_dir = String::from(sink_dir); } if !doc["parameters"]["scan-period"].is_badvalue() { - let scan_period = try!( - doc["parameters"]["scan-period"] - .as_i64() - .ok_or(format!("parameters.scan-period should be a number")) - ); - let scan_period = try!( - cast::u64(scan_period) - .map_err(|_| format!("parameters.scan-period is invalid")) - ); + let scan_period = doc["parameters"]["scan-period"] + .as_i64() + .ok_or_else(|| "parameters.scan-period should be a number".to_string())?; + let scan_period = cast::u64(scan_period) + .map_err(|_| "parameters.scan-period is invalid".to_string())?; config.parameters.scan_period = scan_period; } if !doc["parameters"]["batch-size"].is_badvalue() { - let batch_size = try!( - doc["parameters"]["batch-size"] - .as_i64() - .ok_or(format!("parameters.batch-size should be a number")) - ); - let batch_size = try!( - cast::u64(batch_size).map_err(|_| format!("parameters.batch-size is invalid")) - ); + let batch_size = doc["parameters"]["batch-size"] + .as_i64() + .ok_or_else(|| "parameters.batch-size should be a number".to_string())?; + let batch_size = cast::u64(batch_size) + .map_err(|_| "parameters.batch-size is invalid".to_string())?; config.parameters.batch_size = batch_size; } if !doc["parameters"]["batch-count"].is_badvalue() { - let batch_count = try!( - doc["parameters"]["batch-count"] - .as_i64() - .ok_or(format!("parameters.batch-count should be a number")) - ); - let batch_count = try!( - cast::u64(batch_count) - .map_err(|_| format!("parameters.batch-count is invalid")) - ); + let batch_count = doc["parameters"]["batch-count"] + .as_i64() + .ok_or_else(|| "parameters.batch-count should be a number".to_string())?; + let batch_count = cast::u64(batch_count) + .map_err(|_| "parameters.batch-count is 
invalid".to_string())?; config.parameters.batch_count = batch_count; } if !doc["parameters"]["log-file"].is_badvalue() { - let log_file = try!( - doc["parameters"]["log-file"] - .as_str() - .ok_or(format!("parameters.log-file should be a string")) - ); + let log_file = doc["parameters"]["log-file"] + .as_str() + .ok_or_else(|| "parameters.log-file should be a string".to_string())?; config.parameters.log_file = String::from(log_file); } if !doc["parameters"]["log-level"].is_badvalue() { - let log_level = try!( - doc["parameters"]["log-level"] - .as_i64() - .ok_or(format!("parameters.log-level should be a number")) - ); - let log_level = try!( - cast::u64(log_level).map_err(|_| format!("parameters.log-level is invalid")) - ); - let log_level = try!( - slog::Level::from_usize(log_level as usize) - .ok_or(format!("parameters.log-level is invalid")) - ); + let log_level = doc["parameters"]["log-level"] + .as_i64() + .ok_or_else(|| "parameters.log-level should be a number".to_string())?; + let log_level = cast::u64(log_level) + .map_err(|_| "parameters.log-level is invalid".to_string())?; + let log_level = slog::Level::from_usize(log_level as usize) + .ok_or_else(|| "parameters.log-level is invalid".to_string())?; config.parameters.log_level = log_level; } if !doc["parameters"]["syslog"].is_badvalue() { - let syslog = try!( - doc["parameters"]["syslog"] - .as_bool() - .ok_or("parameters.bool should be a boolean") - ); + let syslog = doc["parameters"]["syslog"] + .as_bool() + .ok_or_else(|| "parameters.bool should be a boolean")?; config.parameters.syslog = syslog; } if !doc["parameters"]["timeout"].is_badvalue() { - let timeout = try!( - doc["parameters"]["timeout"] - .as_i64() - .ok_or("parameters.timeout should be a number") - ); + let timeout = doc["parameters"]["timeout"] + .as_i64() + .ok_or_else(|| "parameters.timeout should be a number")?; let timeout = - try!(cast::u64(timeout).map_err(|_| format!("parameters.timeout is invalid"))); + cast::u64(timeout).map_err(|_| "parameters.timeout is invalid".to_string())?; config.parameters.timeout = timeout; } if !doc["parameters"]["router-parallel"].is_badvalue() { - let router_parallel = try!( - doc["parameters"]["router-parallel"] - .as_i64() - .ok_or("parameters.router-parallel should be a number") - ); - let router_parallel = - try!(cast::u64(router_parallel).map_err(|_| format!("parameters.router-parallel is invalid"))); + let router_parallel = doc["parameters"]["router-parallel"] + .as_i64() + .ok_or_else(|| "parameters.router-parallel should be a number")?; + let router_parallel = cast::u64(router_parallel) + .map_err(|_| "parameters.router-parallel is invalid".to_string())?; config.parameters.router_parallel = router_parallel; } } diff --git a/src/lib/mod.rs b/src/lib/mod.rs index ecfc5ea..11691e9 100644 --- a/src/lib/mod.rs +++ b/src/lib/mod.rs @@ -7,7 +7,7 @@ pub fn add_labels(line: &str, labels: &str) -> Result> { return Ok(String::from(line)); } - let mut parts = line.splitn(2, "{"); + let mut parts = line.splitn(2, '{'); let class = parts.next().ok_or("no_class")?; let class = String::from(class); @@ -15,7 +15,7 @@ pub fn add_labels(line: &str, labels: &str) -> Result> { let plabels = parts.next().ok_or("no_labels")?; let plabels = String::from(plabels); - let sep = if plabels.trim().starts_with("}") { + let sep = if plabels.trim().starts_with('}') { "" } else { "," diff --git a/src/lib/transcompiler.rs b/src/lib/transcompiler.rs index ce8d155..77e2f8d 100644 --- a/src/lib/transcompiler.rs +++ b/src/lib/transcompiler.rs @@ -11,17 
+11,15 @@ pub struct Transcompiler<'a> { impl<'a> Transcompiler<'a> { pub fn new(format: &config::ScraperFormat) -> Transcompiler { let start = time::now_utc(); - let now = start.to_timespec().sec * 1000 * 1000 + (start.to_timespec().nsec as i64 / 1000); - Transcompiler { - format: format, - now: now, - } + let now = start.to_timespec().sec * 1000 * 1000 + + (i64::from(start.to_timespec().nsec) as i64 / 1000); + Transcompiler { format, now } } pub fn format(&self, line: &str) -> Result> { - match self.format { - &config::ScraperFormat::Sensision => format_warp10(line), - &config::ScraperFormat::Prometheus => format_prometheus(line, self.now), + match *self.format { + config::ScraperFormat::Sensision => format_warp10(line), + config::ScraperFormat::Prometheus => format_prometheus(line, self.now), } } } @@ -36,12 +34,12 @@ fn format_prometheus(line: &str, now: i64) -> Result> { let line = line.trim(); // Skip comments - if line.starts_with("#") { + if line.starts_with('#') { return Ok(String::new()); } // Extract Prometheus metric - let index = if line.contains("{") { + let index = if line.contains('{') { line.rfind('}').ok_or("bad class")? } else { line.find(' ').ok_or("bad class")? @@ -62,7 +60,7 @@ fn format_prometheus(line: &str, now: i64) -> Result> { .unwrap_or(now); // Format class - let mut parts = class.splitn(2, "{"); + let mut parts = class.splitn(2, '{'); let class = String::from(parts.next().ok_or("no_class")?); let class = class.trim(); let plabels = parts.next(); @@ -78,7 +76,7 @@ fn format_prometheus(line: &str, now: i64) -> Result> { .map(|v| v.replace(r"\n", "%0A")) // unescape .fold(String::new(), |acc, x| { // skip invalid values - if !x.contains("=") { + if !x.contains('=') { return acc } acc + &x + "," diff --git a/src/log.rs b/src/log.rs index c839ed8..95eecd7 100644 --- a/src/log.rs +++ b/src/log.rs @@ -1,22 +1,22 @@ //! # Log module. //! //! The Config module provides the log facility. -use slog::Logger; -use slog::Level; +use slog::Drain; use slog::Duplicate; +use slog::Level; use slog::LevelFilter; -use slog::Drain; +use slog::Logger; use slog_async; -use slog_term; +use slog_scope; use slog_syslog; use slog_syslog::Facility; -use slog_scope; -use std::fs::OpenOptions; -use std::os::unix::fs::OpenOptionsExt; +use slog_term; use std; -use std::path::Path; use std::error::Error; use std::fs; +use std::fs::OpenOptions; +use std::os::unix::fs::OpenOptionsExt; +use std::path::Path; use config; @@ -33,10 +33,9 @@ pub fn bootstrap() { /// Send log to console and log file, also handle log level. pub fn log(parameters: &config::Parameters, verbose: u64) -> Result<(), Box> { // Ensure log directory is present - match Path::new(¶meters.log_file).parent() { - Some(log_path) => fs::create_dir_all(log_path)?, - None => {} - }; + if let Some(log_path) = Path::new(¶meters.log_file).parent() { + fs::create_dir_all(log_path)? 
+ } // Stdout drain let term_decorator = slog_term::TermDecorator::new().build(); diff --git a/src/main.rs b/src/main.rs index 818bc67..2c3fc00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,18 +27,18 @@ extern crate tokio_timer; extern crate yaml_rust; use clap::App; -use std::thread; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::fs; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; use std::time::Duration; mod config; -mod scraper; -mod router; -mod sink; mod lib; mod log; +mod router; +mod scraper; +mod sink; include!("version.rs"); @@ -131,14 +131,14 @@ fn main() { handles.push(thread::spawn(move || { slog_scope::scope( &slog_scope::logger().new(o!("scraper" => scraper.name.clone())), - || scraper::scraper(&scraper, ¶meters, sigint), + || scraper::scraper(&scraper, ¶meters, &sigint), ); })); } // Spawn router info!("spawning router"); - let mut router = router::Router::new(&config.sinks, &config.parameters, &config.labels); // FIXME + let mut router = router::Router::new(&config.sinks, &config.parameters, &config.labels); router.start(); // Spawn sinks diff --git a/src/router/fs.rs b/src/router/fs.rs index 871e4d1..369d7ef 100644 --- a/src/router/fs.rs +++ b/src/router/fs.rs @@ -6,30 +6,32 @@ use std::fs; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::thread; -use std::time::{Duration}; +use std::time::Duration; use time; use futures::future::Shared; use futures::sync::oneshot; - use config; +use router::RouterConfig; pub fn fs_thread( - dir: &str, - period: u64, - todo: Arc>>, - sigint: Shared>, + config: &RouterConfig, + todo: &Arc>>, + sigint: &Shared>, ) { let mut files: HashSet = HashSet::new(); loop { let start = time::now_utc(); - match list(dir, &files) { + match list(&config.dir) { Err(err) => error!("list fail: {}", err), - Ok((new, deleted)) => { - if new.len() > 0 { + Ok(entries) => { + let deleted: Vec = files.difference(&entries).cloned().collect(); + let new: Vec = entries.difference(&files).cloned().collect(); + + if !new.is_empty() { debug!("found {} files", new.len()); } @@ -47,10 +49,10 @@ pub fn fs_thread( } let elapsed = (time::now_utc() - start).num_milliseconds() as u64; - let sleep_time = if elapsed > period { + let sleep_time = if elapsed > config.watch_period { config::REST_TIME } else { - cmp::max(period - elapsed, config::REST_TIME) + cmp::max(config.watch_period - elapsed, config::REST_TIME) }; for _ in 0..sleep_time / config::REST_TIME { thread::sleep(Duration::from_millis(config::REST_TIME)); @@ -61,10 +63,7 @@ pub fn fs_thread( } } -fn list( - dir: &str, - files: &HashSet, -) -> Result<(Vec, Vec), Box> { +fn list(dir: &str) -> Result, Box> { let entries: HashSet = fs::read_dir(dir)? .filter_map(|entry| { if entry.is_err() { @@ -79,8 +78,5 @@ fn list( }) .collect(); - let deleted = files.difference(&entries).cloned().collect(); - let new = entries.difference(&files).cloned().collect(); - - Ok((new, deleted)) + Ok(entries) } diff --git a/src/router/mod.rs b/src/router/mod.rs index 36faea5..439d87e 100644 --- a/src/router/mod.rs +++ b/src/router/mod.rs @@ -1,267 +1,99 @@ //! # Router module. //! //! The Router module forward sources to sinks. 
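The router/mod.rs diff that follows makes the ownership change behind most of this patch: `Router<'a>` with borrowed `&'a` fields becomes a plain `Router` holding an owned, `Clone`-able `RouterConfig` snapshot. Each spawned thread clones the snapshot, so nothing borrowed has to outlive `main` and no lifetimes cross thread boundaries. The pattern reduced to essentials (struct fields and values here are illustrative):

```rust
use std::thread;

#[derive(Debug, Clone)]
struct Config {
    dir: String,
    parallel: u64,
}

fn start(config: &Config) -> Vec<thread::JoinHandle<()>> {
    (0..config.parallel)
        .map(|idx| {
            // Each worker gets its own owned copy of the snapshot.
            let config = config.clone();
            thread::spawn(move || {
                println!("worker {} watching {}", idx, config.dir);
            })
        })
        .collect()
}

fn main() {
    let config = Config { dir: "sources".into(), parallel: 2 };
    for handle in start(&config) {
        handle.join().unwrap();
    }
}
```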
-use std::thread; use std::collections::HashMap; -use std::path::PathBuf; use std::collections::VecDeque; +use std::path::PathBuf; +use std::thread; -use std::sync::{Arc, Mutex}; use futures::future::Shared; use futures::sync::oneshot; use futures::Future; +use std::sync::{Arc, Mutex}; -use slog_scope; use config; +use slog_scope; mod fs; -mod router; +mod route; -pub struct Router<'a> { - todo: Arc>>, - handles: Vec>, - dir: &'a String, +#[derive(Debug, Clone)] +pub struct RouterConfig { + dir: String, watch_period: u64, parallel: u64, - sinks: &'a Vec, - sink_dir: &'a String, - labels: &'a HashMap, + sinks: Vec, + sink_dir: String, + labels: String, +} + +pub struct Router { + config: RouterConfig, + todo: Arc>>, + handles: Vec>, sigint: (oneshot::Sender<()>, Shared>), } -impl<'a> Router<'a> { - pub fn new(sinks: &'a Vec, parameters: &'a config::Parameters, labels: &'a HashMap) -> Router<'a> { +impl Router { + pub fn new( + sinks: &[config::Sink], + parameters: &config::Parameters, + labels: &HashMap, + ) -> Router { let (shutdown_tx, shutdown_rx) = oneshot::channel(); + // Build labels string + let labels: String = labels.iter().fold(String::new(), |acc, (k, v)| { + let sep = if acc.is_empty() { "" } else { "," }; + acc + sep + k + "=" + v + }); + + let config = RouterConfig { + dir: parameters.source_dir.clone(), + watch_period: parameters.scan_period, + parallel: parameters.router_parallel, + sinks: sinks.to_owned(), + sink_dir: parameters.sink_dir.clone(), + labels, + }; Router { + config, todo: Arc::new(Mutex::new(VecDeque::new())), handles: Vec::new(), sigint: (shutdown_tx, shutdown_rx.shared()), - dir: ¶meters.source_dir, - parallel: parameters.router_parallel, - sinks: sinks, - sink_dir: ¶meters.sink_dir, - labels: labels, - watch_period: parameters.scan_period, } } pub fn start(&mut self) { debug!("start router"); - // Build labels string - let labels: String = self.labels.iter().fold(String::new(), |acc, (k, v)| { - let sep = if acc.is_empty() { "" } else { "," }; - acc + sep + k + "=" + v - }); // spawn fs thread - let (dir, todo) = (self.dir.clone(), self.todo.clone()); - let (period, sigint) = (self.watch_period, self.sigint.1.clone()); + let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone()); + let config = self.config.clone(); self.handles.push(thread::spawn(move || { - slog_scope::scope( - &slog_scope::logger().new(o!()), - || fs::fs_thread(&dir, period, todo, sigint), - ); + slog_scope::scope(&slog_scope::logger().new(o!()), || { + fs::fs_thread(&config, &todo, &sigint) + }); })); // spawn router threads - for idx in 0..self.parallel { - let sinks = self.sinks.clone(); - let sink_dir = self.sink_dir.clone(); - let sigint = self.sigint.1.clone(); - let todo = self.todo.clone(); - let labels = labels.clone(); + for idx in 0..self.config.parallel { + let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone()); + let config = self.config.clone(); self.handles.push(thread::spawn(move || { - slog_scope::scope( - &slog_scope::logger().new(o!()), - || { - router::router_thread(sinks, sink_dir, todo, labels, sigint, idx); - }, - ) + slog_scope::scope(&slog_scope::logger().new(o!()), || { + route::route_thread(&config, &todo, &sigint, idx); + }) })); } } pub fn stop(self) { - self.sigint.0.send(()).unwrap(); // FIXME - // for handle in self.handles { - // handle.join().unwrap(); - // } + self.sigint.0.send(()).unwrap(); + for handle in self.handles { + handle.join().unwrap(); + } } } - -// /// Thread sleeping time. -// const REST_TIME: u64 = 10; - -// /// Router loop. 
-// pub fn router( -// sinks: &Vec, -// labels: &HashMap, -// parameters: &config::Parameters, -// sigint: Arc, -// ) { -// let labels: String = labels.iter().fold(String::new(), |acc, (k, v)| { -// let sep = if acc.is_empty() { "" } else { "," }; -// acc + sep + k + "=" + v -// }); - -// loop { -// let start = time::now_utc(); - -// match route(sinks, parameters, &labels, sigint.clone()) { -// Err(err) => error!("route fail: {}", err), -// Ok(size) => if size > 0 { -// info!("route success - {}", size) -// }, -// } - -// let elapsed = (time::now_utc() - start).num_milliseconds() as u64; -// let sleep_time = if elapsed > parameters.scan_period { -// REST_TIME -// } else { -// cmp::max(parameters.scan_period - elapsed, REST_TIME) -// }; -// for _ in 0..sleep_time / REST_TIME { -// thread::sleep(Duration::from_millis(REST_TIME)); -// if sigint.load(Ordering::Relaxed) { -// return; -// } -// } -// } -// } - -// /// Route handle sources forwarding. -// fn route( -// sinks: &Vec, -// parameters: &config::Parameters, -// labels: &String, -// sigint: Arc, -// ) -> Result> { -// let mut proc_size = 0; -// let mut batch_count = 0; -// let start = time::now_utc().to_timespec(); -// let run_id = format!("{}#{}", start.sec, start.nsec); - -// loop { -// if sigint.load(Ordering::Relaxed) { -// return Ok(proc_size); -// } -// let entries = try!(fs::read_dir(¶meters.source_dir)); -// let mut files = Vec::with_capacity(parameters.batch_count as usize); -// let mut metrics: Vec = Vec::new(); - -// // Load metrics -// let mut batch_size = 0; -// for (i, entry) in entries.enumerate() { -// let entry = try!(entry); -// // Look only for metrics files -// if entry.path().extension() != Some(OsStr::new("metrics")) { -// continue; -// } - -// // Split metrics in capped batch -// if i > parameters.batch_count as usize || batch_size > parameters.batch_size as usize { -// break; -// } - -// debug!("open source file {}", format!("{:?}", entry.path())); -// let file = match read(entry.path()) { -// Err(err) => { -// warn!("{}", err); -// continue; -// } -// Ok(v) => v, -// }; - -// for line in file.lines() { -// match lib::add_labels(&line, labels) { -// Ok(v) => metrics.push(v), -// Err(_) => { -// warn!("bad line {}", &line); -// continue; -// } -// }; -// } - -// files.push(entry.path()); -// batch_size += file.len(); -// } - -// proc_size += metrics.len(); -// batch_count += 1; - -// // Nothing to do -// if files.len() == 0 { -// break; -// } - -// // Setup sinks files -// let dir = Path::new(¶meters.sink_dir); -// { -// let mut sink_files = Vec::with_capacity(sinks.len() as usize); -// // Open tmp files -// for sink in sinks { -// let sink_file = dir.join(format!("{}.tmp", sink.name)); -// debug!("open tmp sink file {}", format!("{:?}", sink_file)); -// sink_files.push(try!(File::create(sink_file))); -// } - -// // Write metrics -// debug!("write sink files"); -// for line in metrics { -// if line.is_empty() { -// continue; -// } - -// for (i, sink) in sinks.iter().enumerate() { -// if sink.selector.is_some() { -// let selector = sink.selector.as_ref().unwrap(); -// if !line.split_whitespace() -// .nth(1) -// .map_or(false, |class| selector.is_match(class)) -// { -// continue; -// } -// } -// try!(sink_files[i].write(line.as_bytes())); -// try!(sink_files[i].write(b"\n")); -// } -// } - -// // Flush -// for i in 0..sinks.len() { -// try!(sink_files[i].flush()); -// } -// } - -// // Rotate -// for sink in sinks { -// let dest_file = dir.join(format!("{}-{}-{}.metrics", sink.name, run_id, batch_count)); -// 
debug!("rotate tmp sink file to {}", format!("{:?}", dest_file)); -// try!(fs::rename( -// dir.join(format!("{}.tmp", sink.name)), -// dest_file -// )); -// } - -// // Delete forwarded data -// for f in files { -// debug!("delete source file {}", format!("{:?}", f)); -// try!(fs::remove_file(f)); -// } -// } - -// Ok(proc_size) -// } - -// /// Read a file as String -// fn read(path: PathBuf) -> Result> { -// let mut file = try!(File::open(path)); - -// let mut content = String::new(); -// try!(file.read_to_string(&mut content)); - -// Ok(content) -// } diff --git a/src/router/router.rs b/src/router/route.rs similarity index 75% rename from src/router/router.rs rename to src/router/route.rs index c632ccf..74e7586 100644 --- a/src/router/router.rs +++ b/src/router/route.rs @@ -1,33 +1,32 @@ -use std::thread; -use std::time::Duration; -use time; -use std::io::prelude::*; -use std::fs::File; -use std::error::Error; -use std::path::{Path, PathBuf}; use std::collections::VecDeque; +use std::error::Error; use std::fs; +use std::fs::File; +use std::io::prelude::*; use std::io::BufWriter; +use std::path::{Path, PathBuf}; +use std::thread; +use std::time::Duration; +use time; -use std::sync::{Arc, Mutex}; +use config; use futures::future::Shared; use futures::sync::oneshot; -use config; use lib; +use router::RouterConfig; +use std::sync::{Arc, Mutex}; -pub fn router_thread( - sinks: Vec, - sink_dir: String, - todo: Arc>>, - labels: String, - sigint: Shared>, - id: u64 +pub fn route_thread( + config: &RouterConfig, + todo: &Arc>>, + sigint: &Shared>, + id: u64, ) { loop { match todo.lock().unwrap().pop_front() { Some(path) => { - if let Err(err) = route(path, &labels, &sinks, &sink_dir, id) { - warn!("{}", err); + if let Err(err) = route(&path, &config, id) { + warn!("{}", err); } } None => { @@ -41,7 +40,7 @@ pub fn router_thread( } } -fn route(path: PathBuf, labels: &String, sinks: &Vec, sink_dir: &String, id: u64) -> Result<(), Box> { +fn route(path: &PathBuf, config: &RouterConfig, id: u64) -> Result<(), Box> { let start = time::now_utc().to_timespec(); let run_id = format!("{}#{}", start.sec, start.nsec); @@ -52,11 +51,11 @@ fn route(path: PathBuf, labels: &String, sinks: &Vec, sink_dir: &S let mut metrics: Vec = Vec::new(); for line in content.lines() { - metrics.push(lib::add_labels(&line, labels)?); + metrics.push(lib::add_labels(&line, &config.labels)?); } // Nothing to do - if metrics.len() == 0 { + if metrics.is_empty() { // Delete source file debug!("delete {}", format!("{:?}", path)); fs::remove_file(&path)?; @@ -65,31 +64,32 @@ fn route(path: PathBuf, labels: &String, sinks: &Vec, sink_dir: &S } // Setup sinks files - let dir = Path::new(&sink_dir); // FIXME + let dir = Path::new(&config.sink_dir); // Open tmp files - for sink in sinks { + for sink in &config.sinks { let sink_file_name = dir.join(format!("{}-{}.tmp", sink.name, id)); debug!("open tmp sink file {}", format!("{:?}", sink_file_name)); let mut sink_file = BufWriter::new(File::create(&sink_file_name)?); // Write metrics - for line in metrics.iter() { + for line in &metrics { if line.is_empty() { continue; } if sink.selector.is_some() { let selector = sink.selector.as_ref().unwrap(); - if !line.split_whitespace() + if !line + .split_whitespace() .nth(1) .map_or(false, |class| selector.is_match(class)) { continue; } } - sink_file.write(line.as_bytes())?; - sink_file.write(b"\n")?; + sink_file.write_all(line.as_bytes())?; + sink_file.write_all(b"\n")?; } // Write buffered datas diff --git a/src/scraper.rs b/src/scraper.rs index 
a50833a..2971428 100644 --- a/src/scraper.rs +++ b/src/scraper.rs @@ -1,23 +1,23 @@ //! # Scraper module. //! //! The Scraper module fetch metrics from an HTTP endpoint. -use std::thread; -use std::time::Duration; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; -use time; -use std::io; -use std::cmp; -use hyper; -use hyper_tls::HttpsConnector; -use hyper_timeout::TimeoutConnector; use futures::future::Future; use futures::Stream; -use std::io::prelude::*; +use hyper; +use hyper_timeout::TimeoutConnector; +use hyper_tls::HttpsConnector; +use std::cmp; +use std::error::Error; use std::fs; use std::fs::File; -use std::error::Error; +use std::io; +use std::io::prelude::*; use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use time; use config; @@ -30,7 +30,7 @@ const REST_TIME: u64 = 10; pub fn scraper( scraper: &config::Scraper, parameters: &config::Parameters, - sigint: Arc, + sigint: &Arc, ) { let labels: String = scraper.labels.iter().fold(String::new(), |acc, (k, v)| { let sep = if acc.is_empty() { "" } else { "," }; @@ -68,9 +68,9 @@ fn fetch( debug!("fetch {}", &scraper.url); // Fetch metrics - let mut core = try!(::tokio_core::reactor::Core::new()); + let mut core = ::tokio_core::reactor::Core::new()?; let handle = core.handle(); - let connector = try!(HttpsConnector::new(4, &handle)); + let connector = HttpsConnector::new(4, &handle)?; let mut tm = TimeoutConnector::new(connector, &handle); tm.set_connect_timeout(Some(Duration::from_secs(parameters.timeout))); @@ -80,7 +80,7 @@ fn fetch( let client = hyper::Client::configure().connector(tm).build(&handle); let mut req = hyper::Request::new(hyper::Method::Get, scraper.url.clone()); - for (key, value) in scraper.headers.iter() { + for (key, value) in &scraper.headers { req.headers_mut() .set_raw(key.clone(), vec![value.clone().into_bytes()]); } @@ -107,7 +107,7 @@ fn fetch( // Get now as millis let start = time::now_utc(); - let now = start.to_timespec().sec * 1000 * 1000 + (start.to_timespec().nsec as i64 / 1000); + let now = start.to_timespec().sec * 1000 * 1000 + (i64::from(start.to_timespec().nsec) / 1000); let dir = Path::new(¶meters.source_dir); let temp_file = dir.join(format!("{}.tmp", scraper.name)); @@ -116,7 +116,7 @@ fn fetch( debug!("write to tmp file {}", format!("{:?}", temp_file)); { // Open tmp file - let mut file = try!(File::create(&temp_file)); + let mut file = File::create(&temp_file)?; let ts = lib::transcompiler::Transcompiler::new(&scraper.format); for line in body.lines() { @@ -140,18 +140,17 @@ fn fetch( } }; - if scraper.metrics.is_some() { - if !scraper.metrics.as_ref().unwrap().is_match(&line) { - continue; - } + if scraper.metrics.is_some() && !scraper.metrics.as_ref().unwrap().is_match(&line) { + continue; } batch_size += line.len(); - if batch_size > parameters.batch_size as usize && !line.starts_with("=") { + if batch_size > parameters.batch_size as usize && !line.starts_with('=') { // Rotate scraped file file.flush()?; - let dest_file = dir.join(format!("{}-{}-{}.metrics", scraper.name, now, batch_count)); + let dest_file = + dir.join(format!("{}-{}-{}.metrics", scraper.name, now, batch_count)); debug!("rotate tmp file to {}", format!("{:?}", dest_file)); fs::rename(&temp_file, &dest_file)?; @@ -160,8 +159,8 @@ fn fetch( batch_count += 1; } - file.write(line.as_bytes())?; - file.write(b"\n")?; + file.write_all(line.as_bytes())?; + file.write_all(b"\n")?; } file.flush()?; @@ -170,7 +169,7 @@ fn fetch( 
diff --git a/src/sink/fs.rs b/src/sink/fs.rs
index 6e509e6..2be766d 100644
--- a/src/sink/fs.rs
+++ b/src/sink/fs.rs
@@ -20,15 +20,12 @@ use futures::{lazy, Async, Stream};
 use tokio_core::reactor::Core;
 
 use config;
+use sink::SinkConfig;
 
 pub fn fs_thread(
-    name: &str,
-    dir: &str,
-    period: u64,
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    max_size: u64,
-    ttl: u64,
-    sigint: Shared<oneshot::Receiver<()>>,
+    todo: &Arc<Mutex<VecDeque<PathBuf>>>,
+    config: &SinkConfig,
+    sigint: &Shared<oneshot::Receiver<()>>,
     mut notify_rx: Receiver<PathBuf>,
 ) {
     let mut files: HashSet<PathBuf> = HashSet::new();
@@ -38,10 +35,13 @@ pub fn fs_thread(
     loop {
         let start = time::now_utc();
         let work = lazy(|| -> FutureResult<(), ()> {
-            match list(name, dir, &files, ttl) {
+            match list(&config.name, &config.dir, config.ttl) {
                 Err(err) => error!("list fail: {}", err),
-                Ok((new, deleted, size)) => {
-                    if new.len() > 0 {
+                Ok((entries, size)) => {
+                    let new: Vec<PathBuf> = entries.difference(&files).cloned().collect();
+                    let deleted: Vec<PathBuf> = files.difference(&entries).cloned().collect();
+
+                    if !new.is_empty() {
                         debug!("found {} files", new.len());
                     }
 
@@ -71,7 +71,7 @@
                         files.remove(&f);
                     }
 
-                    match cappe(size, max_size, &todo) {
+                    match cappe(size, config.max_size, &todo) {
                         Err(err) => error!("cappe fail: {}", err),
                         Ok(()) => {}
                     }
@@ -83,10 +83,10 @@
         core.run(work).expect("always ok");
 
         let elapsed = (time::now_utc() - start).num_milliseconds() as u64;
-        let sleep_time = if elapsed > period {
+        let sleep_time = if elapsed > config.watch_period {
             config::REST_TIME
         } else {
-            cmp::max(period - elapsed, config::REST_TIME)
+            cmp::max(config.watch_period - elapsed, config::REST_TIME)
         };
         for _ in 0..sleep_time / config::REST_TIME {
             thread::sleep(Duration::from_millis(config::REST_TIME));
@@ -97,12 +97,7 @@
     }
 }
 
-fn list(
-    sink_name: &str,
-    dir: &str,
-    files: &HashSet<PathBuf>,
-    ttl: u64,
-) -> Result<(Vec<PathBuf>, Vec<PathBuf>, u64), Box<Error>> {
+fn list(sink_name: &str, dir: &str, ttl: u64) -> Result<(HashSet<PathBuf>, u64), Box<Error>> {
     let mut sink_name = String::from(sink_name);
     sink_name.push('-');
 
@@ -127,10 +122,10 @@
             let meta = entry.metadata();
             if meta.is_ok() {
                 let meta = meta.unwrap();
-                files_size = files_size + meta.len();
+                files_size += meta.len();
 
-                let modified = meta.modified().unwrap_or(SystemTime::now());
-                let age = modified.elapsed().unwrap_or(Duration::new(0, 0));
+                let modified = meta.modified().unwrap_or_else(|_| SystemTime::now());
+                let age = modified.elapsed().unwrap_or_else(|_| Duration::new(0, 0));
 
                 if age.as_secs() > ttl {
                     warn!("skip file {:?}", entry.path());
@@ -146,10 +141,7 @@
         })
         .collect();
 
-    let deleted = files.difference(&entries).cloned().collect();
-    let new = entries.difference(&files).cloned().collect();
-
-    Ok((new, deleted, files_size))
+    Ok((entries, files_size))
 }
 
 fn cappe(actual: u64, target: u64, todo: &Arc<Mutex<VecDeque<PathBuf>>>) -> Result<(), Box<Error>> {
@@ -167,7 +159,7 @@ fn cappe(
             let path = path.unwrap();
             let meta = fs::metadata(&path)?;
 
-            size = size - meta.len();
+            size -= meta.len();
 
             warn!("skip file {:?}", path);
             fs::remove_file(path)?;
@@ -182,7 +174,7 @@
 fn remove_empty(path: &PathBuf) -> Result<bool, Box<Error>> {
     let metadata = fs::metadata(path.clone())?;
 
-    if metadata.len() <= 0 {
+    if metadata.len() == 0 {
         fs::remove_file(path.clone())?;
         return Ok(true);
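Note the division of labor after this change: `list` returns the whole directory snapshot plus its total size, and `fs_thread` derives `new` and `deleted` by set difference against the previous snapshot. A self-contained sketch of that caller-side diff (`diff_snapshots` and the file names are illustrative, `std` types only):

```rust
use std::collections::HashSet;
use std::path::PathBuf;

/// Diff the previous snapshot against a fresh listing, mirroring what
/// `fs_thread` now does with the result of `list`.
fn diff_snapshots(
    previous: &HashSet<PathBuf>,
    current: &HashSet<PathBuf>,
) -> (Vec<PathBuf>, Vec<PathBuf>) {
    // Present now but not before: newly appeared files to enqueue.
    let new = current.difference(previous).cloned().collect();
    // Present before but gone now: files consumed or removed elsewhere.
    let deleted = previous.difference(current).cloned().collect();
    (new, deleted)
}

fn main() {
    let previous: HashSet<PathBuf> =
        ["a.metrics", "b.metrics"].iter().map(|s| PathBuf::from(s)).collect();
    let current: HashSet<PathBuf> =
        ["b.metrics", "c.metrics"].iter().map(|s| PathBuf::from(s)).collect();

    let (new, deleted) = diff_snapshots(&previous, &current);
    assert_eq!(new, vec![PathBuf::from("c.metrics")]);
    assert_eq!(deleted, vec![PathBuf::from("a.metrics")]);
}
```

Keeping `list` a pure "scan and report" function and moving the bookkeeping to the single owner of `files` is what makes the simplified `(HashSet, u64)` return type possible.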
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index 355f3bf..fabc07b 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -28,80 +28,84 @@ mod send;
 
 const REACTOR_CLIENT: usize = 30;
 
-pub struct Sink<'a> {
-    name: &'a String,
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    handles: Vec<thread::JoinHandle<()>>,
-    dir: &'a String,
+#[derive(Debug, Clone)]
+pub struct SinkConfig {
+    name: String,
+    dir: String,
+    max_size: u64,
+    ttl: u64,
     watch_period: u64,
+    parallel: u64,
+    url: hyper::Uri,
     timeout: u64,
     keep_alive: bool,
-    token: &'a String,
-    token_header: &'a String,
-    url: &'a hyper::Uri,
+    token: String,
+    token_header: String,
     batch_count: u64,
     batch_size: u64,
-    max_size: u64,
-    ttl: u64,
-    parallel: u64,
+}
+
+pub struct Sink {
+    config: SinkConfig,
+    todo: Arc<Mutex<VecDeque<PathBuf>>>,
+    handles: Vec<thread::JoinHandle<()>>,
     sigint: (oneshot::Sender<()>, Shared<oneshot::Receiver<()>>),
 }
 
-impl<'a> Sink<'a> {
-    pub fn new(sink: &'a config::Sink, parameters: &'a config::Parameters) -> Sink<'a> {
+impl Sink {
+    pub fn new(sink: &config::Sink, parameters: &config::Parameters) -> Sink {
         let (shutdown_tx, shutdown_rx) = oneshot::channel();
-        Sink {
-            name: &sink.name,
-            todo: Arc::new(Mutex::new(VecDeque::new())),
-            handles: Vec::new(),
-            sigint: (shutdown_tx, shutdown_rx.shared()),
-            dir: &parameters.sink_dir,
+        let config = SinkConfig {
+            name: sink.name.clone(),
+            dir: parameters.sink_dir.clone(),
+            max_size: sink.size,
+            ttl: sink.ttl,
             watch_period: parameters.scan_period,
+            parallel: sink.parallel,
+            url: sink.url.clone(),
             timeout: parameters.timeout,
             keep_alive: sink.keep_alive,
-            token: &sink.token,
-            token_header: &sink.token_header,
-            url: &sink.url,
+            token: sink.token.clone(),
+            token_header: sink.token_header.clone(),
             batch_count: parameters.batch_count,
             batch_size: parameters.batch_size,
-            max_size: sink.size,
-            ttl: sink.ttl,
-            parallel: sink.parallel,
+        };
+
+        Sink {
+            config,
+            todo: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            sigint: (shutdown_tx, shutdown_rx.shared()),
         }
     }
 
     pub fn start(&mut self) {
-        debug!("start sink: {}", self.name);
-        let (notify_tx, notify_rx) = channel(self.parallel as usize);
+        debug!("start sink: {}", self.config.name);
+        let (notify_tx, notify_rx) = channel(self.config.parallel as usize);
 
         // spawn fs thread
-        let (name, dir, todo) = (self.name.clone(), self.dir.clone(), self.todo.clone());
-        let (period, sigint) = (self.watch_period, self.sigint.1.clone());
-        let (max_size, ttl) = (self.max_size, self.ttl);
+        let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone());
+        let config = self.config.clone();
         self.handles.push(thread::spawn(move || {
             slog_scope::scope(
-                &slog_scope::logger().new(o!("sink" => name.clone())),
-                || fs::fs_thread(&name, &dir, period, todo, max_size, ttl, sigint, notify_rx),
+                &slog_scope::logger().new(o!("sink" => config.name.clone())),
+                || fs::fs_thread(&todo, &config, &sigint, notify_rx),
             );
         }));
 
         // spawn sender threads
-        let reactor_count = (self.parallel as f64 / REACTOR_CLIENT as f64).ceil() as u64;
-        let client_count = (self.parallel as f64 / reactor_count as f64).ceil() as u64;
+        let reactor_count = (self.config.parallel as f64 / REACTOR_CLIENT as f64).ceil() as u64;
+        let client_count = (self.config.parallel as f64 / reactor_count as f64).ceil() as u64;
 
         for _ in 0..reactor_count {
-            let (name, sigint) = (self.name.clone(), self.sigint.1.clone());
-            let todo = self.todo.clone();
-            let url = self.url.clone();
-            let (timeout, keep_alive) = (self.timeout, self.keep_alive);
-            let (token, token_header) = (self.token.clone(), self.token_header.clone());
-            let (batch_count, batch_size) = (self.batch_count, self.batch_size);
+            let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone());
             let notify_tx: Sender<PathBuf> = notify_tx.clone();
+            let config = self.config.clone();
 
             self.handles.push(thread::spawn(move || {
                 slog_scope::scope(
-                    &slog_scope::logger().new(o!("sink" => name.clone())),
+                    &slog_scope::logger().new(o!("sink" => config.name.clone())),
                     || {
                         let mut core = Core::new().expect("Fail to start tokio reactor");
                         let handle = core.handle();
@@ -110,12 +114,12 @@ impl<'a> Sink<'a> {
 
                         // Handle connection timeouts
                         let mut tm = TimeoutConnector::new(connector, &handle);
-                        tm.set_connect_timeout(Some(Duration::from_secs(timeout)));
+                        tm.set_connect_timeout(Some(Duration::from_secs(config.timeout)));
 
                         let client = Arc::new(
                             hyper::Client::configure()
                                 .body::<PayloadBody>()
-                                .keep_alive(keep_alive)
+                                .keep_alive(config.keep_alive)
                                 .connector(tm)
                                 .build(&handle),
                         );
@@ -124,12 +128,8 @@ impl<'a> Sink<'a> {
                         for _ in 0..client_count {
                             let work = send::send_thread(
                                 client.clone(),
-                                token.clone(),
-                                token_header.clone(),
-                                url.clone(),
+                                config.clone(),
                                 todo.clone(),
-                                batch_count,
-                                batch_size,
                                 notify_tx.clone(),
                             );
 
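The move from `Sink<'a>` with borrowed fields to an owned, `Clone` `SinkConfig` is what lets each thread carry its own configuration: `thread::spawn` requires `'static` captures, so `&'a String` fields cannot cross a thread boundary. A minimal sketch of the pattern, with an illustrative `Config` standing in for `SinkConfig`:

```rust
use std::thread;

/// Stand-in for the owned, cloneable per-sink configuration; the field
/// set here is illustrative, not beamium's exact `SinkConfig`.
#[derive(Debug, Clone)]
struct Config {
    name: String,
    parallel: u64,
}

fn main() {
    let config = Config { name: "sink-a".to_string(), parallel: 4 };
    let mut handles = Vec::new();

    for id in 0..config.parallel {
        // Each thread owns a clone, satisfying `thread::spawn`'s
        // `'static` bound without borrowing from the parent struct.
        let config = config.clone();
        handles.push(thread::spawn(move || {
            println!("worker {} for sink {}", id, config.name);
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }
}
```

The clone-per-thread cost is paid once at startup and buys much simpler signatures, as the shrunken parameter lists of `fs_thread` and `send_thread` show.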
diff --git a/src/sink/send.rs b/src/sink/send.rs
index 61de7e0..6f532b3 100644
--- a/src/sink/send.rs
+++ b/src/sink/send.rs
@@ -18,6 +18,7 @@ use futures::sync::mpsc::Sender;
 use futures::task::{current, Task};
 use futures::{Async, Future, Poll, Stream};
+use sink::SinkConfig;
 
 // use flate2::Compression;
 // use flate2::write::ZlibEncoder;
@@ -27,29 +28,26 @@ pub fn send_thread(
     client: Arc<
         hyper::Client<TimeoutConnector<HttpsConnector<hyper::client::HttpConnector>>, PayloadBody>,
     >,
-    token: String,
-    token_header: String,
-    url: hyper::Uri,
+    config: SinkConfig,
     todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    batch_count: u64,
-    batch_size: u64,
     notify_tx: Sender<PathBuf>,
 ) -> Box<Future<Item = (), Error = ()>> {
-    let work = PayloadStream::new(todo, batch_count, batch_size, notify_tx)
+    let work = PayloadStream::new(todo, config.batch_count, config.batch_size, notify_tx)
         .for_each(move |mut p| {
             let mut req: hyper::Request<PayloadBody> =
-                hyper::Request::new(hyper::Post, url.clone());
+                hyper::Request::new(hyper::Post, config.url.clone());
             req.set_body(p.body().expect("Body is never None"));
             req.headers_mut()
-                .set_raw(token_header.clone(), token.clone());
+                .set_raw(config.token_header.clone(), config.token.clone());
 
-            let req = client
+            client
                 .clone()
                 .request(req)
                 .and_then(|res| {
                     let status = res.status();
                     // TODO not read body if not debug
-                    let body = res.body()
+                    let body = res
+                        .body()
                         .fold(String::new(), |mut acc, chunk| {
                             ok::<String, hyper::Error>({
                                 acc.push_str(&String::from_utf8_lossy(&chunk));
                                 acc
                             })
@@ -91,13 +89,11 @@ pub fn send_thread(
                         }
                     };
                     ok(())
-                });
-
-            req
+                })
         })
         .then(|_| ok(()));
 
-    return Box::new(work);
+    Box::new(work)
 }
 
 struct PayloadStream {
@@ -115,10 +111,10 @@ impl PayloadStream {
         notify_tx: Sender<PathBuf>,
     ) -> PayloadStream {
         PayloadStream {
-            batch_count: batch_count,
-            batch_size: batch_size,
-            todo: todo,
-            notify_tx: notify_tx,
+            batch_count,
+            batch_size,
+            todo,
+            notify_tx,
         }
     }
 }
@@ -159,7 +155,7 @@ impl Payload {
         let mut body = PayloadBody::new(todo.clone(), batch_count, batch_size);
         body.load(Some(file));
         Payload {
-            todo: todo,
+            todo,
             processed: body.processed(),
             sent_lines: body.sent_lines(),
             body: Some(body),
@@ -211,7 +207,7 @@ impl PayloadBody {
         PayloadBody {
             remaining_count: batch_count,
             remaining_size: batch_size as i64,
-            todo: todo,
+            todo,
             processed: Arc::new(Mutex::new(Vec::new())),
             reader: None,
             sent_lines: Arc::new(AtomicUsize::new(0)),
@@ -242,7 +238,7 @@ impl PayloadBody {
 
         debug!("load file {}", path.display());
         self.processed.lock().unwrap().push(path.clone());
-        self.remaining_count = self.remaining_count - 1;
+        self.remaining_count -= 1;
 
         match File::open(path.clone()) {
             Err(err) => {
@@ -271,7 +267,8 @@ impl Stream for PayloadBody {
         // send file
         while idx < CHUNK_SIZE {
-            let size = self.reader
+            let size = self
+                .reader
                 .as_mut()
                 .expect("reader is never None")
                 .read_line(&mut s)?;
 
@@ -282,14 +279,14 @@
                     break; // No more work to do
                 }
             } else {
-                idx = idx + size;
-                sent_lines = sent_lines + 1;
+                idx += size;
+                sent_lines += 1;
             }
         }
 
         self.sent_lines.fetch_add(sent_lines, Ordering::Relaxed);
-        self.remaining_size = self.remaining_size - idx as i64;
+        self.remaining_size -= idx as i64;
 
         Ok(Async::Ready(Some(hyper::Chunk::from(s))))
     }
 }
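The thread layout in `Sink::start` is worth a worked check: clients per reactor are capped at `REACTOR_CLIENT` (30), so `reactor_count = ceil(parallel / 30)` reactors each run `client_count = ceil(parallel / reactor_count)` sender futures. A small sketch of that arithmetic (`split` is an illustrative name, not beamium code):

```rust
/// Cap clients per reactor at REACTOR_CLIENT, then spread `parallel`
/// across the resulting reactors, mirroring `Sink::start`.
const REACTOR_CLIENT: u64 = 30;

fn split(parallel: u64) -> (u64, u64) {
    let reactor_count = (parallel as f64 / REACTOR_CLIENT as f64).ceil() as u64;
    let client_count = (parallel as f64 / reactor_count as f64).ceil() as u64;
    (reactor_count, client_count)
}

fn main() {
    assert_eq!(split(1), (1, 1));
    assert_eq!(split(30), (1, 30)); // exactly one full reactor
    assert_eq!(split(31), (2, 16)); // ceil(31/30) = 2 reactors, ceil(31/2) = 16 clients each
    assert_eq!(split(90), (3, 30));
}
```

Because of the double ceiling, `reactor_count * client_count` can slightly exceed `parallel` (32 futures for `parallel = 31` above); the split trades a few extra clients for keeping every reactor below the per-reactor cap.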
From f3e149b1ca90f9d1c669bbe432df66ede8773821 Mon Sep 17 00:00:00 2001
From: d33d33
Date: Tue, 10 Jul 2018 14:55:53 +0200
Subject: [PATCH 3/3] fix(router): src file is removed multiple times

---
 src/router/route.rs | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/src/router/route.rs b/src/router/route.rs
index 74e7586..8938347 100644
--- a/src/router/route.rs
+++ b/src/router/route.rs
@@ -25,8 +25,9 @@ pub fn route_thread(
     loop {
         match todo.lock().unwrap().pop_front() {
             Some(path) => {
+                debug!("processing {:?}", path);
                 if let Err(err) = route(&path, &config, id) {
-                    warn!("{}", err);
+                    warn!("failed to process {:?} - {}", path, err);
                 }
             }
             None => {
@@ -99,11 +100,11 @@ fn route(path: &PathBuf, config: &RouterConfig, id: u64) -> Result<(), Box<Error>> {
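For context on the queue this fix operates on: the router's workers share a single `Arc<Mutex<VecDeque<PathBuf>>>`, so each source path is popped by exactly one thread and can only be processed (and later removed) once. A minimal sketch of that worker shape, not beamium's actual `route_thread` (illustrative file names, `println!` standing in for `route`):

```rust
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // Shared work queue: each path is handed to exactly one worker.
    let todo: Arc<Mutex<VecDeque<PathBuf>>> = Arc::new(Mutex::new(
        (0..8).map(|i| PathBuf::from(format!("{}.metrics", i))).collect(),
    ));

    let handles: Vec<_> = (0..2)
        .map(|id| {
            let todo = todo.clone();
            thread::spawn(move || loop {
                // Hold the lock only for the pop, not while processing.
                let path = todo.lock().unwrap().pop_front();
                match path {
                    Some(path) => println!("worker {} processing {:?}", id, path),
                    None => break, // queue drained
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}
```

Binding the popped path before matching drops the `MutexGuard` immediately, so slow processing on one worker never blocks the others from pulling the next file.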