Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various items turned up by linting #22

Merged
merged 1 commit into from
Oct 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions src/flowgger/input/tls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
x.as_str().expect("input.listen must be an ip:port string")
})
.to_owned();
let threads = get_default_threads(&config);
let threads = get_default_threads(config);
let cert = config
.lookup("input.tls_cert")
.map_or(DEFAULT_CERT, |x| {
Expand Down Expand Up @@ -113,7 +113,7 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
x.as_bool()
.expect("input.tls_verify_peer must be a boolean")
});
let ca_file: Option<PathBuf> = config.lookup("input.tls_ca_file").map_or(None, |x| {
let ca_file: Option<PathBuf> = config.lookup("input.tls_ca_file").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("input.tls_ca_file must be a path to a file"),
Expand Down Expand Up @@ -142,26 +142,28 @@ pub fn config_parse(config: &Config) -> (TlsConfig, String, u64) {
.expect(r#"input.framing must be a string set to "line", "nul" or "syslen""#)
})
.to_owned();
let mut acceptor_builder = match tls_modern {
false => SslAcceptorBuilder::mozilla_intermediate_raw(SslMethod::tls()),
true => SslAcceptorBuilder::mozilla_modern_raw(SslMethod::tls()),
}.unwrap();
let mut acceptor_builder = (if tls_modern {
SslAcceptorBuilder::mozilla_modern_raw(SslMethod::tls())
}
else {
SslAcceptorBuilder::mozilla_intermediate_raw(SslMethod::tls())
}).unwrap();
{
let mut ctx = acceptor_builder.builder_mut();
if let Some(ca_file) = ca_file {
ctx.set_ca_file(&ca_file)
.expect("Unable to read the trusted CA file");
}
if verify_peer == false {
if !verify_peer {
ctx.set_verify(SSL_VERIFY_NONE);
} else {
ctx.set_verify_depth(TLS_VERIFY_DEPTH);
ctx.set_verify(SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT);
}
let mut opts =
SSL_OP_CIPHER_SERVER_PREFERENCE | SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION;
if compression == false {
opts = opts | SSL_OP_NO_COMPRESSION;
if !compression {
opts |= SSL_OP_NO_COMPRESSION;
}
ctx.set_options(opts);
set_fs(&mut ctx);
Expand Down
21 changes: 9 additions & 12 deletions src/flowgger/input/tls/tls_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct TlsInput {

impl TlsInput {
pub fn new(config: &Config) -> TlsInput {
let (tls_config, listen, timeout) = config_parse(&config);
let (tls_config, listen, timeout) = config_parse(config);
TlsInput {
listen: listen,
tls_config: tls_config,
Expand All @@ -35,17 +35,14 @@ impl Input for TlsInput {
) {
let listener = TcpListener::bind(&self.listen as &str).unwrap();
for client in listener.incoming() {
match client {
Ok(client) => {
let _ = client.set_read_timeout(self.timeout);
let tx = tx.clone();
let (decoder, encoder) = (decoder.clone_boxed(), encoder.clone_boxed());
let tls_config = self.tls_config.clone();
thread::spawn(move || {
handle_client(client, tx, decoder, encoder, tls_config);
});
}
Err(_) => {}
if let Ok(client) = client {
let _ = client.set_read_timeout(self.timeout);
let tx = tx.clone();
let (decoder, encoder) = (decoder.clone_boxed(), encoder.clone_boxed());
let tls_config = self.tls_config.clone();
thread::spawn(move || {
handle_client(client, tx, decoder, encoder, tls_config);
});
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/flowgger/input/udp_input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::str;
use std::sync::mpsc::SyncSender;

const DEFAULT_LISTEN: &'static str = "0.0.0.0:514";
const MAX_UDP_PACKET_SIZE: usize = 65527;
const MAX_UDP_PACKET_SIZE: usize = 65_527;
const MAX_COMPRESSION_RATIO: usize = 5;

pub struct UdpInput {
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Input for UdpInput {
Err(_) => continue,
};
let line = &buf[..length];
if let Err(e) = handle_record_maybe_compressed(&line, &tx, &decoder, &encoder) {
if let Err(e) = handle_record_maybe_compressed(line, &tx, &decoder, &encoder) {
let _ = writeln!(stderr(), "{}", e);
}
}
Expand All @@ -66,15 +66,15 @@ fn handle_record_maybe_compressed(
let mut decompressed = Vec::with_capacity(MAX_UDP_PACKET_SIZE * MAX_COMPRESSION_RATIO);
match line.zlib_decode().read_to_end(&mut decompressed) {
Ok(_) => handle_record(&decompressed, tx, decoder, encoder),
Err(_) => return Err("Corrupted compressed (zlib) record"),
Err(_) => Err("Corrupted compressed (zlib) record"),
}
} else if line.len() >= 24 && (line[0] == 0x1f && line[1] == 0x8b && line[2] == 0x08) {
let mut decompressed = Vec::with_capacity(MAX_UDP_PACKET_SIZE * MAX_COMPRESSION_RATIO);
match line.gz_decode()
.and_then(|mut x| x.read_to_end(&mut decompressed))
{
Ok(_) => handle_record(&decompressed, tx, decoder, encoder),
Err(_) => return Err("Corrupted compressed (gzip) record"),
Err(_) => Err("Corrupted compressed (gzip) record"),
}
} else {
handle_record(line, tx, decoder, encoder)
Expand Down
22 changes: 11 additions & 11 deletions src/flowgger/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ fn get_input_tcpco(_config: &Config) -> ! {

fn get_input(input_type: &str, config: &Config) -> Box<Input> {
match input_type {
"redis" => Box::new(RedisInput::new(&config)) as Box<Input>,
"stdin" => Box::new(StdinInput::new(&config)) as Box<Input>,
"tcp" | "syslog-tcp" => Box::new(TcpInput::new(&config)) as Box<Input>,
"tcp_co" | "tcpco" | "syslog-tcp_co" | "syslog-tcpco" => get_input_tcpco(&config),
"tls" | "syslog-tls" => Box::new(TlsInput::new(&config)) as Box<Input>,
"tls_co" | "tlsco" | "syslog-tls_co" | "syslog-tlsco" => get_input_tlsco(&config),
"udp" => Box::new(UdpInput::new(&config)) as Box<Input>,
"redis" => Box::new(RedisInput::new(config)) as Box<Input>,
"stdin" => Box::new(StdinInput::new(config)) as Box<Input>,
"tcp" | "syslog-tcp" => Box::new(TcpInput::new(config)) as Box<Input>,
"tcp_co" | "tcpco" | "syslog-tcp_co" | "syslog-tcpco" => get_input_tcpco(config),
"tls" | "syslog-tls" => Box::new(TlsInput::new(config)) as Box<Input>,
"tls_co" | "tlsco" | "syslog-tls_co" | "syslog-tlsco" => get_input_tlsco(config),
"udp" => Box::new(UdpInput::new(config)) as Box<Input>,
_ => panic!("Invalid input type: {}", input_type),
}
}

#[cfg(feature = "kafka")]
fn get_output_kafka(config: &Config) -> Box<Output> {
Box::new(KafkaOutput::new(&config)) as Box<Output>
Box::new(KafkaOutput::new(config)) as Box<Output>
}

#[cfg(not(feature = "kafka"))]
Expand All @@ -79,9 +79,9 @@ fn get_output_kafka(_config: &Config) -> ! {

fn get_output(output_type: &str, config: &Config) -> Box<Output> {
match output_type {
"stdout" | "debug" => Box::new(DebugOutput::new(&config)) as Box<Output>,
"kafka" => get_output_kafka(&config),
"tls" | "syslog-tls" => Box::new(TlsOutput::new(&config)) as Box<Output>,
"stdout" | "debug" => Box::new(DebugOutput::new(config)) as Box<Output>,
"kafka" => get_output_kafka(config),
"tls" | "syslog-tls" => Box::new(TlsOutput::new(config)) as Box<Output>,
_ => panic!("Invalid output type: {}", output_type),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/flowgger/output/kafka_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const KAFKA_DEFAULT_ACKS: i16 = 0;
const KAFKA_DEFAULT_COALESCE: usize = 1;
const KAFKA_DEFAULT_COMPRESSION: &'static str = "none";
const KAFKA_DEFAULT_THREADS: u32 = 1;
const KAFKA_DEFAULT_TIMEOUT: u64 = 60000;
const KAFKA_DEFAULT_TIMEOUT: u64 = 60_000;

pub struct KafkaOutput {
config: KafkaConfig,
Expand Down Expand Up @@ -201,7 +201,7 @@ impl Output for KafkaOutput {
let _ = writeln!(stderr(), "Output framing is ignored with the Kafka output");
}
for _ in 0..self.threads {
let arx = arx.clone();
let arx = Arc::clone(&arx);
let config = self.config.clone();
thread::spawn(move || {
let mut worker = KafkaWorker::new(arx, config);
Expand Down
26 changes: 13 additions & 13 deletions src/flowgger/output/tls_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl TlsWorker {
_ => return Err(e),
},
};
if self.tls_config.async == false {
if !self.tls_config.async {
writer.flush()?;
}
}
Expand All @@ -125,7 +125,7 @@ impl TlsWorker {
fn run(self) {
let tls_config = &self.tls_config;
let mut rng = rand::thread_rng();
let mut recovery_delay = tls_config.recovery_delay_init as f64;
let mut recovery_delay = f64::from(tls_config.recovery_delay_init);
let mut last_recovery;
loop {
last_recovery = chrono::offset::Utc::now();
Expand Down Expand Up @@ -162,10 +162,10 @@ impl TlsWorker {
}
let now = chrono::offset::Utc::now();
if now.signed_duration_since(last_recovery)
> chrono::Duration::milliseconds(tls_config.recovery_probe_time as i64)
> chrono::Duration::milliseconds(i64::from(tls_config.recovery_probe_time))
{
recovery_delay = tls_config.recovery_delay_init as f64;
} else if recovery_delay < tls_config.recovery_delay_max as f64 {
recovery_delay = f64::from(tls_config.recovery_delay_init);
} else if recovery_delay < f64::from(tls_config.recovery_delay_max) {
let between = Range::new(0.0, recovery_delay);
let mut rng = rand::thread_rng();
recovery_delay += between.ind_sample(&mut rng);
Expand All @@ -189,7 +189,7 @@ fn new_tcp(connect_chosen: &str) -> Result<TcpStream, io::Error> {

impl TlsOutput {
pub fn new(config: &Config) -> TlsOutput {
let (tls_config, threads) = config_parse(&config);
let (tls_config, threads) = config_parse(config);
TlsOutput {
config: tls_config,
threads: threads,
Expand All @@ -200,7 +200,7 @@ impl TlsOutput {
impl Output for TlsOutput {
fn start(&self, arx: Arc<Mutex<Receiver<Vec<u8>>>>, merger: Option<Box<Merger>>) {
for _ in 0..self.threads {
let arx = arx.clone();
let arx = Arc::clone(&arx);
let config = self.config.clone();
let merger = match merger {
Some(ref merger) => Some(merger.clone_boxed()) as Option<Box<Merger + Send>>,
Expand Down Expand Up @@ -245,13 +245,13 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
.to_owned()
})
.collect();
let cert: Option<PathBuf> = config.lookup("output.tls_cert").map_or(None, |x| {
let cert: Option<PathBuf> = config.lookup("output.tls_cert").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_cert must be a path to a .pem file"),
))
});
let key: Option<PathBuf> = config.lookup("output.tls_key").map_or(None, |x| {
let key: Option<PathBuf> = config.lookup("output.tls_key").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_key must be a path to a .pem file"),
Expand All @@ -270,7 +270,7 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
x.as_bool()
.expect("output.tls_verify_peer must be a boolean")
});
let ca_file: Option<PathBuf> = config.lookup("output.tls_ca_file").map_or(None, |x| {
let ca_file: Option<PathBuf> = config.lookup("output.tls_ca_file").and_then(|x| {
Some(PathBuf::from(
x.as_str()
.expect("output.tls_ca_file must be a path to a file"),
Expand Down Expand Up @@ -319,7 +319,7 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
let mut connector_builder = SslConnectorBuilder::new(SslMethod::tls()).unwrap();
{
let mut ctx = connector_builder.builder_mut();
if verify_peer == false {
if !verify_peer {
ctx.set_verify(SSL_VERIFY_NONE);
} else {
ctx.set_verify_depth(TLS_VERIFY_DEPTH);
Expand All @@ -331,8 +331,8 @@ fn config_parse(config: &Config) -> (TlsConfig, u32) {
}
let mut opts =
SSL_OP_CIPHER_SERVER_PREFERENCE | SSL_OP_NO_SESSION_RESUMPTION_ON_RENEGOTIATION;
if compression == false {
opts = opts | SSL_OP_NO_COMPRESSION;
if !compression {
opts |= SSL_OP_NO_COMPRESSION;
}
ctx.set_options(opts);
set_fs(&mut ctx);
Expand Down
24 changes: 12 additions & 12 deletions src/flowgger/record_capnp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub mod record {
}

impl<'a> Reader<'a> {
pub fn borrow<'b>(&'b self) -> Reader<'b> {
pub fn borrow(&self) -> Reader {
Reader { ..*self }
}

Expand Down Expand Up @@ -213,10 +213,10 @@ pub mod record {
pub fn as_reader(self) -> Reader<'a> {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}
pub fn borrow<'b>(&'b mut self) -> Builder<'b> {
pub fn borrow(&mut self) -> Builder {
Builder { ..*self }
}
pub fn borrow_as_reader<'b>(&'b self) -> Reader<'b> {
pub fn borrow_as_reader(&self) -> Reader {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}

Expand Down Expand Up @@ -499,7 +499,7 @@ pub mod pair {
}

impl<'a> Reader<'a> {
pub fn borrow<'b>(&'b self) -> Reader<'b> {
pub fn borrow(&self) -> Reader {
Reader { ..*self }
}

Expand Down Expand Up @@ -577,10 +577,10 @@ pub mod pair {
pub fn as_reader(self) -> Reader<'a> {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}
pub fn borrow<'b>(&'b mut self) -> Builder<'b> {
pub fn borrow(&mut self) -> Builder {
Builder { ..*self }
}
pub fn borrow_as_reader<'b>(&'b self) -> Reader<'b> {
pub fn borrow_as_reader(&self) -> Reader {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}

Expand Down Expand Up @@ -613,9 +613,9 @@ pub mod pair {
self.builder.set_data_field::<u16>(0, 0);
self.builder.get_pointer_field(1).clear();
self.builder.set_bool_field(16, false);
self.builder.set_data_field::<f64>(1, 0u8 as f64);
self.builder.set_data_field::<i64>(1, 0u8 as i64);
self.builder.set_data_field::<u64>(1, 0u8 as u64);
self.builder.set_data_field::<f64>(1, 0f64);
self.builder.set_data_field::<i64>(1, 0i64);
self.builder.set_data_field::<u64>(1, 0u64);
::capnp::traits::FromStructBuilder::new(self.builder)
}
}
Expand Down Expand Up @@ -702,7 +702,7 @@ pub mod pair {
}

impl<'a> Reader<'a> {
pub fn borrow<'b>(&'b self) -> Reader<'b> {
pub fn borrow(&self) -> Reader {
Reader { ..*self }
}

Expand Down Expand Up @@ -801,10 +801,10 @@ pub mod pair {
pub fn as_reader(self) -> Reader<'a> {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}
pub fn borrow<'b>(&'b mut self) -> Builder<'b> {
pub fn borrow(&mut self) -> Builder {
Builder { ..*self }
}
pub fn borrow_as_reader<'b>(&'b self) -> Reader<'b> {
pub fn borrow_as_reader(&self) -> Reader {
::capnp::traits::FromStructReader::new(self.builder.as_reader())
}

Expand Down
2 changes: 1 addition & 1 deletion src/flowgger/splitter/nul_splitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<T: Read> Splitter<T> for NulSplitter {
};
if let Err(e) = handle_line(line, &tx, &decoder, &encoder) {
let line = line.trim();
if line.len() > 0 {
if !line.is_empty() {
let _ = writeln!(stderr(), "{}: [{}]", e, line.trim());
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/flowgger/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ impl PreciseTimestamp {
pub fn now() -> Self {
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
PreciseTimestamp {
ts: now.as_secs() as f64 + now.subsec_nanos() as f64 / 1e9,
ts: now.as_secs() as f64 + f64::from(now.subsec_nanos()) / 1e9,
}
}

#[inline]
pub fn from_datetime(tsd: DateTime<FixedOffset>) -> Self {
PreciseTimestamp {
ts: tsd.timestamp() as f64 + tsd.naive_utc().nanosecond() as f64 / 1e9,
ts: tsd.timestamp() as f64 + f64::from(tsd.naive_utc().nanosecond()) / 1e9,
}
}

Expand Down