Skip to content

Commit

Permalink
adding worker pool in crawler crate (#18)
Browse files Browse the repository at this point in the history
- replacing the original one-thread-per-task model
- using a fixed-size pool of workers to consume tasks from a single provider
  • Loading branch information
tommady authored May 19, 2022
1 parent c9f69ea commit 1937501
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 116 deletions.
12 changes: 5 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ impl Default for Config {
#[serde(default)]
/// Crawler section of the configuration: what to crawl, with how many
/// workers, and how often.
pub(crate) struct Crawler {
/// Targets to crawl; the crawler builds one task per target.
pub(crate) targets: Vec<Target>,
/// Number of worker threads spawned to consume tasks.
pub(crate) worker_n: usize,
/// Pause, in milliseconds, between two rounds of pushing every task to the workers.
pub(crate) frequency_ms: u64,
}

impl Default for Crawler {
fn default() -> Self {
Crawler {
worker_n: 3,
frequency_ms: 15000,
targets: vec![Target {
host_addr: "http://127.0.0.1:26657".to_string(),
task_name: TaskName::NetworkFunctional,
frequency_ms: 15000,
registry: None,
extra_opts: None,
}],
Expand Down Expand Up @@ -102,7 +105,6 @@ pub(crate) enum ExtraOpts {
pub(crate) struct Target {
pub(crate) host_addr: String,
pub(crate) task_name: TaskName,
pub(crate) frequency_ms: u64,
pub(crate) registry: Option<Registry>,
pub(crate) extra_opts: Option<ExtraOpts>,
}
Expand All @@ -111,16 +113,13 @@ impl Hash for Target {
fn hash<H: Hasher>(&self, state: &mut H) {
self.host_addr.hash(state);
self.task_name.hash(state);
self.frequency_ms.hash(state);
self.extra_opts.hash(state);
}
}

impl PartialEq for Target {
fn eq(&self, other: &Self) -> bool {
self.host_addr == other.host_addr
&& self.task_name == other.task_name
&& self.frequency_ms == other.frequency_ms
self.host_addr == other.host_addr && self.task_name == other.task_name
}
}

Expand Down Expand Up @@ -175,7 +174,6 @@ mod tests {
want.crawler.targets.push(Target {
host_addr: "https://somewhere.com/metrics:443".to_string(),
task_name: TaskName::NetworkFunctional,
frequency_ms: 1000,
extra_opts: None,
registry: Some(Registry {
prefix: "findora_exporter".to_string(),
Expand Down
272 changes: 169 additions & 103 deletions src/crawler.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
mpsc::channel,
Arc, Mutex,
},
thread,
time::Duration,
{thread, thread::JoinHandle},
};

use crate::{
config::{ExtraOpts, TaskName},
metrics::Metric,
utils::calculate_hash,
};

Expand All @@ -17,148 +19,212 @@ use log::error;
use prometheus::core::Atomic;

/// A collection of workers that can be managed together.
pub(crate) struct Crawler<T: Atomic> {
workers: Vec<Arc<RwLock<Worker<T>>>>,
pub(crate) struct Crawler {
workers: Vec<Option<thread::JoinHandle<()>>>,
done: Arc<AtomicBool>,
}

impl<T> Crawler<T>
where
T: Atomic + 'static,
{
/// Returns a Crawler instance.
///
/// This new method will not execute anything but only returns a Crawler instance.
/// Every Target in the config structure will be applied to a Worker structure.
pub(crate) fn new(
impl Crawler {
/// Returns a Crawler instance and spawns:
/// 1. a thread that pushes tasks into an mpsc queue.
/// 2. N worker threads that consume tasks from the mpsc queue.
pub(crate) fn new<T>(
cfg: &crate::config::Crawler,
metrics: Arc<crate::metrics::Metrics<T>>,
) -> Self {
let mut workers = vec![];
) -> Result<Self>
where
T: Atomic + 'static,
{
let mut workers = Vec::with_capacity(cfg.worker_n + 1);
let mut tasks = Vec::with_capacity(cfg.targets.len());
let done = Arc::new(AtomicBool::new(false));
let (tx, rx) = channel();
let rx = Arc::new(Mutex::new(rx));

for target in &cfg.targets {
let metric = metrics
.get_metric(calculate_hash(target))
.get_metric(calculate_hash(&target))
.expect("get_metric failed");

let task = match target.task_name {
TaskName::ConsensusPower => {
Task::new("consensus_power", crate::tasks::consensus_power)
}
TaskName::NetworkFunctional => {
Task::new("network_functional", crate::tasks::network_functional)
}
TaskName::ConsensusPower => Task::new(
"consensus_power".to_string(),
target.host_addr.clone(),
metric,
target.extra_opts.clone(),
crate::tasks::consensus_power,
),
TaskName::NetworkFunctional => Task::new(
"network_functional".to_string(),
target.host_addr.clone(),
metric,
target.extra_opts.clone(),
crate::tasks::network_functional,
),
TaskName::TotalCountOfValidators => Task::new(
"total_count_of_validators",
"total_count_of_validators".to_string(),
target.host_addr.clone(),
metric,
target.extra_opts.clone(),
crate::tasks::total_count_of_validators,
),
TaskName::TotalBalanceOfRelayers => Task::new(
"total_balance_of_relayers",
"total_balance_of_relayers".to_string(),
target.host_addr.clone(),
metric,
target.extra_opts.clone(),
crate::tasks::total_balance_of_relayers,
),
TaskName::BridgedBalance => {
Task::new("bridged_balance", crate::tasks::bridged_balance)
}
TaskName::BridgedSupply => {
Task::new("bridged_supply", crate::tasks::bridged_supply)
}
TaskName::BridgedBalance => Task::new(
"bridged_balance".to_string(),
target.host_addr.clone(),
metric,
target.extra_opts.clone(),
crate::tasks::bridged_balance,
),
TaskName::BridgedSupply => Task::new(
"bridged_supply".to_string(),
target.host_addr.clone(),
metric,
target.extra_opts.clone(),
crate::tasks::bridged_supply,
),
};

workers.push(Arc::new(RwLock::new(Worker::new(target, metric, task))));
tasks.push(Arc::new(task));
}

let freq = Duration::from_millis(cfg.frequency_ms);
let tx_done = done.clone();
workers.push(Some(
thread::Builder::new()
.name("task pusher".to_string())
.spawn(move || {
while !tx_done.load(Ordering::SeqCst) {
for task in tasks.clone() {
if let Err(e) = tx.send(task.clone()) {
error!(
"task pusher sending task:{}, addr:{} failed:{}",
task.name, task.addr, e
);
}
}
thread::sleep(freq);
}
})
.context("spawning task pusher thread failed")?,
));

for id in 0..cfg.worker_n {
let rx = rx.clone();
let name = format!("worker{}", id);
let rx_done = done.clone();
workers.push(Some(
thread::Builder::new()
.name(name.clone())
.spawn(move || {
while !rx_done.load(Ordering::SeqCst) {
match rx.lock() {
Ok(r) => match r.recv() {
Ok(task) => {
drop(r);
task.execute()
}
Err(e) => error!("{} recv failed:{}", name, e),
},
Err(e) => error!("{} lock rx failed:{}", name, e),
}
}
})
.context("spawning worker thread failed")?,
));
}

Crawler { workers }
Ok(Crawler { workers, done })
}

/// Signals the workers to stop and waits for their threads to finish.
pub(crate) fn close(&self) {
for worker in &self.workers {
worker.write().unwrap().close();
}
}
pub(crate) fn close(&mut self) {
self.done.store(true, Ordering::SeqCst);

/// Spawned a thread to start running each worker.
pub(crate) fn run(&self) -> Result<JoinHandle<()>> {
let workers = self.workers.clone();
thread::Builder::new()
.name("crawler_thread".into())
.spawn(move || {
for worker in &workers {
worker.write().unwrap().run();
}
})
.context("crawler thread run failed")
for worker in self.workers.iter_mut() {
if let Some(w) = worker.take() {
let _ = w.join();
}
}
}
}

struct Worker<T: Atomic> {
#[derive(Clone)]
struct Task<T: Atomic> {
name: String,
addr: String,
extra_opts: Option<ExtraOpts>,
freq: Duration,
task: Arc<Task<T>>,
task_thread: Option<thread::JoinHandle<()>>,
done: Arc<AtomicBool>,
metric: Arc<crate::metrics::Metric<T>>,
metric: Arc<Metric<T>>,
option: Option<ExtraOpts>,
f: fn(&str, &Option<ExtraOpts>) -> Result<<T as Atomic>::T>,
}

impl<T> Worker<T>
impl<T> Task<T>
where
T: Atomic + 'static,
{
fn new(
cfg: &crate::config::Target,
metric: Arc<crate::metrics::Metric<T>>,
task: Task<T>,
name: String,
addr: String,
metric: Arc<Metric<T>>,
option: Option<ExtraOpts>,
f: fn(&str, &Option<ExtraOpts>) -> Result<<T as Atomic>::T>,
) -> Self {
Worker {
addr: cfg.host_addr.clone(),
extra_opts: cfg.extra_opts.clone(),
freq: Duration::from_millis(cfg.frequency_ms),
task: Arc::new(task),
done: Arc::new(AtomicBool::new(false)),
Task {
name,
addr,
metric,
task_thread: None,
option,
f,
}
}

fn close(&mut self) {
self.done.store(true, Ordering::SeqCst);
if let Some(t) = self.task_thread.take() {
let _ = t.join();
fn execute(&self) {
match (self.f)(&self.addr, &self.option) {
Ok(v) => self.metric.set(v),
Err(e) => error!(
"task:{}, addr:{}, option:{:?}, err:{}",
self.name, self.addr, self.option, e
),
}
}

fn run(&mut self) {
let addr = self.addr.clone();
let done = self.done.clone();
let freq = self.freq;
let metric = self.metric.clone();
let task = self.task.clone();
let extra_opts = self.extra_opts.clone();

self.task_thread = Some(thread::spawn(move || {
while !done.load(Ordering::SeqCst) {
match (task.f)(&addr, &extra_opts) {
Ok(v) => metric.set(v),
Err(e) => error!("{} failed: {:?}", task.name, e),
}
thread::sleep(freq);
}
}))
}
}

struct Task<T: Atomic> {
name: &'static str,
f: fn(&str, &Option<ExtraOpts>) -> Result<<T as Atomic>::T>,
}

impl<T> Task<T>
where
T: Atomic,
{
fn new(
name: &'static str,
f: fn(&str, &Option<ExtraOpts>) -> Result<<T as Atomic>::T>,
) -> Self {
Task { name, f }
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        config::{Crawler as CrawlerConfig, Target as TargetConfig, TaskName},
        metrics::Metrics,
    };
    use prometheus::core::AtomicU64;
    use std::{thread::sleep, time::Duration};

    /// End-to-end check: a crawler with a single worker and a single target
    /// should have written a non-zero gauge value after running for a second.
    #[test]
    fn test_crawler_should_worked() {
        let target = TargetConfig {
            host_addr: "https://prod-mainnet.prod.findora.org:26657".to_string(),
            task_name: TaskName::TotalCountOfValidators,
            registry: None,
            extra_opts: None,
        };
        let crawler_cfg = CrawlerConfig {
            targets: vec![target],
            worker_n: 1,
            frequency_ms: 300,
        };

        let metrics = Arc::new(Metrics::<AtomicU64>::new(&crawler_cfg).unwrap());
        let mut crawler = Crawler::new(&crawler_cfg, Arc::clone(&metrics)).unwrap();

        // Let the task pusher and the worker run for a few rounds before stopping.
        sleep(Duration::from_secs(1));
        crawler.close();

        let families = metrics.gather();
        assert_eq!(1, families.len());

        let gauge_value = families[0].get_metric()[0].get_gauge().get_value();
        assert_ne!(0.0, gauge_value);
    }
}
Loading

0 comments on commit 1937501

Please sign in to comment.