Skip to content
This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #19 from runabove/filter
Browse files Browse the repository at this point in the history
Filter
  • Loading branch information
d33d33 authored Feb 27, 2017
2 parents 27b2593 + c67484c commit 2990611
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beamium"
version = "1.3.1"
version = "1.4.0"
authors = [ "d33d33 <[email protected]>" ]

build = "build.rs"
Expand Down
38 changes: 22 additions & 16 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,28 +208,30 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
let period = try!(cast::u64(period)
.map_err(|_| format!("sources.{}.period is invalid", name)));
let format = if v["format"].is_badvalue() {
SourceFormat::Prometheus
} else {
let f = try!(v["format"]
.as_str()
.ok_or(format!("sinks.{}.format should be a string", name)));

if f == "prometheus" {
SourceFormat::Prometheus
} else if f == "sensision" {
SourceFormat::Sensision
} else {
let f = try!(v["format"]
.as_str()
.ok_or(format!("sinks.{}.format should be a string", name)));

if f == "prometheus" {
SourceFormat::Prometheus
} else if f == "sensision" {
SourceFormat::Sensision
} else {
return Err(format!("sinks.{}.format should be 'Prometheus' or 'sensision'", name).into())
}
};
return Err(format!("sinks.{}.format should be 'Prometheus' or 'sensision'",
name)
.into());
}
};
let metrics = if v["metrics"].is_badvalue() {
None
} else {
let mut metrics = Vec::new();
let values = try!(v["metrics"].as_vec().ok_or("metrics should be an array"));
for v in values {
let value = try!(regex::Regex::new(try!(v.as_str()
.ok_or(format!("metrics.{} is invalid", name)))));
.ok_or(format!("metrics.{} is invalid", name)))));
metrics.push(String::from(r"^(\S*)\s") + value.as_str());
}

Expand Down Expand Up @@ -267,9 +269,13 @@ fn load_path<P: AsRef<Path>>(file_path: P, config: &mut Config) -> Result<(), Co
let selector = if v["selector"].is_badvalue() {
None
} else {
Some(try!(regex::Regex::new(try!(v["selector"]
.as_str()
.ok_or(format!("sinks.{}.selector is invalid", name))))))
Some(try!(regex::Regex::new(format!("^{}",
try!(v["selector"]
.as_str()
.ok_or(format!("sinks.{}.selector \
is invalid",
name))))
.as_str())))
};

let ttl = if v["ttl"].is_badvalue() {
Expand Down
1 change: 0 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ fn main() {
}

// Synchronisation stuff
// let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);
let sigint = Arc::new(AtomicBool::new(false));
let mut handles = Vec::with_capacity(config.sources.len() + 1 + config.sinks.len());

Expand Down
23 changes: 17 additions & 6 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,13 @@ pub fn router(sinks: &Vec<config::Sink>,
loop {
let start = time::now_utc();

match route(sinks, parameters, &labels) {
match route(sinks, parameters, &labels, sigint.clone()) {
Err(err) => error!("route fail: {}", err),
Ok(_) => info!("route success"),
Ok(size) => {
if size > 0 {
info!("route success - {}", size)
}
}
}

let elapsed = (time::now_utc() - start).num_milliseconds() as u64;
Expand All @@ -58,10 +62,15 @@ pub fn router(sinks: &Vec<config::Sink>,
/// Route handle sources forwarding.
fn route(sinks: &Vec<config::Sink>,
parameters: &config::Parameters,
labels: &String)
-> Result<(), Box<Error>> {
debug!("route");
labels: &String,
sigint: Arc<AtomicBool>)
-> Result<usize, Box<Error>> {
let mut proc_size = 0;

loop {
if sigint.load(Ordering::Relaxed) {
return Ok(proc_size);
}
let entries = try!(fs::read_dir(&parameters.source_dir));
let mut files = Vec::with_capacity(parameters.batch_count as usize);
let mut metrics: Vec<String> = Vec::new();
Expand Down Expand Up @@ -127,6 +136,8 @@ fn route(sinks: &Vec<config::Sink>,
batch_size += file.len();
}

proc_size += metrics.len();

// Nothing to do
if files.len() == 0 {
break;
Expand Down Expand Up @@ -185,7 +196,7 @@ fn route(sinks: &Vec<config::Sink>,
}
}

Ok(())
Ok(proc_size)
}

/// Read a file as String
Expand Down
55 changes: 35 additions & 20 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ pub fn sink(sink: &config::Sink, parameters: &config::Parameters, sigint: Arc<At
loop {
let start = time::now_utc();

match send(sink, parameters) {
match send(sink, parameters, sigint.clone()) {
Err(err) => error!("post fail: {}", err),
Ok(_) => info!("post success"),
Ok(size) => {
if size > 0 {
info!("post success - {}", size)
}
}
}

let res = cappe(sink, parameters);
Expand All @@ -54,10 +58,17 @@ pub fn sink(sink: &config::Sink, parameters: &config::Parameters, sigint: Arc<At
}

/// Send sink metrics to Warp10.
fn send(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<Error>> {
debug!("post {}", &sink.url);
fn send(sink: &config::Sink,
parameters: &config::Parameters,
sigint: Arc<AtomicBool>)
-> Result<usize, Box<Error>> {
let mut proc_size = 0;

loop {
if sigint.load(Ordering::Relaxed) {
return Ok(proc_size);
}

let entries = try!(files(&parameters.sink_dir, &sink.name));
let mut files = Vec::with_capacity(parameters.batch_count as usize);
let mut metrics = String::new();
Expand All @@ -82,6 +93,8 @@ fn send(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<
metrics.push_str("\n");
}

proc_size += metrics.len();

// Nothing to do
if metrics.len() == 0 {
break;
Expand All @@ -97,7 +110,7 @@ fn send(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<
let mut headers = hyper::header::Headers::new();
headers.set_raw(sink.token_header.clone(), vec![sink.token.clone().into()]);

debug!("post metrics");
debug!("post {}", &sink.url);
let request = client.post(&sink.url).headers(headers).body(&metrics);
let mut res = try!(request.send());
if !res.status.is_success() {
Expand All @@ -115,7 +128,7 @@ fn send(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<
}
}

Ok(())
Ok(proc_size)
}

fn cappe(sink: &config::Sink, parameters: &config::Parameters) -> Result<(), Box<Error>> {
Expand Down Expand Up @@ -159,23 +172,25 @@ fn read(path: PathBuf) -> Result<String, Box<Error>> {
}

fn files(dir: &str, sink_name: &str) -> Result<Vec<fs::DirEntry>, Box<Error>> {
let mut entries: Vec<fs::DirEntry> = try!(fs::read_dir(dir)).filter_map(|entry| {
if entry.is_err() {
return None;
}
let entry = entry.unwrap();
if entry.path().extension() != Some(OsStr::new("metrics")) {
return None;
}
let mut entries: Vec<fs::DirEntry> = try!(fs::read_dir(dir))
.filter_map(|entry| {
if entry.is_err() {
return None;
}
let entry = entry.unwrap();
if entry.path().extension() != Some(OsStr::new("metrics")) {
return None;
}

let file_name = String::from(entry.file_name().to_str().unwrap_or(""));
let file_name = String::from(entry.file_name().to_str().unwrap_or(""));

if !file_name.starts_with(sink_name) {
return None;
}
if !file_name.starts_with(sink_name) {
return None;
}

Some(entry)
}).collect();
Some(entry)
})
.collect();

entries.sort_by(|a, b| b.file_name().cmp(&a.file_name()));

Expand Down

0 comments on commit 2990611

Please sign in to comment.