diff --git a/build.rs b/build.rs
index f4e52b6..442956a 100644
--- a/build.rs
+++ b/build.rs
@@ -1,7 +1,7 @@
-use std::process::Command;
-use std::path::Path;
 use std::fs::File;
 use std::io::prelude::*;
+use std::path::Path;
+use std::process::Command;
 
 fn main() {
     let output = Command::new("git")
diff --git a/src/config.rs b/src/config.rs
index be39404..20a7ffd 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -3,20 +3,20 @@
 //! The Config module provides the beamium configuration.
 //! It sets defaults and then loads config from '/etc', the local dir and the provided path.
 
+use cast;
+use hyper;
+use regex;
+use slog;
+use std::collections::HashMap;
+use std::error;
+use std::error::Error;
+use std::fmt;
 use std::fs::File;
-use std::io::Read;
 use std::io;
-use std::fmt;
-use std::string::String;
+use std::io::Read;
 use std::path::Path;
-use std::error;
-use std::error::Error;
+use std::string::String;
 use yaml_rust::{ScanError, YamlLoader};
-use cast;
-use std::collections::HashMap;
-use regex;
-use slog;
-use hyper;
 
 pub const REST_TIME: u64 = 10;
@@ -171,7 +171,7 @@ pub fn load_config(config_path: &str) -> Result<Config, ConfigError> {
             scan_period: 1000,
             sink_dir: String::from("sinks"),
             source_dir: String::from("sources"),
-            batch_size: 200000,
+            batch_size: 200_000,
             batch_count: 250,
             log_file: String::from(env!("CARGO_PKG_NAME")) + ".log",
             log_level: slog::Level::Info,
@@ -198,6 +198,7 @@ pub fn load_config(config_path: &str) -> Result<Config, ConfigError> {
 }
 
 /// Extend config from file.
+#[allow(unknown_lints, cyclomatic_complexity)]
 fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), ConfigError> {
     let mut file = try!(File::open(file_path));
     let mut contents = String::new();
@@ -205,7 +206,7 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
     let docs = try!(YamlLoader::load_from_str(&contents));
     let config_scraper_keys = ["sources", "scrapers"];
     for doc in &docs {
-        for config_scraper_key in config_scraper_keys.iter() {
+        for config_scraper_key in &config_scraper_keys {
             let key = *config_scraper_key;
             if !doc[key].is_badvalue() {
                 if "sources" == key {
@@ -215,19 +216,28 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
                     )
                 }
 
-                let scrapers = try!(doc[key].as_hash().ok_or(format!("{} should be a map", key)));
+                let scrapers = try!(
+                    doc[key]
+                        .as_hash()
+                        .ok_or_else(|| format!("{} should be a map", key))
+                );
                 for (k, v) in scrapers {
-                    let name = try!(k.as_str().ok_or(format!("{} keys should be a string", key)));
-                    let url = try!(v["url"].as_str().ok_or(format!(
-                        "{}.{}.url is required and should be a string",
-                        key, name
-                    )));
+                    let name = try!(
+                        k.as_str()
+                            .ok_or_else(|| format!("{} keys should be a string", key))
+                    );
+                    let url = try!(
+                        v["url"].as_str().ok_or_else(|| {
+                            format!("{}.{}.url is required and should be a string", key, name)
+                        })
+                    );
                     let url = try!(url.parse::<hyper::Uri>());
-                    let period = try!(v["period"].as_i64().ok_or(format!(
-                        "{}.{}.period is required and should be a number",
-                        key, name
-                    )));
+                    let period = try!(
+                        v["period"].as_i64().ok_or_else(|| {
+                            format!("{}.{}.period is required and should be a number", key, name)
+                        })
+                    );
                     let period = try!(
                         cast::u64(period)
                             .map_err(|_| format!("scrapers.{}.period is invalid", name))
@@ -236,9 +246,9 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
                         ScraperFormat::Prometheus
                     } else {
                         let f = try!(
-                            v["format"]
-                                .as_str()
-                                .ok_or(format!("scrapers.{}.format should be a string", name))
+                            v["format"].as_str().ok_or_else(|| {
+                                format!("scrapers.{}.format should be a string", name)
+                            })
                         );
                         if f == "prometheus" {
@@ -258,14 +268,14 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
                     } else {
                         let mut metrics = Vec::new();
                         let values = try!(
-                            v["metrics"]
-                                .as_vec()
-                                .ok_or(format!("scrapers.{}.metrics should be an array", name))
+                            v["metrics"].as_vec().ok_or_else(|| {
+                                format!("scrapers.{}.metrics should be an array", name)
+                            })
                         );
                         for v in values {
                             let value = try!(regex::Regex::new(try!(
                                 v.as_str()
-                                    .ok_or(format!("scrapers.{}.metrics is invalid", name))
+                                    .ok_or_else(|| format!("scrapers.{}.metrics is invalid", name))
                             )));
                             metrics.push(String::from(r"^(\S*)\s") + value.as_str());
                         }
@@ -278,17 +288,18 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
                         HashMap::new()
                     } else {
                         let heads = try!(
-                            v["headers"]
-                                .as_hash()
-                                .ok_or(format!("scrapers.{}.headers should be a map", name))
+                            v["headers"].as_hash().ok_or_else(|| {
+                                format!("scrapers.{}.headers should be a map", name)
+                            })
                         );
                         let mut ret = HashMap::new();
                         for (k, v) in heads {
-                            let hname = try!(k.as_str().ok_or(format!(
-                                "scrapers.{}.headers keys should be a string",
-                                name
-                            )));
-                            let value = try!(v.as_str().ok_or(format!(
+                            let hname = try!(
+                                k.as_str().ok_or_else(|| {
+                                    format!("scrapers.{}.headers keys should be a string", name)
+                                })
+                            );
+                            let value = try!(v.as_str().ok_or_else(|| format!(
                                 "scrapers.{}.headers.{} value should be a string",
                                 name, hname
                             )));
@@ -300,13 +311,18 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
                     let mut labels = HashMap::new();
                     if !v["labels"].is_badvalue() {
-                        let slabels = try!(v["labels"].as_hash().ok_or("labels should be a map"));
+                        let slabels = try!(
+                            v["labels"]
+                                .as_hash()
+                                .ok_or_else(|| "labels should be a map")
+                        );
                         for (k, v) in slabels {
-                            let lname = try!(k.as_str().ok_or(format!(
-                                "scrapers.{}.labels keys should be a string",
-                                name
-                            )));
-                            let value = try!(v.as_str().ok_or(format!(
+                            let lname = try!(
+                                k.as_str().ok_or_else(|| {
+                                    format!("scrapers.{}.labels keys should be a string", name)
+                                })
+                            );
+                            let value = try!(v.as_str().ok_or_else(|| format!(
                                 "scrapers.{}.labels.{} value should be a string",
                                 name, lname
                             )));
@@ -316,37 +332,43 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
                     config.scrapers.push(Scraper {
                         name: String::from(name),
-                        url: url,
-                        period: period,
-                        format: format,
-                        metrics: metrics,
-                        headers: headers,
-                        labels: labels,
+                        url,
+                        period,
+                        format,
+                        metrics,
+                        headers,
+                        labels,
                     })
                 }
             }
         }
 
         if !doc["sinks"].is_badvalue() {
-            let sinks = try!(doc["sinks"].as_hash().ok_or("sinks should be a map"));
+            let sinks = try!(
+                doc["sinks"]
+                    .as_hash()
+                    .ok_or_else(|| "sinks should be a map")
+            );
             for (k, v) in sinks {
-                let name = try!(k.as_str().ok_or("sinks keys should be a string"));
-                let url = try!(v["url"].as_str().ok_or(format!(
-                    "sinks.{}.url is required and should be a string",
-                    name
-                )));
+                let name = try!(k.as_str().ok_or_else(|| "sinks keys should be a string"));
+                let url = try!(
+                    v["url"].as_str().ok_or_else(|| {
+                        format!("sinks.{}.url is required and should be a string", name)
+                    })
+                );
                 let url = try!(url.parse::<hyper::Uri>());
-                let token = try!(v["token"].as_str().ok_or(format!(
-                    "sinks.{}.token is required and should be a string",
-                    name
-                )));
+                let token = try!(
+                    v["token"].as_str().ok_or_else(|| {
+                        format!("sinks.{}.token is required and should be a string", name)
+                    })
+                );
                 let token_header = if v["token-header"].is_badvalue() {
                     "X-Warp10-Token"
                 } else {
                     try!(
-                        v["token-header"]
-                            .as_str()
-                            .ok_or(format!("sinks.{}.token-header should be a string", name))
+ v["token-header"].as_str().ok_or_else(|| { + format!("sinks.{}.token-header should be a string", name) + }) ) }; @@ -356,7 +378,7 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co Some(try!(regex::Regex::new( format!( "^{}", - try!(v["selector"].as_str().ok_or(format!( + try!(v["selector"].as_str().ok_or_else(|| format!( "sinks.{}.selector \ is invalid", name @@ -371,7 +393,7 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let ttl = try!( v["ttl"] .as_i64() - .ok_or(format!("sinks.{}.ttl should be a number", name)) + .ok_or_else(|| format!("sinks.{}.ttl should be a number", name)) ); try!( cast::u64(ttl) @@ -379,20 +401,20 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co ) }; - let size = - if v["size"].is_badvalue() { - 1073741824 - } else { - let size = try!( - v["size"] - .as_i64() - .ok_or(format!("sinks.{}.size should be a number", name)) - ); - try!(cast::u64(size).map_err(|_| format!( - "sinks.{}.size should be a positive number", - name - ))) - }; + let size = if v["size"].is_badvalue() { + 1_073_741_824 + } else { + let size = try!( + v["size"] + .as_i64() + .ok_or_else(|| format!("sinks.{}.size should be a number", name)) + ); + try!( + cast::u64(size).map_err(|_| { + format!("sinks.{}.size should be a positive number", name) + }) + ) + }; let parallel = if v["parallel"].is_badvalue() { 1 @@ -400,45 +422,50 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let parallel = try!( v["parallel"] .as_i64() - .ok_or(format!("sinks.{}.parallel should be a number", name)) + .ok_or_else(|| format!("sinks.{}.parallel should be a number", name)) ); - try!(cast::u64(parallel).map_err(|_| format!( - "sinks.{}.parallel should be a positive number", - name - ))) + try!( + cast::u64(parallel).map_err(|_| { + format!("sinks.{}.parallel should be a positive number", name) + }) + ) }; let keep_alive = if v["keep-alive"].is_badvalue() { true } else { try!( - v["keep-alive"] - .as_bool() - .ok_or(format!("sinks.{}.keep-alive should be a boolean", name)) + v["keep-alive"].as_bool().ok_or_else(|| { + format!("sinks.{}.keep-alive should be a boolean", name) + }) ) }; config.sinks.push(Sink { name: String::from(name), - url: url, + url, token: String::from(token), token_header: String::from(token_header), - selector: selector, - ttl: ttl, - size: size, - parallel: parallel, - keep_alive: keep_alive, + selector, + ttl, + size, + parallel, + keep_alive, }) } } if !doc["labels"].is_badvalue() { - let labels = try!(doc["labels"].as_hash().ok_or("labels should be a map")); + let labels = try!( + doc["labels"] + .as_hash() + .ok_or_else(|| "labels should be a map") + ); for (k, v) in labels { - let name = try!(k.as_str().ok_or("labels keys should be a string")); + let name = try!(k.as_str().ok_or_else(|| "labels keys should be a string")); let value = try!( v.as_str() - .ok_or(format!("labels.{} value should be a string", name)) + .ok_or_else(|| format!("labels.{} value should be a string", name)) ); config .labels @@ -451,7 +478,7 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let source_dir = try!( doc["parameters"]["source-dir"] .as_str() - .ok_or(format!("parameters.source-dir should be a string")) + .ok_or_else(|| "parameters.source-dir should be a string".to_string()) ); config.parameters.source_dir = String::from(source_dir); } @@ -460,7 +487,7 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let sink_dir = try!( doc["parameters"]["sink-dir"] .as_str() - 
.ok_or(format!("parameters.sink-dir should be a string")) + .ok_or_else(|| "parameters.sink-dir should be a string".to_string()) ); config.parameters.sink_dir = String::from(sink_dir); } @@ -469,11 +496,11 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let scan_period = try!( doc["parameters"]["scan-period"] .as_i64() - .ok_or(format!("parameters.scan-period should be a number")) + .ok_or_else(|| "parameters.scan-period should be a number".to_string()) ); let scan_period = try!( cast::u64(scan_period) - .map_err(|_| format!("parameters.scan-period is invalid")) + .map_err(|_| "parameters.scan-period is invalid".to_string()) ); config.parameters.scan_period = scan_period; } @@ -482,10 +509,11 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let batch_size = try!( doc["parameters"]["batch-size"] .as_i64() - .ok_or(format!("parameters.batch-size should be a number")) + .ok_or_else(|| "parameters.batch-size should be a number".to_string()) ); let batch_size = try!( - cast::u64(batch_size).map_err(|_| format!("parameters.batch-size is invalid")) + cast::u64(batch_size) + .map_err(|_| "parameters.batch-size is invalid".to_string()) ); config.parameters.batch_size = batch_size; } @@ -494,11 +522,11 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let batch_count = try!( doc["parameters"]["batch-count"] .as_i64() - .ok_or(format!("parameters.batch-count should be a number")) + .ok_or_else(|| "parameters.batch-count should be a number".to_string()) ); let batch_count = try!( cast::u64(batch_count) - .map_err(|_| format!("parameters.batch-count is invalid")) + .map_err(|_| "parameters.batch-count is invalid".to_string()) ); config.parameters.batch_count = batch_count; } @@ -507,7 +535,7 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let log_file = try!( doc["parameters"]["log-file"] .as_str() - .ok_or(format!("parameters.log-file should be a string")) + .ok_or_else(|| "parameters.log-file should be a string".to_string()) ); config.parameters.log_file = String::from(log_file); } @@ -516,14 +544,14 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let log_level = try!( doc["parameters"]["log-level"] .as_i64() - .ok_or(format!("parameters.log-level should be a number")) + .ok_or_else(|| "parameters.log-level should be a number".to_string()) ); let log_level = try!( - cast::u64(log_level).map_err(|_| format!("parameters.log-level is invalid")) + cast::u64(log_level).map_err(|_| "parameters.log-level is invalid".to_string()) ); let log_level = try!( slog::Level::from_usize(log_level as usize) - .ok_or(format!("parameters.log-level is invalid")) + .ok_or_else(|| "parameters.log-level is invalid".to_string()) ); config.parameters.log_level = log_level; } @@ -532,7 +560,7 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let syslog = try!( doc["parameters"]["syslog"] .as_bool() - .ok_or("parameters.bool should be a boolean") + .ok_or_else(|| "parameters.bool should be a boolean") ); config.parameters.syslog = syslog; } @@ -540,20 +568,23 @@ fn load_path>(file_path: P, config: &mut Config) -> Result<(), Co let timeout = try!( doc["parameters"]["timeout"] .as_i64() - .ok_or("parameters.timeout should be a number") + .ok_or_else(|| "parameters.timeout should be a number") + ); + let timeout = try!( + cast::u64(timeout).map_err(|_| "parameters.timeout is invalid".to_string()) ); - let timeout = - try!(cast::u64(timeout).map_err(|_| format!("parameters.timeout is invalid"))); 
             config.parameters.timeout = timeout;
         }
 
         if !doc["parameters"]["router-parallel"].is_badvalue() {
             let router_parallel = try!(
                 doc["parameters"]["router-parallel"]
                     .as_i64()
-                    .ok_or("parameters.router-parallel should be a number")
+                    .ok_or_else(|| "parameters.router-parallel should be a number")
+            );
+            let router_parallel = try!(
+                cast::u64(router_parallel)
+                    .map_err(|_| "parameters.router-parallel is invalid".to_string())
             );
-            let router_parallel =
-                try!(cast::u64(router_parallel).map_err(|_| format!("parameters.router-parallel is invalid")));
             config.parameters.router_parallel = router_parallel;
         }
     }
diff --git a/src/lib/mod.rs b/src/lib/mod.rs
index ecfc5ea..11691e9 100644
--- a/src/lib/mod.rs
+++ b/src/lib/mod.rs
@@ -7,7 +7,7 @@ pub fn add_labels(line: &str, labels: &str) -> Result<String, Box<Error>> {
         return Ok(String::from(line));
     }
 
-    let mut parts = line.splitn(2, "{");
+    let mut parts = line.splitn(2, '{');
     let class = parts.next().ok_or("no_class")?;
     let class = String::from(class);
 
@@ -15,7 +15,7 @@ pub fn add_labels(line: &str, labels: &str) -> Result<String, Box<Error>> {
     let plabels = parts.next().ok_or("no_labels")?;
     let plabels = String::from(plabels);
 
-    let sep = if plabels.trim().starts_with("}") {
+    let sep = if plabels.trim().starts_with('}') {
         ""
     } else {
         ","
diff --git a/src/lib/transcompiler.rs b/src/lib/transcompiler.rs
index ce8d155..77e2f8d 100644
--- a/src/lib/transcompiler.rs
+++ b/src/lib/transcompiler.rs
@@ -11,17 +11,15 @@ pub struct Transcompiler<'a> {
 impl<'a> Transcompiler<'a> {
     pub fn new(format: &config::ScraperFormat) -> Transcompiler {
         let start = time::now_utc();
-        let now = start.to_timespec().sec * 1000 * 1000 + (start.to_timespec().nsec as i64 / 1000);
-        Transcompiler {
-            format: format,
-            now: now,
-        }
+        let now = start.to_timespec().sec * 1000 * 1000
+            + (i64::from(start.to_timespec().nsec) / 1000);
+        Transcompiler { format, now }
     }
 
     pub fn format(&self, line: &str) -> Result<String, Box<Error>> {
-        match self.format {
-            &config::ScraperFormat::Sensision => format_warp10(line),
-            &config::ScraperFormat::Prometheus => format_prometheus(line, self.now),
+        match *self.format {
+            config::ScraperFormat::Sensision => format_warp10(line),
+            config::ScraperFormat::Prometheus => format_prometheus(line, self.now),
         }
     }
 }
@@ -36,12 +34,12 @@ fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
     let line = line.trim();
 
     // Skip comments
-    if line.starts_with("#") {
+    if line.starts_with('#') {
         return Ok(String::new());
     }
 
     // Extract Prometheus metric
-    let index = if line.contains("{") {
+    let index = if line.contains('{') {
         line.rfind('}').ok_or("bad class")?
     } else {
         line.find(' ').ok_or("bad class")?
@@ -62,7 +60,7 @@ fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
         .unwrap_or(now);
 
     // Format class
-    let mut parts = class.splitn(2, "{");
+    let mut parts = class.splitn(2, '{');
     let class = String::from(parts.next().ok_or("no_class")?);
     let class = class.trim();
     let plabels = parts.next();
@@ -78,7 +76,7 @@ fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
         .map(|v| v.replace(r"\n", "%0A")) // unescape
         .fold(String::new(), |acc, x| {
             // skip invalid values
-            if !x.contains("=") {
+            if !x.contains('=') {
                 return acc
             }
             acc + &x + ","
diff --git a/src/log.rs b/src/log.rs
index c839ed8..95eecd7 100644
--- a/src/log.rs
+++ b/src/log.rs
@@ -1,22 +1,22 @@
 //! # Log module.
 //!
 //! The Log module provides the log facility.
-use slog::Logger;
-use slog::Level;
+use slog::Drain;
 use slog::Duplicate;
+use slog::Level;
 use slog::LevelFilter;
-use slog::Drain;
+use slog::Logger;
 use slog_async;
-use slog_term;
+use slog_scope;
 use slog_syslog;
 use slog_syslog::Facility;
-use slog_scope;
-use std::fs::OpenOptions;
-use std::os::unix::fs::OpenOptionsExt;
+use slog_term;
 use std;
-use std::path::Path;
 use std::error::Error;
 use std::fs;
+use std::fs::OpenOptions;
+use std::os::unix::fs::OpenOptionsExt;
+use std::path::Path;
 
 use config;
@@ -33,10 +33,9 @@ pub fn bootstrap() {
 /// Send log to console and log file, also handle log level.
 pub fn log(parameters: &config::Parameters, verbose: u64) -> Result<(), Box<Error>> {
     // Ensure log directory is present
-    match Path::new(&parameters.log_file).parent() {
-        Some(log_path) => fs::create_dir_all(log_path)?,
-        None => {}
-    };
+    if let Some(log_path) = Path::new(&parameters.log_file).parent() {
+        fs::create_dir_all(log_path)?
+    }
 
     // Stdout drain
     let term_decorator = slog_term::TermDecorator::new().build();
diff --git a/src/main.rs b/src/main.rs
index fd2525d..5e4870c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -27,18 +27,18 @@ extern crate tokio_timer;
 extern crate yaml_rust;
 
 use clap::App;
-use std::thread;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
 use std::fs;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
 use std::time::Duration;
 
 mod config;
-mod scraper;
-mod router;
-mod sink;
 mod lib;
 mod log;
+mod router;
+mod scraper;
+mod sink;
 
 include!("version.rs");
@@ -126,7 +126,7 @@ fn main() {
         handles.push(thread::spawn(move || {
             slog_scope::scope(
                 &slog_scope::logger().new(o!("scraper" => scraper.name.clone())),
-                || scraper::scraper(&scraper, &parameters, sigint),
+                || scraper::scraper(&scraper, &parameters, &sigint),
             );
         }));
     }
diff --git a/src/router/fs.rs b/src/router/fs.rs
index 871e4d1..369d7ef 100644
--- a/src/router/fs.rs
+++ b/src/router/fs.rs
@@ -6,30 +6,32 @@ use std::fs;
 use std::path::PathBuf;
 use std::sync::{Arc, Mutex};
 use std::thread;
-use std::time::{Duration};
+use std::time::Duration;
 use time;
 
 use futures::future::Shared;
 use futures::sync::oneshot;
-
 use config;
+use router::RouterConfig;
 
 pub fn fs_thread(
-    dir: &str,
-    period: u64,
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    sigint: Shared<oneshot::Receiver<()>>,
+    config: &RouterConfig,
+    todo: &Arc<Mutex<VecDeque<PathBuf>>>,
+    sigint: &Shared<oneshot::Receiver<()>>,
 ) {
     let mut files: HashSet<PathBuf> = HashSet::new();
 
     loop {
         let start = time::now_utc();
 
-        match list(dir, &files) {
+        match list(&config.dir) {
             Err(err) => error!("list fail: {}", err),
-            Ok((new, deleted)) => {
-                if new.len() > 0 {
+            Ok(entries) => {
+                let deleted: Vec<PathBuf> = files.difference(&entries).cloned().collect();
+                let new: Vec<PathBuf> = entries.difference(&files).cloned().collect();
+
+                if !new.is_empty() {
                     debug!("found {} files", new.len());
                 }
@@ -47,10 +49,10 @@ pub fn fs_thread(
         }
 
         let elapsed = (time::now_utc() - start).num_milliseconds() as u64;
-        let sleep_time = if elapsed > period {
+        let sleep_time = if elapsed > config.watch_period {
             config::REST_TIME
         } else {
-            cmp::max(period - elapsed, config::REST_TIME)
+            cmp::max(config.watch_period - elapsed, config::REST_TIME)
         };
         for _ in 0..sleep_time / config::REST_TIME {
             thread::sleep(Duration::from_millis(config::REST_TIME));
@@ -61,10 +63,7 @@ pub fn fs_thread(
     }
 }
 
-fn list(
-    dir: &str,
-    files: &HashSet<PathBuf>,
-) -> Result<(Vec<PathBuf>, Vec<PathBuf>), Box<Error>> {
+fn list(dir: &str) -> Result<HashSet<PathBuf>, Box<Error>> {
     let entries: HashSet<PathBuf> = fs::read_dir(dir)?
         .filter_map(|entry| {
             if entry.is_err() {
@@ -79,8 +78,5 @@ fn list(dir: &str) -> Result<HashSet<PathBuf>, Box<Error>> {
         })
         .collect();
 
-    let deleted = files.difference(&entries).cloned().collect();
-    let new = entries.difference(&files).cloned().collect();
-
-    Ok((new, deleted))
+    Ok(entries)
 }
diff --git a/src/router/mod.rs b/src/router/mod.rs
index 36faea5..439d87e 100644
--- a/src/router/mod.rs
+++ b/src/router/mod.rs
@@ -1,267 +1,99 @@
 //! # Router module.
 //!
 //! The Router module forwards sources to sinks.
-use std::thread;
 use std::collections::HashMap;
-use std::path::PathBuf;
 use std::collections::VecDeque;
+use std::path::PathBuf;
+use std::thread;
 
-use std::sync::{Arc, Mutex};
 use futures::future::Shared;
 use futures::sync::oneshot;
 use futures::Future;
+use std::sync::{Arc, Mutex};
 
-use slog_scope;
 use config;
+use slog_scope;
 
 mod fs;
-mod router;
+mod route;
 
-pub struct Router<'a> {
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    handles: Vec<thread::JoinHandle<()>>,
-    dir: &'a String,
+#[derive(Debug, Clone)]
+pub struct RouterConfig {
+    dir: String,
     watch_period: u64,
     parallel: u64,
-    sinks: &'a Vec<config::Sink>,
-    sink_dir: &'a String,
-    labels: &'a HashMap<String, String>,
+    sinks: Vec<config::Sink>,
+    sink_dir: String,
+    labels: String,
+}
+
+pub struct Router {
+    config: RouterConfig,
+    todo: Arc<Mutex<VecDeque<PathBuf>>>,
+    handles: Vec<thread::JoinHandle<()>>,
     sigint: (oneshot::Sender<()>, Shared<oneshot::Receiver<()>>),
 }
 
-impl<'a> Router<'a> {
-    pub fn new(sinks: &'a Vec<config::Sink>, parameters: &'a config::Parameters, labels: &'a HashMap<String, String>) -> Router<'a> {
+impl Router {
+    pub fn new(
+        sinks: &[config::Sink],
+        parameters: &config::Parameters,
+        labels: &HashMap<String, String>,
+    ) -> Router {
         let (shutdown_tx, shutdown_rx) = oneshot::channel();
+
+        // Build labels string
+        let labels: String = labels.iter().fold(String::new(), |acc, (k, v)| {
+            let sep = if acc.is_empty() { "" } else { "," };
+            acc + sep + k + "=" + v
+        });
+
+        let config = RouterConfig {
+            dir: parameters.source_dir.clone(),
+            watch_period: parameters.scan_period,
+            parallel: parameters.router_parallel,
+            sinks: sinks.to_owned(),
+            sink_dir: parameters.sink_dir.clone(),
+            labels,
+        };
 
         Router {
+            config,
             todo: Arc::new(Mutex::new(VecDeque::new())),
             handles: Vec::new(),
             sigint: (shutdown_tx, shutdown_rx.shared()),
-            dir: &parameters.source_dir,
-            parallel: parameters.router_parallel,
-            sinks: sinks,
-            sink_dir: &parameters.sink_dir,
-            labels: labels,
-            watch_period: parameters.scan_period,
         }
     }
 
     pub fn start(&mut self) {
         debug!("start router");
-        // Build labels string
-        let labels: String = self.labels.iter().fold(String::new(), |acc, (k, v)| {
-            let sep = if acc.is_empty() { "" } else { "," };
-            acc + sep + k + "=" + v
-        });
 
         // spawn fs thread
-        let (dir, todo) = (self.dir.clone(), self.todo.clone());
-        let (period, sigint) = (self.watch_period, self.sigint.1.clone());
+        let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone());
+        let config = self.config.clone();
         self.handles.push(thread::spawn(move || {
-            slog_scope::scope(
-                &slog_scope::logger().new(o!()),
-                || fs::fs_thread(&dir, period, todo, sigint),
-            );
+            slog_scope::scope(&slog_scope::logger().new(o!()), || {
+                fs::fs_thread(&config, &todo, &sigint)
+            });
         }));
 
         // spawn router threads
-        for idx in 0..self.parallel {
-            let sinks = self.sinks.clone();
-            let sink_dir = self.sink_dir.clone();
-            let sigint = self.sigint.1.clone();
-            let todo = self.todo.clone();
-            let labels = labels.clone();
+        for idx in 0..self.config.parallel {
+            let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone());
+            let config = self.config.clone();
             self.handles.push(thread::spawn(move || {
-                slog_scope::scope(
-                    &slog_scope::logger().new(o!()),
-                    || {
-                        router::router_thread(sinks, sink_dir, todo, labels, sigint, idx);
-                    },
-                )
+                slog_scope::scope(&slog_scope::logger().new(o!()), || {
+                    route::route_thread(&config, &todo, &sigint, idx);
+                })
             }));
         }
     }
 
     pub fn stop(self) {
-        self.sigint.0.send(()).unwrap(); // FIXME
-        // for handle in self.handles {
-        //     handle.join().unwrap();
-        // }
+        self.sigint.0.send(()).unwrap();
+        for handle in self.handles {
+            handle.join().unwrap();
+        }
     }
 }
-
-// /// Thread sleeping time.
-// const REST_TIME: u64 = 10;
-
-// /// Router loop.
-// pub fn router(
-//     sinks: &Vec<config::Sink>,
-//     labels: &HashMap<String, String>,
-//     parameters: &config::Parameters,
-//     sigint: Arc<AtomicBool>,
-// ) {
-//     let labels: String = labels.iter().fold(String::new(), |acc, (k, v)| {
-//         let sep = if acc.is_empty() { "" } else { "," };
-//         acc + sep + k + "=" + v
-//     });
-
-//     loop {
-//         let start = time::now_utc();
-
-//         match route(sinks, parameters, &labels, sigint.clone()) {
-//             Err(err) => error!("route fail: {}", err),
-//             Ok(size) => if size > 0 {
-//                 info!("route success - {}", size)
-//             },
-//         }
-
-//         let elapsed = (time::now_utc() - start).num_milliseconds() as u64;
-//         let sleep_time = if elapsed > parameters.scan_period {
-//             REST_TIME
-//         } else {
-//             cmp::max(parameters.scan_period - elapsed, REST_TIME)
-//         };
-//         for _ in 0..sleep_time / REST_TIME {
-//             thread::sleep(Duration::from_millis(REST_TIME));
-//             if sigint.load(Ordering::Relaxed) {
-//                 return;
-//             }
-//         }
-//     }
-// }
-
-// /// Route handle sources forwarding.
-// fn route(
-//     sinks: &Vec<config::Sink>,
-//     parameters: &config::Parameters,
-//     labels: &String,
-//     sigint: Arc<AtomicBool>,
-// ) -> Result<usize, Box<Error>> {
-//     let mut proc_size = 0;
-//     let mut batch_count = 0;
-//     let start = time::now_utc().to_timespec();
-//     let run_id = format!("{}#{}", start.sec, start.nsec);
-
-//     loop {
-//         if sigint.load(Ordering::Relaxed) {
-//             return Ok(proc_size);
-//         }
-//         let entries = try!(fs::read_dir(&parameters.source_dir));
-//         let mut files = Vec::with_capacity(parameters.batch_count as usize);
-//         let mut metrics: Vec<String> = Vec::new();
-
-//         // Load metrics
-//         let mut batch_size = 0;
-//         for (i, entry) in entries.enumerate() {
-//             let entry = try!(entry);
-//             // Look only for metrics files
-//             if entry.path().extension() != Some(OsStr::new("metrics")) {
-//                 continue;
-//             }
-
-//             // Split metrics in capped batch
-//             if i > parameters.batch_count as usize || batch_size > parameters.batch_size as usize {
-//                 break;
-//             }
-
-//             debug!("open source file {}", format!("{:?}", entry.path()));
-//             let file = match read(entry.path()) {
-//                 Err(err) => {
-//                     warn!("{}", err);
-//                     continue;
-//                 }
-//                 Ok(v) => v,
-//             };
-
-//             for line in file.lines() {
-//                 match lib::add_labels(&line, labels) {
-//                     Ok(v) => metrics.push(v),
-//                     Err(_) => {
-//                         warn!("bad line {}", &line);
-//                         continue;
-//                     }
-//                 };
-//             }
-
-//             files.push(entry.path());
-//             batch_size += file.len();
-//         }
-
-//         proc_size += metrics.len();
-//         batch_count += 1;
-
-//         // Nothing to do
-//         if files.len() == 0 {
-//             break;
-//         }
-
-//         // Setup sinks files
-//         let dir = Path::new(&parameters.sink_dir);
-//         {
-//             let mut sink_files = Vec::with_capacity(sinks.len() as usize);
-//             // Open tmp files
-//             for sink in sinks {
-//                 let sink_file = dir.join(format!("{}.tmp", sink.name));
-//                 debug!("open tmp sink file {}", format!("{:?}", sink_file));
-//                 sink_files.push(try!(File::create(sink_file)));
-//             }
-
-//             // Write metrics
-//             debug!("write sink files");
-//             for line in metrics {
-//                 if line.is_empty() {
-//                     continue;
-//                 }
-
-//                 for (i, sink) in sinks.iter().enumerate() {
-//                     if sink.selector.is_some() {
-//                         let selector = sink.selector.as_ref().unwrap();
-//                         if !line.split_whitespace()
-//                             .nth(1)
-//                             .map_or(false, |class| selector.is_match(class))
-//                         {
-//                             continue;
-//                         }
-//                     }
-//                     try!(sink_files[i].write(line.as_bytes()));
-//                     try!(sink_files[i].write(b"\n"));
-//                 }
-//             }
-
-//             // Flush
-//             for i in 0..sinks.len() {
-//                 try!(sink_files[i].flush());
-//             }
-//         }
-
-//         // Rotate
-//         for sink in sinks {
-//             let dest_file = dir.join(format!("{}-{}-{}.metrics", sink.name, run_id, batch_count));
-//             debug!("rotate tmp sink file to {}", format!("{:?}", dest_file));
-//             try!(fs::rename(
-//                 dir.join(format!("{}.tmp", sink.name)),
-//                 dest_file
-//             ));
-//         }
-
-//         // Delete forwarded data
-//         for f in files {
-//             debug!("delete source file {}", format!("{:?}", f));
-//             try!(fs::remove_file(f));
-//         }
-//     }
-
-//     Ok(proc_size)
-// }
-
-// /// Read a file as String
-// fn read(path: PathBuf) -> Result<String, Box<Error>> {
-//     let mut file = try!(File::open(path));
-
-//     let mut content = String::new();
-//     try!(file.read_to_string(&mut content));
-
-//     Ok(content)
-// }
diff --git a/src/router/router.rs b/src/router/route.rs
similarity index 75%
rename from src/router/router.rs
rename to src/router/route.rs
index c632ccf..74e7586 100644
--- a/src/router/router.rs
+++ b/src/router/route.rs
@@ -1,33 +1,32 @@
-use std::thread;
-use std::time::Duration;
-use time;
-use std::io::prelude::*;
-use std::fs::File;
-use std::error::Error;
-use std::path::{Path, PathBuf};
 use std::collections::VecDeque;
+use std::error::Error;
 use std::fs;
+use std::fs::File;
+use std::io::prelude::*;
 use std::io::BufWriter;
+use std::path::{Path, PathBuf};
+use std::thread;
+use std::time::Duration;
+use time;
 
-use std::sync::{Arc, Mutex};
+use config;
 use futures::future::Shared;
 use futures::sync::oneshot;
-use config;
 use lib;
+use router::RouterConfig;
+use std::sync::{Arc, Mutex};
 
-pub fn router_thread(
-    sinks: Vec<config::Sink>,
-    sink_dir: String,
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    labels: String,
-    sigint: Shared<oneshot::Receiver<()>>,
-    id: u64
+pub fn route_thread(
+    config: &RouterConfig,
+    todo: &Arc<Mutex<VecDeque<PathBuf>>>,
+    sigint: &Shared<oneshot::Receiver<()>>,
+    id: u64,
 ) {
     loop {
         match todo.lock().unwrap().pop_front() {
             Some(path) => {
-                if let Err(err) = route(path, &labels, &sinks, &sink_dir, id) {
-                    warn!("{}", err);
+                if let Err(err) = route(&path, &config, id) {
+                    warn!("{}", err);
                 }
             }
             None => {
@@ -41,7 +40,7 @@
         }
     }
 }
 
-fn route(path: PathBuf, labels: &String, sinks: &Vec<config::Sink>, sink_dir: &String, id: u64) -> Result<(), Box<Error>> {
+fn route(path: &PathBuf, config: &RouterConfig, id: u64) -> Result<(), Box<Error>> {
     let start = time::now_utc().to_timespec();
     let run_id = format!("{}#{}", start.sec, start.nsec);
 
@@ -52,11 +51,11 @@ fn route(path: &PathBuf, config: &RouterConfig, id: u64) -> Result<(), Box<Error>> {
     let mut metrics: Vec<String> = Vec::new();
     for line in content.lines() {
-        metrics.push(lib::add_labels(&line, labels)?);
+        metrics.push(lib::add_labels(&line, &config.labels)?);
     }
 
     // Nothing to do
-    if metrics.len() == 0 {
+    if metrics.is_empty() {
         // Delete source file
         debug!("delete {}", format!("{:?}", path));
         fs::remove_file(&path)?;
@@ -65,31 +64,32 @@ fn route(path: &PathBuf, config: &RouterConfig, id: u64) -> Result<(), Box<Error>> {
     }
 
     // Setup sinks files
-    let dir = Path::new(&sink_dir); // FIXME
+    let dir = Path::new(&config.sink_dir);
 
     // Open tmp files
-    for sink in sinks {
+    for sink in &config.sinks {
         let sink_file_name = dir.join(format!("{}-{}.tmp", sink.name, id));
         debug!("open tmp sink file {}", format!("{:?}", sink_file_name));
         let mut sink_file = BufWriter::new(File::create(&sink_file_name)?);
 
         // Write metrics
-        for line in metrics.iter() {
+        for line in &metrics {
             if line.is_empty() {
                 continue;
             }
 
             if sink.selector.is_some() {
                 let selector = sink.selector.as_ref().unwrap();
-                if !line.split_whitespace()
+                if !line
+                    .split_whitespace()
                     .nth(1)
                     .map_or(false, |class| selector.is_match(class))
                 {
                     continue;
                 }
             }
 
-            sink_file.write(line.as_bytes())?;
-            sink_file.write(b"\n")?;
+            sink_file.write_all(line.as_bytes())?;
+            sink_file.write_all(b"\n")?;
         }
 
         // Write buffered data
diff --git a/src/scraper.rs b/src/scraper.rs
index a50833a..95d4e19 100644
--- a/src/scraper.rs
+++ b/src/scraper.rs
@@ -1,23 +1,23 @@
 //! # Scraper module.
 //!
 //! The Scraper module fetches metrics from an HTTP endpoint.
-use std::thread;
-use std::time::Duration;
-use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
-use time;
-use std::io;
-use std::cmp;
-use hyper;
-use hyper_tls::HttpsConnector;
-use hyper_timeout::TimeoutConnector;
 use futures::future::Future;
 use futures::Stream;
-use std::io::prelude::*;
+use hyper;
+use hyper_timeout::TimeoutConnector;
+use hyper_tls::HttpsConnector;
+use std::cmp;
+use std::error::Error;
 use std::fs;
 use std::fs::File;
-use std::error::Error;
+use std::io;
+use std::io::prelude::*;
 use std::path::Path;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::thread;
+use std::time::Duration;
+use time;
 
 use config;
@@ -30,7 +30,7 @@ const REST_TIME: u64 = 10;
 pub fn scraper(
     scraper: &config::Scraper,
     parameters: &config::Parameters,
-    sigint: Arc<AtomicBool>,
+    sigint: &Arc<AtomicBool>,
 ) {
     let labels: String = scraper.labels.iter().fold(String::new(), |acc, (k, v)| {
         let sep = if acc.is_empty() { "" } else { "," };
@@ -80,7 +80,7 @@ fn fetch(
     let client = hyper::Client::configure().connector(tm).build(&handle);
 
     let mut req = hyper::Request::new(hyper::Method::Get, scraper.url.clone());
-    for (key, value) in scraper.headers.iter() {
+    for (key, value) in &scraper.headers {
         req.headers_mut()
             .set_raw(key.clone(), vec![value.clone().into_bytes()]);
     }
@@ -107,7 +107,7 @@ fn fetch(
 
     // Get now as millis
     let start = time::now_utc();
-    let now = start.to_timespec().sec * 1000 * 1000 + (start.to_timespec().nsec as i64 / 1000);
+    let now = start.to_timespec().sec * 1000 * 1000 + (i64::from(start.to_timespec().nsec) / 1000);
 
     let dir = Path::new(&parameters.source_dir);
     let temp_file = dir.join(format!("{}.tmp", scraper.name));
@@ -140,18 +140,17 @@ fn fetch(
             }
         };
 
-        if scraper.metrics.is_some() {
-            if !scraper.metrics.as_ref().unwrap().is_match(&line) {
-                continue;
-            }
+        if scraper.metrics.is_some() && !scraper.metrics.as_ref().unwrap().is_match(&line) {
+            continue;
         }
 
         batch_size += line.len();
-        if batch_size > parameters.batch_size as usize && !line.starts_with("=") {
+        if batch_size > parameters.batch_size as usize && !line.starts_with('=') {
             // Rotate scraped file
             file.flush()?;
 
-            let dest_file = dir.join(format!("{}-{}-{}.metrics", scraper.name, now, batch_count));
+            let dest_file =
+                dir.join(format!("{}-{}-{}.metrics", scraper.name, now, batch_count));
             debug!("rotate tmp file to {}", format!("{:?}", dest_file));
 
             fs::rename(&temp_file, &dest_file)?;
@@ -160,8 +159,8 @@ fn fetch(
             batch_count += 1;
         }
 
-        file.write(line.as_bytes())?;
-        file.write(b"\n")?;
+        file.write_all(line.as_bytes())?;
+        file.write_all(b"\n")?;
     }
 
     file.flush()?;
diff --git a/src/sink/fs.rs b/src/sink/fs.rs
index 6e509e6..2be766d 100644
--- a/src/sink/fs.rs
+++ b/src/sink/fs.rs
@@ -20,15 +20,12 @@ use futures::{lazy, Async, Stream};
 use tokio_core::reactor::Core;
 
 use config;
+use sink::SinkConfig;
 
 pub fn fs_thread(
-    name: &str,
-    dir: &str,
-    period: u64,
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    max_size: u64,
-    ttl: u64,
-    sigint: Shared<oneshot::Receiver<()>>,
+    todo: &Arc<Mutex<VecDeque<PathBuf>>>,
+    config: &SinkConfig,
+    sigint: &Shared<oneshot::Receiver<()>>,
     mut notify_rx: Receiver<PathBuf>,
 ) {
     let mut files: HashSet<PathBuf> = HashSet::new();
@@ -38,10 +35,13 @@ pub fn fs_thread(
     loop {
         let start = time::now_utc();
         let work = lazy(|| -> FutureResult<(), ()> {
-            match list(name, dir, &files, ttl) {
+            match list(&config.name, &config.dir, config.ttl) {
                 Err(err) => error!("list fail: {}", err),
-                Ok((new, deleted, size)) => {
-                    if new.len() > 0 {
+                Ok((entries, size)) => {
+                    let new: Vec<PathBuf> = entries.difference(&files).cloned().collect();
+                    let deleted: Vec<PathBuf> = files.difference(&entries).cloned().collect();
+
+                    if !new.is_empty() {
                         debug!("found {} files", new.len());
                     }
@@ -71,7 +71,7 @@ pub fn fs_thread(
                         files.remove(&f);
                    }
 
-                    match cappe(size, max_size, &todo) {
+                    match cappe(size, config.max_size, &todo) {
                         Err(err) => error!("cappe fail: {}", err),
                         Ok(()) => {}
                     }
@@ -83,10 +83,10 @@ pub fn fs_thread(
         core.run(work).expect("always ok");
 
         let elapsed = (time::now_utc() - start).num_milliseconds() as u64;
-        let sleep_time = if elapsed > period {
+        let sleep_time = if elapsed > config.watch_period {
             config::REST_TIME
         } else {
-            cmp::max(period - elapsed, config::REST_TIME)
+            cmp::max(config.watch_period - elapsed, config::REST_TIME)
         };
         for _ in 0..sleep_time / config::REST_TIME {
             thread::sleep(Duration::from_millis(config::REST_TIME));
@@ -97,12 +97,7 @@ pub fn fs_thread(
     }
 }
 
-fn list(
-    sink_name: &str,
-    dir: &str,
-    files: &HashSet<PathBuf>,
-    ttl: u64,
-) -> Result<(Vec<PathBuf>, Vec<PathBuf>, u64), Box<Error>> {
+fn list(sink_name: &str, dir: &str, ttl: u64) -> Result<(HashSet<PathBuf>, u64), Box<Error>> {
     let mut sink_name = String::from(sink_name);
     sink_name.push('-');
 
@@ -127,10 +122,10 @@ fn list(
             let meta = entry.metadata();
             if meta.is_ok() {
                 let meta = meta.unwrap();
-                files_size = files_size + meta.len();
+                files_size += meta.len();
 
-                let modified = meta.modified().unwrap_or(SystemTime::now());
-                let age = modified.elapsed().unwrap_or(Duration::new(0, 0));
+                let modified = meta.modified().unwrap_or_else(|_| SystemTime::now());
+                let age = modified.elapsed().unwrap_or_else(|_| Duration::new(0, 0));
 
                 if age.as_secs() > ttl {
                     warn!("skip file {:?}", entry.path());
@@ -146,10 +141,7 @@ fn list(
         })
        .collect();
 
-    let deleted = files.difference(&entries).cloned().collect();
-    let new = entries.difference(&files).cloned().collect();
-
-    Ok((new, deleted, files_size))
+    Ok((entries, files_size))
 }
 
 fn cappe(actual: u64, target: u64, todo: &Arc<Mutex<VecDeque<PathBuf>>>) -> Result<(), Box<Error>> {
@@ -167,7 +159,7 @@ fn cappe(actual: u64, target: u64, todo: &Arc<Mutex<VecDeque<PathBuf>>>) -> Resu
         let path = path.unwrap();
         let meta = fs::metadata(&path)?;
 
-        size = size - meta.len();
+        size -= meta.len();
 
         warn!("skip file {:?}", path);
         fs::remove_file(path)?;
@@ -182,7 +174,7 @@ fn cappe(actual: u64, target: u64, todo: &Arc<Mutex<VecDeque<PathBuf>>>) -> Resu
 fn remove_empty(path: &PathBuf) -> Result<bool, Box<Error>> {
     let metadata = fs::metadata(path.clone())?;
 
-    if metadata.len() <= 0 {
+    if metadata.len() == 0 {
         fs::remove_file(path.clone())?;
 
         return Ok(true);
diff --git a/src/sink/mod.rs b/src/sink/mod.rs
index 355f3bf..fabc07b 100644
--- a/src/sink/mod.rs
+++ b/src/sink/mod.rs
@@ -28,80 +28,84 @@ mod send;
 
 const REACTOR_CLIENT: usize = 30;
 
-pub struct Sink<'a> {
-    name: &'a String,
-    todo: Arc<Mutex<VecDeque<PathBuf>>>,
-    handles: Vec<thread::JoinHandle<()>>,
-    dir: &'a String,
+#[derive(Debug, Clone)]
+pub struct SinkConfig {
+    name: String,
+    dir: String,
+    max_size: u64,
+    ttl: u64,
     watch_period: u64,
+    parallel: u64,
+    url: hyper::Uri,
     timeout: u64,
     keep_alive: bool,
-    token: &'a String,
-    token_header: &'a String,
-    url: &'a hyper::Uri,
+    token: String,
+    token_header: String,
     batch_count: u64,
     batch_size: u64,
-    max_size: u64,
-    ttl: u64,
-    parallel: u64,
+}
+
+pub struct Sink {
+    config: SinkConfig,
+    todo: Arc<Mutex<VecDeque<PathBuf>>>,
+    handles: Vec<thread::JoinHandle<()>>,
     sigint: (oneshot::Sender<()>, Shared<oneshot::Receiver<()>>),
 }
 
-impl<'a> Sink<'a> {
-    pub fn new(sink: &'a config::Sink, parameters: &'a config::Parameters) -> Sink<'a> {
+impl Sink {
+    pub fn new(sink: &config::Sink, parameters: &config::Parameters) -> Sink {
         let (shutdown_tx, shutdown_rx) = oneshot::channel();
 
-        Sink {
-            name: &sink.name,
-            todo: Arc::new(Mutex::new(VecDeque::new())),
-            handles: Vec::new(),
-            sigint: (shutdown_tx, shutdown_rx.shared()),
-            dir: &parameters.sink_dir,
+        let config = SinkConfig {
+            name: sink.name.clone(),
+            dir: parameters.sink_dir.clone(),
+            max_size: sink.size,
+            ttl: sink.ttl,
             watch_period: parameters.scan_period,
+            parallel: sink.parallel,
+            url: sink.url.clone(),
             timeout: parameters.timeout,
             keep_alive: sink.keep_alive,
-            token: &sink.token,
-            token_header: &sink.token_header,
-            url: &sink.url,
+            token: sink.token.clone(),
+            token_header: sink.token_header.clone(),
             batch_count: parameters.batch_count,
             batch_size: parameters.batch_size,
-            max_size: sink.size,
-            ttl: sink.ttl,
-            parallel: sink.parallel,
+        };
+
+        Sink {
+            config,
+            todo: Arc::new(Mutex::new(VecDeque::new())),
+            handles: Vec::new(),
+            sigint: (shutdown_tx, shutdown_rx.shared()),
         }
     }
 
     pub fn start(&mut self) {
-        debug!("start sink: {}", self.name);
-        let (notify_tx, notify_rx) = channel(self.parallel as usize);
+        debug!("start sink: {}", self.config.name);
+        let (notify_tx, notify_rx) = channel(self.config.parallel as usize);
 
         // spawn fs thread
-        let (name, dir, todo) = (self.name.clone(), self.dir.clone(), self.todo.clone());
-        let (period, sigint) = (self.watch_period, self.sigint.1.clone());
-        let (max_size, ttl) = (self.max_size, self.ttl);
+        let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone());
+        let config = self.config.clone();
         self.handles.push(thread::spawn(move || {
             slog_scope::scope(
-                &slog_scope::logger().new(o!("sink" => name.clone())),
-                || fs::fs_thread(&name, &dir, period, todo, max_size, ttl, sigint, notify_rx),
+                &slog_scope::logger().new(o!("sink" => config.name.clone())),
+                || fs::fs_thread(&todo, &config, &sigint, notify_rx),
             );
         }));
 
         // spawn sender threads
-        let reactor_count = (self.parallel as f64 / REACTOR_CLIENT as f64).ceil() as u64;
-        let client_count = (self.parallel as f64 / reactor_count as f64).ceil() as u64;
+        let reactor_count = (self.config.parallel as f64 / REACTOR_CLIENT as f64).ceil() as u64;
+        let client_count = (self.config.parallel as f64 / reactor_count as f64).ceil() as u64;
 
         for _ in 0..reactor_count {
-            let (name, sigint) = (self.name.clone(), self.sigint.1.clone());
-            let todo = self.todo.clone();
-            let url = self.url.clone();
-            let (timeout, keep_alive) = (self.timeout, self.keep_alive);
-            let (token, token_header) = (self.token.clone(), self.token_header.clone());
-            let (batch_count, batch_size) = (self.batch_count, self.batch_size);
+            let (todo, sigint) = (self.todo.clone(), self.sigint.1.clone());
             let notify_tx: Sender<PathBuf> = notify_tx.clone();
+            let config = self.config.clone();
 
             self.handles.push(thread::spawn(move || {
                 slog_scope::scope(
-                    &slog_scope::logger().new(o!("sink" => name.clone())),
+                    &slog_scope::logger().new(o!("sink" => config.name.clone())),
                     || {
                         let mut core = Core::new().expect("Fail to start tokio reactor");
                         let handle = core.handle();
@@ -110,12 +114,12 @@ impl<'a> Sink<'a> {
 
                         // Handle connection timeouts
                         let mut tm = TimeoutConnector::new(connector, &handle);
TimeoutConnector::new(connector, &handle); - tm.set_connect_timeout(Some(Duration::from_secs(timeout))); + tm.set_connect_timeout(Some(Duration::from_secs(config.timeout))); let client = Arc::new( hyper::Client::configure() .body::() - .keep_alive(keep_alive) + .keep_alive(config.keep_alive) .connector(tm) .build(&handle), ); @@ -124,12 +128,8 @@ impl<'a> Sink<'a> { for _ in 0..client_count { let work = send::send_thread( client.clone(), - token.clone(), - token_header.clone(), - url.clone(), + config.clone(), todo.clone(), - batch_count, - batch_size, notify_tx.clone(), ); diff --git a/src/sink/send.rs b/src/sink/send.rs index 61de7e0..6f532b3 100644 --- a/src/sink/send.rs +++ b/src/sink/send.rs @@ -18,6 +18,7 @@ use futures::sync::mpsc::Sender; use futures::task::{current, Task}; use futures::{Async, Future, Poll, Stream}; +use sink::SinkConfig; // use flate2::Compression; // use flate2::write::ZlibEncoder; @@ -27,29 +28,26 @@ pub fn send_thread( client: Arc< hyper::Client>, PayloadBody>, >, - token: String, - token_header: String, - url: hyper::Uri, + config: SinkConfig, todo: Arc>>, - batch_count: u64, - batch_size: u64, notify_tx: Sender, ) -> Box> { - let work = PayloadStream::new(todo, batch_count, batch_size, notify_tx) + let work = PayloadStream::new(todo, config.batch_count, config.batch_size, notify_tx) .for_each(move |mut p| { let mut req: hyper::Request = - hyper::Request::new(hyper::Post, url.clone()); + hyper::Request::new(hyper::Post, config.url.clone()); req.set_body(p.body().expect("Body is never None")); req.headers_mut() - .set_raw(token_header.clone(), token.clone()); + .set_raw(config.token_header.clone(), config.token.clone()); - let req = client + client .clone() .request(req) .and_then(|res| { let status = res.status(); // TODO not read body if not debug - let body = res.body() + let body = res + .body() .fold(String::new(), |mut acc, chunk| { ok::({ acc.push_str(&String::from_utf8_lossy(&chunk)); @@ -91,13 +89,11 @@ pub fn send_thread( } }; ok(()) - }); - - req + }) }) .then(|_| ok(())); - return Box::new(work); + Box::new(work) } struct PayloadStream { @@ -115,10 +111,10 @@ impl PayloadStream { notify_tx: Sender, ) -> PayloadStream { PayloadStream { - batch_count: batch_count, - batch_size: batch_size, - todo: todo, - notify_tx: notify_tx, + batch_count, + batch_size, + todo, + notify_tx, } } } @@ -159,7 +155,7 @@ impl Payload { let mut body = PayloadBody::new(todo.clone(), batch_count, batch_size); body.load(Some(file)); Payload { - todo: todo, + todo, processed: body.processed(), sent_lines: body.sent_lines(), body: Some(body), @@ -211,7 +207,7 @@ impl PayloadBody { PayloadBody { remaining_count: batch_count, remaining_size: batch_size as i64, - todo: todo, + todo, processed: Arc::new(Mutex::new(Vec::new())), reader: None, sent_lines: Arc::new(AtomicUsize::new(0)), @@ -242,7 +238,7 @@ impl PayloadBody { debug!("load file {}", path.display()); self.processed.lock().unwrap().push(path.clone()); - self.remaining_count = self.remaining_count - 1; + self.remaining_count -= 1; match File::open(path.clone()) { Err(err) => { @@ -271,7 +267,8 @@ impl Stream for PayloadBody { // send file while idx < CHUNK_SIZE { - let size = self.reader + let size = self + .reader .as_mut() .expect("reader is never None") .read_line(&mut s)?; @@ -282,14 +279,14 @@ impl Stream for PayloadBody { break; // No more work to do } } else { - idx = idx + size; - sent_lines = sent_lines + 1; + idx += size; + sent_lines += 1; } } self.sent_lines.fetch_add(sent_lines, Ordering::Relaxed); - 
+        self.remaining_size -= idx as i64;
 
         Ok(Async::Ready(Some(hyper::Chunk::from(s))))
     }
 }