Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add scheduled time per task #406

Merged
merged 11 commits into from
Apr 19, 2023
13 changes: 12 additions & 1 deletion console-api/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ message PollStats {
// Subtracting this timestamp from `created_at` can be used to calculate the
// time to first poll for this object, a measurement of executor latency.
optional google.protobuf.Timestamp first_poll = 3;
// The timestamp of the most recent time this object was woken.
//
// If this is `None`, the object has not yet been woken.
optional google.protobuf.Timestamp last_wake = 7;
// The timestamp of the most recent time this object's poll method was invoked.
//
// If this is `None`, the object has not yet been polled.
Expand All @@ -180,8 +184,15 @@ message PollStats {
// all polls. Note that this includes only polls that have completed and is
// not reflecting any in-progress polls. Subtracting `busy_time` from the
// total lifetime of the polled object results in the amount of time it
// has spent *waiting* to be polled.
// has spent *waiting* to be polled (including `scheduled_time`).
hds marked this conversation as resolved.
Show resolved Hide resolved
google.protobuf.Duration busy_time = 6;
// The total duration this object was scheduled prior to being polled, summed
// across all poll cycles. Note that this includes only the scheduled time
// preceding polls that have already started; time spent in a scheduled state
// whose poll has not yet started is not reflected. Subtracting both
// `busy_time` and `scheduled_time` from the total lifetime of the polled
// object results in the amount of time it spent unable to progress because
// it was waiting on some resource.
google.protobuf.Duration scheduled_time = 8;
}

// State attributes of an entity. These are dependent on the type of the entity.
Expand Down
4 changes: 0 additions & 4 deletions console-api/proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ message Stats {
uint64 waker_clones = 4;
// The total number of times this task's waker has been dropped.
uint64 waker_drops = 5;
// The timestamp of the most recent time this task has been woken.
//
// If this is `None`, the task has not yet been woken.
optional google.protobuf.Timestamp last_wake = 6;
// Contains task poll statistics.
common.PollStats poll_stats = 7;
// The total number of times this task has woken itself.
Expand Down
15 changes: 14 additions & 1 deletion console-api/src/generated/rs.tokio.console.common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,11 @@ pub struct PollStats {
/// time to first poll for this object, a measurement of executor latency.
#[prost(message, optional, tag="3")]
pub first_poll: ::core::option::Option<::prost_types::Timestamp>,
/// The timestamp of the most recent time this object was woken.
///
/// If this is `None`, the object has not yet been woken.
#[prost(message, optional, tag="7")]
pub last_wake: ::core::option::Option<::prost_types::Timestamp>,
/// The timestamp of the most recent time this object's poll method was invoked.
///
/// If this is `None`, the object has not yet been polled.
Expand All @@ -256,9 +261,17 @@ pub struct PollStats {
/// all polls. Note that this includes only polls that have completed and is
/// not reflecting any in-progress polls. Subtracting `busy_time` from the
/// total lifetime of the polled object results in the amount of time it
/// has spent *waiting* to be polled.
/// has spent *waiting* to be polled (including `scheduled_time`).
#[prost(message, optional, tag="6")]
pub busy_time: ::core::option::Option<::prost_types::Duration>,
/// The total duration this object was scheduled prior to being polled, summed
/// across all poll cycles. Note that this includes only the scheduled time
/// preceding polls that have already started; time spent in a scheduled state
/// whose poll has not yet started is not reflected. Subtracting both
/// `busy_time` and `scheduled_time` from the total lifetime of the polled
/// object results in the amount of time it spent unable to progress because
/// it was waiting on some resource.
#[prost(message, optional, tag="8")]
pub scheduled_time: ::core::option::Option<::prost_types::Duration>,
}
/// State attributes of an entity. These are dependent on the type of the entity.
///
Expand Down
5 changes: 0 additions & 5 deletions console-api/src/generated/rs.tokio.console.tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,6 @@ pub struct Stats {
/// The total number of times this task's waker has been dropped.
#[prost(uint64, tag="5")]
pub waker_drops: u64,
/// The timestamp of the most recent time this task has been woken.
///
/// If this is `None`, the task has not yet been woken.
#[prost(message, optional, tag="6")]
pub last_wake: ::core::option::Option<::prost_types::Timestamp>,
/// Contains task poll statistics.
#[prost(message, optional, tag="7")]
pub poll_stats: ::core::option::Option<super::common::PollStats>,
Expand Down
73 changes: 73 additions & 0 deletions console-subscriber/examples/long_scheduled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! Long scheduled time
//!
//! This example shows an application with a task that has an excessive
//! time between being woken and being polled.
//!
//! It consists of a channel where a sender task sends a message
//! through the channel and then immediately does a lot of work
//! (simulated in this case by a call to `std::thread::sleep`).
//!
//! As soon as the sender task calls `send()` the receiver task gets
//! woken, but because there's only a single worker thread, it doesn't
//! get polled until after the sender task has finished "working" and
//! yields (via `tokio::time::sleep`).
hawkw marked this conversation as resolved.
Show resolved Hide resolved
use std::time::Duration;

use console_subscriber::ConsoleLayer;
use tokio::{sync::mpsc, task};
use tracing::info;

/// Installs the console layer, spawns the named `rx` and `tx` tasks on a
/// runtime configured with a single worker thread, and waits for both to
/// finish before logging their join results.
#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let publish_every = Duration::from_millis(100);
    ConsoleLayer::builder()
        .with_default_env()
        .publish_interval(publish_every)
        .init();

    let count = 10000;
    let (tx, rx) = mpsc::channel::<u32>(1);

    // The receiver is spawned before the sender, as in the original ordering.
    let receiver_task = task::Builder::new()
        .name("rx")
        .spawn(receiver(rx, count))
        .unwrap();
    let sender_task = task::Builder::new()
        .name("tx")
        .spawn(sender(tx, count))
        .unwrap();

    // Join the sender first, then the receiver.
    let sender_outcome = sender_task.await;
    let receiver_outcome = receiver_task.await;
    info!(
        "main: Joined sender: {:?} and receiver: {:?}",
        sender_outcome, receiver_outcome,
    );

    // NOTE(review): presumably this pause lets the console layer publish a
    // final update before the process exits — confirm.
    tokio::time::sleep(Duration::from_millis(200)).await;

    Ok(())
}

/// Sends the values `0..count` through `tx`, one per iteration.
///
/// After each send the task blocks its thread for five seconds to simulate
/// heavy work, and only then yields to the runtime with a short async sleep.
async fn sender(tx: mpsc::Sender<u32>, count: u32) {
    let busy_for = Duration::from_millis(5000);
    let pause_for = Duration::from_millis(100);

    info!("tx: started");

    for msg in 0..count {
        let outcome = tx.send(msg).await;
        info!("tx: sent msg '{}' result: {:?}", msg, outcome);

        // Deliberately a *blocking* sleep: it stands in for "a lot of work"
        // done without yielding, which is what this example demonstrates.
        std::thread::sleep(busy_for);
        info!("tx: work done");

        // Yield back to the runtime.
        tokio::time::sleep(pause_for).await;
    }
}

/// Awaits `rx.recv()` exactly `count` times, logging each result.
///
/// The result of every `recv()` call is logged as-is (including `None`), and
/// the loop always runs for the full `count` iterations.
async fn receiver(mut rx: mpsc::Receiver<u32>, count: u32) {
    info!("rx: started");

    let mut received = 0;
    while received < count {
        let maybe_msg = rx.recv().await;
        info!("rx: Received message: '{:?}'", maybe_msg);
        received += 1;
    }
}
94 changes: 67 additions & 27 deletions console-subscriber/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) struct TaskStats {
is_dropped: AtomicBool,
// task stats
pub(crate) created_at: Instant,
timestamps: Mutex<TaskTimestamps>,
dropped_at: Mutex<Option<Instant>>,

// waker stats
wakes: AtomicUsize,
Expand Down Expand Up @@ -100,12 +100,6 @@ pub(crate) struct ResourceStats {
pub(crate) parent_id: Option<Id>,
}

#[derive(Debug, Default)]
struct TaskTimestamps {
dropped_at: Option<Instant>,
last_wake: Option<Instant>,
}

#[derive(Debug, Default)]
struct PollStats<H> {
/// The number of polls in progress
Expand All @@ -118,9 +112,11 @@ struct PollStats<H> {
/// Timestamps and accumulated durations recorded for a polled object.
///
/// `PollStats` keeps this behind a lock (`self.timestamps.lock()`), separate
/// from its atomic poll counters.
#[derive(Debug, Default)]
struct PollTimestamps<H> {
/// When the first poll started; `None` until a poll has been started.
first_poll: Option<Instant>,
/// The latest wake instant recorded so far (updated as the maximum of the
/// stored value and each new wake); `None` if no wake has been recorded.
last_wake: Option<Instant>,
/// When the most recent poll started (set by `start_poll` when no other
/// poll of this object is already in progress).
last_poll_started: Option<Instant>,
/// When the most recent poll ended. Read by `start_poll` to decide where
/// the scheduled interval begins.
/// NOTE(review): presumably written by `end_poll`, which is not visible
/// here — confirm.
last_poll_ended: Option<Instant>,
/// Total busy duration, exported as `busy_time` in the protobuf stats.
/// NOTE(review): presumably accumulated when polls end — confirm.
busy_time: Duration,
/// Total time spent scheduled before being polled: on each poll start, the
/// time elapsed since the later of `last_wake` and `last_poll_ended` is
/// added to this value.
scheduled_time: Duration,
/// Histogram storage of type `H`.
/// NOTE(review): presumably records poll durations — confirm against the
/// `RecordPoll` implementation.
histogram: H,
}

Expand Down Expand Up @@ -162,14 +158,16 @@ impl TaskStats {
is_dirty: AtomicBool::new(true),
is_dropped: AtomicBool::new(false),
created_at,
timestamps: Mutex::new(TaskTimestamps::default()),
dropped_at: Mutex::new(None),
poll_stats: PollStats {
timestamps: Mutex::new(PollTimestamps {
histogram: Histogram::new(poll_duration_max),
first_poll: None,
last_wake: None,
last_poll_started: None,
last_poll_ended: None,
busy_time: Duration::new(0, 0),
scheduled_time: Duration::new(0, 0),
}),
current_polls: AtomicUsize::new(0),
polls: AtomicUsize::new(0),
Expand Down Expand Up @@ -209,13 +207,14 @@ impl TaskStats {
}

fn wake(&self, at: Instant, self_wake: bool) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
self.wakes.fetch_add(1, Release);
self.poll_stats.wake(at);

self.wakes.fetch_add(1, Release);
if self_wake {
self.wakes.fetch_add(1, Release);
}

self.make_dirty();
}

pub(crate) fn start_poll(&self, at: Instant) {
Expand All @@ -235,8 +234,7 @@ impl TaskStats {
return;
}

let mut timestamps = self.timestamps.lock();
let _prev = timestamps.dropped_at.replace(dropped_at);
let _prev = self.dropped_at.lock().replace(dropped_at);
debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!");
self.make_dirty();
}
Expand All @@ -257,16 +255,14 @@ impl ToProto for TaskStats {

fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
let poll_stats = Some(self.poll_stats.to_proto(base_time));
let timestamps = self.timestamps.lock();
proto::tasks::Stats {
poll_stats,
created_at: Some(base_time.to_timestamp(self.created_at)),
dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)),
dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
wakes: self.wakes.load(Acquire) as u64,
waker_clones: self.waker_clones.load(Acquire) as u64,
self_wakes: self.self_wakes.load(Acquire) as u64,
waker_drops: self.waker_drops.load(Acquire) as u64,
last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
}
}
}
Expand All @@ -287,7 +283,7 @@ impl DroppedAt for TaskStats {
// avoid acquiring the lock if we know we haven't tried to drop this
// thing yet
if self.is_dropped.load(Acquire) {
return self.timestamps.lock().dropped_at;
return *self.dropped_at.lock();
}

None
Expand Down Expand Up @@ -466,18 +462,49 @@ impl ToProto for ResourceStats {
// === impl PollStats ===

impl<H: RecordPoll> PollStats<H> {
fn start_poll(&self, at: Instant) {
if self.current_polls.fetch_add(1, AcqRel) == 0 {
// We are starting the first poll
let mut timestamps = self.timestamps.lock();
if timestamps.first_poll.is_none() {
timestamps.first_poll = Some(at);
}
fn wake(&self, at: Instant) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
}

timestamps.last_poll_started = Some(at);
fn start_poll(&self, at: Instant) {
if self.current_polls.fetch_add(1, AcqRel) > 0 {
return;
}

self.polls.fetch_add(1, Release);
// We are starting the first poll
let mut timestamps = self.timestamps.lock();
if timestamps.first_poll.is_none() {
timestamps.first_poll = Some(at);
}

timestamps.last_poll_started = Some(at);

self.polls.fetch_add(1, Release);

let scheduled = match (timestamps.last_wake, timestamps.last_poll_ended) {
// If the last poll ended after the last wake then it was likely
// a self-wake, so we measure from the end of the last poll instead.
// This also ensures that `busy_time` and `scheduled_time` don't overlap.
(Some(last_wake), Some(last_poll_ended)) if last_poll_ended > last_wake => {
last_poll_ended
}
(Some(last_wake), _) => last_wake,
(None, _) => return, // Async operations record polls, but not wakes
};
hds marked this conversation as resolved.
Show resolved Hide resolved

let elapsed = match at.checked_duration_since(scheduled) {
Some(elapsed) => elapsed,
None => {
eprintln!(
"possible Instant clock skew detected: a poll's start timestamp \
was before the wake time/last poll end timestamp\nwake = {:?}\n start = {:?}",
scheduled, at
);
return;
}
};
timestamps.scheduled_time += elapsed;
}

fn end_poll(&self, at: Instant) {
Expand Down Expand Up @@ -526,6 +553,7 @@ impl<H> ToProto for PollStats<H> {
proto::PollStats {
polls: self.polls.load(Acquire) as u64,
first_poll: timestamps.first_poll.map(|at| base_time.to_timestamp(at)),
last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
last_poll_started: timestamps
.last_poll_started
.map(|at| base_time.to_timestamp(at)),
Expand All @@ -534,11 +562,23 @@ impl<H> ToProto for PollStats<H> {
.map(|at| base_time.to_timestamp(at)),
busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| {
eprintln!(
"failed to convert busy time to protobuf duration: {}",
"failed to convert `busy_time` to protobuf duration: {}",
error
);
Default::default()
})),
scheduled_time: Some(
timestamps
.scheduled_time
.try_into()
.unwrap_or_else(|error| {
eprintln!(
"failed to convert `scheduled_time` to protobuf duration: {}",
error
);
Default::default()
}),
),
}
}
}
Expand Down
Loading