Skip to content

Commit

Permalink
Merge pull request #22 from chills42/master
Browse files Browse the repository at this point in the history
Fix various items turned up by linting
  • Loading branch information
jedisct1 authored Oct 23, 2017
2 parents 531b6f6 + fc1dac2 commit b2ae3eb
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 66 deletions.
20 changes: 11 additions & 9 deletions src/flowgger/input/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
x.as_str().expect("input.listen must be an ip:port string")
})
.to_owned();
let threads = get_default_threads(&config);
let threads = get_default_threads(config);
let cert = config
.lookup("input.tls_cert")
.map_or(DEFAULT_CERT, |x| {
Expand Down Expand Up @@ -113,7 +113,7 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
x.as_bool()
.expect("input.tls_verify_peer must be a boolean")
});
let ca_file: Option<PathBuf> = config.lookup("input.tls_ca_file").map_or(None, |x| {
let ca_file: Option<PathBuf> = config.lookup("input.tls_ca_file").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("input.tls_ca_file must be a path to a file"),
Expand Down Expand Up @@ -142,26 +142,28 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
.expect(r#"input.framing must be a string set to "line", "nul" or "syslen""#)
})
.to_owned();
let mut acceptor_builder = match tls_modern {
false => SslAcceptorBuilder::mozilla_intermediate_raw(SslMethod::tls()),
true => SslAcceptorBuilder::mozilla_modern_raw(SslMethod::tls()),
}.unwrap();
let mut acceptor_builder = (if tls_modern {
SslAcceptorBuilder::mozilla_modern_raw(SslMethod::tls())
}
else {
SslAcceptorBuilder::mozilla_intermediate_raw(SslMethod::tls())
}).unwrap();
{
let mut ctx = acceptor_builder.builder_mut();
if let Some(ca_file) = ca_file {
ctx.set_ca_file(&ca_file)
.expect("Unable to read the trusted CA file");
}
if verify_peer == false {
if !verify_peer {
ctx.set_verify(SSL_VERIFY_NONE);
} else {
ctx.set_verify_depth(TLS_VERIFY_DEPTH);
ctx.set_verify(SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT);
}
let mut opts =
SSL_OP_CIPHER_SERVER_PREFERENCE | SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION;
if compression == false {
opts = opts | SSL_OP_NO_COMPRESSION;
if !compression {
opts |= SSL_OP_NO_COMPRESSION;
}
ctx.set_options(opts);
set_fs(&mut ctx);
Expand Down
21 changes: 9 additions & 12 deletions src/flowgger/input/tls/tls_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct TlsInput {

impl TlsInput {
pub fn new(config: &Config) -> TlsInput {
let (tls_config, listen, timeout) = config_parse(&config);
let (tls_config, listen, timeout) = config_parse(config);
TlsInput {
listen: listen,
tls_config: tls_config,
Expand All @@ -35,17 +35,14 @@ impl Input for TlsInput {
) {
let listener = TcpListener::bind(&self.listen as &str).unwrap();
for client in listener.incoming() {
match client {
Ok(client) => {
let _ = client.set_read_timeout(self.timeout);
let tx = tx.clone();
let (decoder, encoder) = (decoder.clone_boxed(), encoder.clone_boxed());
let tls_config = self.tls_config.clone();
thread::spawn(move || {
handle_client(client, tx, decoder, encoder, tls_config);
});
}
Err(_) => {}
if let Ok(client) = client {
let _ = client.set_read_timeout(self.timeout);
let tx = tx.clone();
let (decoder, encoder) = (decoder.clone_boxed(), encoder.clone_boxed());
let tls_config = self.tls_config.clone();
thread::spawn(move || {
handle_client(client, tx, decoder, encoder, tls_config);
});
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/flowgger/input/udp_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::str;
use std::sync::mpsc::SyncSender;

const DEFAULT_LISTEN: &'static str = "0.0.0.0:514";
const MAX_UDP_PACKET_SIZE: usize = 65527;
const MAX_UDP_PACKET_SIZE: usize = 65_527;
const MAX_COMPRESSION_RATIO: usize = 5;

pub struct UdpInput {
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Input for UdpInput {
Err(_) => continue,
};
let line = &buf[..length];
if let Err(e) = handle_record_maybe_compressed(&line, &tx, &decoder, &encoder) {
if let Err(e) = handle_record_maybe_compressed(line, &tx, &decoder, &encoder) {
let _ = writeln!(stderr(), "{}", e);
}
}
Expand All @@ -66,15 +66,15 @@ fn handle_record_maybe_compressed(
let mut decompressed = Vec::with_capacity(MAX_UDP_PACKET_SIZE * MAX_COMPRESSION_RATIO);
match line.zlib_decode().read_to_end(&mut decompressed) {
Ok(_) => handle_record(&decompressed, tx, decoder, encoder),
Err(_) => return Err("Corrupted compressed (zlib) record"),
Err(_) => Err("Corrupted compressed (zlib) record"),
}
} else if line.len() >= 24 && (line[0] == 0x1f && line[1] == 0x8b && line[2] == 0x08) {
let mut decompressed = Vec::with_capacity(MAX_UDP_PACKET_SIZE * MAX_COMPRESSION_RATIO);
match line.gz_decode()
.and_then(|mut x| x.read_to_end(&mut decompressed))
{
Ok(_) => handle_record(&decompressed, tx, decoder, encoder),
Err(_) => return Err("Corrupted compressed (gzip) record"),
Err(_) => Err("Corrupted compressed (gzip) record"),
}
} else {
handle_record(line, tx, decoder, encoder)
Expand Down
22 changes: 11 additions & 11 deletions src/flowgger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ fn get_input_tcpco(_config: &Config) -> ! {

fn get_input(input_type: &str, config: &Config) -> Box<Input> {
match input_type {
"redis" => Box::new(RedisInput::new(&config)) as Box<Input>,
"stdin" => Box::new(StdinInput::new(&config)) as Box<Input>,
"tcp" | "syslog-tcp" => Box::new(TcpInput::new(&config)) as Box<Input>,
"tcp_co" | "tcpco" | "syslog-tcp_co" | "syslog-tcpco" => get_input_tcpco(&config),
"tls" | "syslog-tls" => Box::new(TlsInput::new(&config)) as Box<Input>,
"tls_co" | "tlsco" | "syslog-tls_co" | "syslog-tlsco" => get_input_tlsco(&config),
"udp" => Box::new(UdpInput::new(&config)) as Box<Input>,
"redis" => Box::new(RedisInput::new(config)) as Box<Input>,
"stdin" => Box::new(StdinInput::new(config)) as Box<Input>,
"tcp" | "syslog-tcp" => Box::new(TcpInput::new(config)) as Box<Input>,
"tcp_co" | "tcpco" | "syslog-tcp_co" | "syslog-tcpco" => get_input_tcpco(config),
"tls" | "syslog-tls" => Box::new(TlsInput::new(config)) as Box<Input>,
"tls_co" | "tlsco" | "syslog-tls_co" | "syslog-tlsco" => get_input_tlsco(config),
"udp" => Box::new(UdpInput::new(config)) as Box<Input>,
_ => panic!("Invalid input type: {}", input_type),
}
}

#[cfg(feature = "kafka")]
fn get_output_kafka(config: &Config) -> Box<Output> {
Box::new(KafkaOutput::new(&config)) as Box<Output>
Box::new(KafkaOutput::new(config)) as Box<Output>
}

#[cfg(not(feature = "kafka"))]
Expand All @@ -79,9 +79,9 @@ fn get_output_kafka(_config: &Config) -> ! {

fn get_output(output_type: &str, config: &Config) -> Box<Output> {
match output_type {
"stdout" | "debug" => Box::new(DebugOutput::new(&config)) as Box<Output>,
"kafka" => get_output_kafka(&config),
"tls" | "syslog-tls" => Box::new(TlsOutput::new(&config)) as Box<Output>,
"stdout" | "debug" => Box::new(DebugOutput::new(config)) as Box<Output>,
"kafka" => get_output_kafka(config),
"tls" | "syslog-tls" => Box::new(TlsOutput::new(config)) as Box<Output>,
_ => panic!("Invalid output type: {}", output_type),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/flowgger/output/kafka_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const KAFKA_DEFAULT_ACKS: i16 = 0;
const KAFKA_DEFAULT_COALESCE: usize = 1;
const KAFKA_DEFAULT_COMPRESSION: &'static str = "none";
const KAFKA_DEFAULT_THREADS: u32 = 1;
const KAFKA_DEFAULT_TIMEOUT: u64 = 60000;
const KAFKA_DEFAULT_TIMEOUT: u64 = 60_000;

pub struct KafkaOutput {
config: KafkaConfig,
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Output for KafkaOutput {
let _ = writeln!(stderr(), "Output framing is ignored with the Kafka output");
}
for _ in 0..self.threads {
let arx = arx.clone();
let arx = Arc::clone(&arx);
let config = self.config.clone();
thread::spawn(move || {
let mut worker = KafkaWorker::new(arx, config);
Expand Down
26 changes: 13 additions & 13 deletions src/flowgger/output/tls_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl TlsWorker {
_ => return Err(e),
},
};
if self.tls_config.async == false {
if !self.tls_config.async {
writer.flush()?;
}
}
Expand All @@ -125,7 +125,7 @@ impl TlsWorker {
fn run(self) {
let tls_config = &self.tls_config;
let mut rng = rand::thread_rng();
let mut recovery_delay = tls_config.recovery_delay_init as f64;
let mut recovery_delay = f64::from(tls_config.recovery_delay_init);
let mut last_recovery;
loop {
last_recovery = chrono::offset::Utc::now();
Expand Down Expand Up @@ -162,10 +162,10 @@ impl TlsWorker {
}
let now = chrono::offset::Utc::now();
if now.signed_duration_since(last_recovery)
> chrono::Duration::milliseconds(tls_config.recovery_probe_time as i64)
> chrono::Duration::milliseconds(i64::from(tls_config.recovery_probe_time))
{
recovery_delay = tls_config.recovery_delay_init as f64;
} else if recovery_delay < tls_config.recovery_delay_max as f64 {
recovery_delay = f64::from(tls_config.recovery_delay_init);
} else if recovery_delay < f64::from(tls_config.recovery_delay_max) {
let between = Range::new(0.0, recovery_delay);
let mut rng = rand::thread_rng();
recovery_delay += between.ind_sample(&mut rng);
Expand All @@ -189,7 +189,7 @@ fn new_tcp(connect_chosen: &str) -> Result<TcpStream, io::Error> {

impl TlsOutput {
pub fn new(config: &Config) -> TlsOutput {
let (tls_config, threads) = config_parse(&config);
let (tls_config, threads) = config_parse(config);
TlsOutput {
config: tls_config,
threads: threads,
Expand All @@ -200,7 +200,7 @@ impl TlsOutput {
impl Output for TlsOutput {
fn start(&self, arx: Arc<Mutex<Receiver<Vec<u8>>>>, merger: Option<Box<Merger>>) {
for _ in 0..self.threads {
let arx = arx.clone();
let arx = Arc::clone(&arx);
let config = self.config.clone();
let merger = match merger {
Some(ref merger) => Some(merger.clone_boxed()) as Option<Box<Merger + Send>>,
Expand Down Expand Up @@ -245,13 +245,13 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
.to_owned()
})
.collect();
let cert: Option<PathBuf> = config.lookup("output.tls_cert").map_or(None, |x| {
let cert: Option<PathBuf> = config.lookup("output.tls_cert").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_cert must be a path to a .pem file"),
))
});
let key: Option<PathBuf> = config.lookup("output.tls_key").map_or(None, |x| {
let key: Option<PathBuf> = config.lookup("output.tls_key").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_key must be a path to a .pem file"),
Expand All @@ -270,7 +270,7 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
x.as_bool()
.expect("output.tls_verify_peer must be a boolean")
});
let ca_file: Option<PathBuf> = config.lookup("output.tls_ca_file").map_or(None, |x| {
let ca_file: Option<PathBuf> = config.lookup("output.tls_ca_file").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_ca_file must be a path to a file"),
Expand Down Expand Up @@ -319,7 +319,7 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
let mut connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
{
let mut ctx = connector_builder.builder_mut();
if verify_peer == false {
if !verify_peer {
ctx.set_verify(SSL_VERIFY_NONE);
} else {
ctx.set_verify_depth(TLS_VERIFY_DEPTH);
Expand All @@ -331,8 +331,8 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
}
let mut opts =
SSL_OP_CIPHER_SERVER_PREFERENCE | SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION;
if compression == false {
opts = opts | SSL_OP_NO_COMPRESSION;
if !compression {
opts |= SSL_OP_NO_COMPRESSION;
}
ctx.set_options(opts);
set_fs(&mut ctx);
Expand Down
24 changes: 12 additions & 12 deletions src/flowgger/record_capnp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub mod record {
}

impl<'a> Reader<'a> {
pub fn borrow<'b>(&'b self) -> Reader<'b> {
pub fn borrow(&self) -> Reader {
Reader { ..*self }
}

Expand Down Expand Up @@ -213,10 +213,10 @@ pub mod record {
pub fn as_reader(self) -> Reader<'a> {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}
pub fn borrow<'b>(&'b mut self) -> Builder<'b> {
pub fn borrow(&mut self) -> Builder {
Builder { ..*self }
}
pub fn borrow_as_reader<'b>(&'b self) -> Reader<'b> {
pub fn borrow_as_reader(&self) -> Reader {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}

Expand Down Expand Up @@ -499,7 +499,7 @@ pub mod pair {
}

impl<'a> Reader<'a> {
pub fn borrow<'b>(&'b self) -> Reader<'b> {
pub fn borrow(&self) -> Reader {
Reader { ..*self }
}

Expand Down Expand Up @@ -577,10 +577,10 @@ pub mod pair {
pub fn as_reader(self) -> Reader<'a> {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}
pub fn borrow<'b>(&'b mut self) -> Builder<'b> {
pub fn borrow(&mut self) -> Builder {
Builder { ..*self }
}
pub fn borrow_as_reader<'b>(&'b self) -> Reader<'b> {
pub fn borrow_as_reader(&self) -> Reader {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}

Expand Down Expand Up @@ -613,9 +613,9 @@ pub mod pair {
self.builder.set_data_field::<u16>(0, 0);
self.builder.get_pointer_field(1).clear();
self.builder.set_bool_field(16, false);
self.builder.set_data_field::<f64>(1, 0u8 as f64);
self.builder.set_data_field::<i64>(1, 0u8 as i64);
self.builder.set_data_field::<u64>(1, 0u8 as u64);
self.builder.set_data_field::<f64>(1, 0f64);
self.builder.set_data_field::<i64>(1, 0i64);
self.builder.set_data_field::<u64>(1, 0u64);
::capnp::traits::FromStructBuilder::new(self.builder)
}
}
Expand Down Expand Up @@ -702,7 +702,7 @@ pub mod pair {
}

impl<'a> Reader<'a> {
pub fn borrow<'b>(&'b self) -> Reader<'b> {
pub fn borrow(&self) -> Reader {
Reader { ..*self }
}

Expand Down Expand Up @@ -801,10 +801,10 @@ pub mod pair {
pub fn as_reader(self) -> Reader<'a> {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}
pub fn borrow<'b>(&'b mut self) -> Builder<'b> {
pub fn borrow(&mut self) -> Builder {
Builder { ..*self }
}
pub fn borrow_as_reader<'b>(&'b self) -> Reader<'b> {
pub fn borrow_as_reader(&self) -> Reader {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}

Expand Down
2 changes: 1 addition & 1 deletion src/flowgger/splitter/nul_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<T: Read> Splitter<T> for NulSplitter {
};
if let Err(e) = handle_line(line, &tx, &decoder, &encoder) {
let line = line.trim();
if line.len() > 0 {
if !line.is_empty() {
let _ = writeln!(stderr(), "{}: [{}]", e, line.trim());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/flowgger/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ impl PreciseTimestamp {
pub fn now() -> Self {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
PreciseTimestamp {
ts: now.as_secs() as f64 + now.subsec_nanos() as f64 / 1e9,
ts: now.as_secs() as f64 + f64::from(now.subsec_nanos()) / 1e9,
}
}

#[inline]
pub fn from_datetime(tsd: DateTime<FixedOffset>) -> Self {
PreciseTimestamp {
ts: tsd.timestamp() as f64 + tsd.naive_utc().nanosecond() as f64 / 1e9,
ts: tsd.timestamp() as f64 + f64::from(tsd.naive_utc().nanosecond()) / 1e9,
}
}

Expand Down

0 comments on commit b2ae3eb

Please sign in to comment.