Skip to content
This repository has been archived by the owner on Nov 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #76 from ovh/router
Browse files Browse the repository at this point in the history
Router
  • Loading branch information
d33d33 authored Aug 27, 2018
2 parents 785f737 + f3e149b commit b8b134f
Show file tree
Hide file tree
Showing 17 changed files with 624 additions and 581 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "beamium"
version = "1.8.2"
version = "1.9.0"
authors = [ "d33d33 <[email protected]>" ]

build = "build.rs"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ parameters: # Parameters definitions (Optional)
log-file: beamium.log # Log file (Optional, default: beamium.log)
log-level: 4 # Log level (Optional, default: info)
timeout: 500 # Http timeout (seconds) (Optional, default: 500)
router-parallel: 1 # Routing threads (Optional, default: 1)
```
## Contributing
Expand Down
4 changes: 2 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::process::Command;
use std::path::Path;
use std::env;
use std::fs::File;
use std::io::prelude::*;
use std::path::Path;
use std::process::Command;

fn main() {
let output = Command::new("git")
Expand Down
388 changes: 178 additions & 210 deletions src/config.rs

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ pub fn add_labels(line: &str, labels: &str) -> Result<String, Box<Error>> {
return Ok(String::from(line));
}

let mut parts = line.splitn(2, "{");
let mut parts = line.splitn(2, '{');

let class = parts.next().ok_or("no_class")?;
let class = String::from(class);

let plabels = parts.next().ok_or("no_labels")?;
let plabels = String::from(plabels);

let sep = if plabels.trim().starts_with("}") {
let sep = if plabels.trim().starts_with('}') {
""
} else {
","
Expand Down
22 changes: 10 additions & 12 deletions src/lib/transcompiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ pub struct Transcompiler<'a> {
impl<'a> Transcompiler<'a> {
pub fn new(format: &config::ScraperFormat) -> Transcompiler {
let start = time::now_utc();
let now = start.to_timespec().sec * 1000 * 1000 + (start.to_timespec().nsec as i64 / 1000);
Transcompiler {
format: format,
now: now,
}
let now = start.to_timespec().sec * 1000 * 1000
+ (i64::from(start.to_timespec().nsec) as i64 / 1000);
Transcompiler { format, now }
}

pub fn format(&self, line: &str) -> Result<String, Box<Error>> {
match self.format {
&config::ScraperFormat::Sensision => format_warp10(line),
&config::ScraperFormat::Prometheus => format_prometheus(line, self.now),
match *self.format {
config::ScraperFormat::Sensision => format_warp10(line),
config::ScraperFormat::Prometheus => format_prometheus(line, self.now),
}
}
}
Expand All @@ -36,12 +34,12 @@ fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
let line = line.trim();

// Skip comments
if line.starts_with("#") {
if line.starts_with('#') {
return Ok(String::new());
}

// Extract Prometheus metric
let index = if line.contains("{") {
let index = if line.contains('{') {
line.rfind('}').ok_or("bad class")?
} else {
line.find(' ').ok_or("bad class")?
Expand All @@ -62,7 +60,7 @@ fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
.unwrap_or(now);

// Format class
let mut parts = class.splitn(2, "{");
let mut parts = class.splitn(2, '{');
let class = String::from(parts.next().ok_or("no_class")?);
let class = class.trim();
let plabels = parts.next();
Expand All @@ -78,7 +76,7 @@ fn format_prometheus(line: &str, now: i64) -> Result<String, Box<Error>> {
.map(|v| v.replace(r"\n", "%0A")) // unescape
.fold(String::new(), |acc, x| {
// skip invalid values
if !x.contains("=") {
if !x.contains('=') {
return acc
}
acc + &x + ","
Expand Down
23 changes: 11 additions & 12 deletions src/log.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
//! # Log module.
//!
//! The Config module provides the log facility.
use slog::Logger;
use slog::Level;
use slog::Drain;
use slog::Duplicate;
use slog::Level;
use slog::LevelFilter;
use slog::Drain;
use slog::Logger;
use slog_async;
use slog_term;
use slog_scope;
use slog_syslog;
use slog_syslog::Facility;
use slog_scope;
use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;
use slog_term;
use std;
use std::path::Path;
use std::error::Error;
use std::fs;
use std::fs::OpenOptions;
use std::os::unix::fs::OpenOptionsExt;
use std::path::Path;

use config;

Expand All @@ -33,10 +33,9 @@ pub fn bootstrap() {
/// Send log to console and log file, also handle log level.
pub fn log(parameters: &config::Parameters, verbose: u64) -> Result<(), Box<Error>> {
// Ensure log directory is present
match Path::new(&parameters.log_file).parent() {
Some(log_path) => fs::create_dir_all(log_path)?,
None => {}
};
if let Some(log_path) = Path::new(&parameters.log_file).parent() {
fs::create_dir_all(log_path)?
}

// Stdout drain
let term_decorator = slog_term::TermDecorator::new().build();
Expand Down
31 changes: 11 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ extern crate tokio_timer;
extern crate yaml_rust;

use clap::App;
use std::thread;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::fs;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

mod config;
mod scraper;
mod router;
mod sink;
mod lib;
mod log;
mod router;
mod scraper;
mod sink;

include!("version.rs");

Expand Down Expand Up @@ -131,26 +131,15 @@ fn main() {
handles.push(thread::spawn(move || {
slog_scope::scope(
&slog_scope::logger().new(o!("scraper" => scraper.name.clone())),
|| scraper::scraper(&scraper, &parameters, sigint),
|| scraper::scraper(&scraper, &parameters, &sigint),
);
}));
}

// Spawn router
info!("spawning router");
{
let (sinks, labels, parameters, sigint) = (
config.sinks.clone(),
config.labels.clone(),
config.parameters.clone(),
sigint.clone(),
);
handles.push(thread::spawn(move || {
slog_scope::scope(&slog_scope::logger().new(o!()), || {
router::router(&sinks, &labels, &parameters, sigint)
});
}));
}
let mut router = router::Router::new(&config.sinks, &config.parameters, &config.labels);
router.start();

// Spawn sinks
info!("spawning sinks");
Expand Down Expand Up @@ -178,6 +167,8 @@ fn main() {
handle.join().unwrap();
}

router.stop();

for s in sinks {
s.stop();
}
Expand Down
Loading

0 comments on commit b8b134f

Please sign in to comment.