Update async processor tests (#2577)
Closes #2449
Or at least, tries to :)

## Description
Summary of changes, each in a separate commit for clarity. The change that
attempts to fix the actual flakiness reported in the issue is in pt. 2
below:
1. Modifies `one_spawn_single_tasks_works()` to not use `sleep` (a minimal sketch of the pattern follows this list)
2. Updates the assertion in
`one_spawn_single_tasks_works__thread_id_is_different_than_main()`
* The original test expected an exact number of worker threads to be
spawned (`assert_eq!(unique_thread_ids.len(), number_of_threads);`);
however, the `AsyncProcessor` can reuse threads if the tasks
finish quickly enough.
* One way to reduce the flakiness would be to bump the sleep, but I
think the clearer way is to ensure that _some_ (but not too many)
worker threads have been spawned
* Especially because the aim of the test is not to verify the exact number
of worker threads, but to check that the main thread id was not used
3. Reduces the arbitrary number of tasks in
`executes_10_tasks_for_10_seconds_with_one_thread`, just to make the
test execute faster
4. Adds some clarifying comments
5. Slightly updates and renames
`executes_10_tasks_for_2_seconds_with_10_thread()` and
`executes_10_tasks_for_2_seconds_with_1_thread()` to use proper values
and better reflect the idea behind each test
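
A minimal, self-contained sketch of the no-`sleep` pattern from pt. 1, using a plain `tokio::spawn` in place of `AsyncProcessor::try_spawn` (the test name and the 5-second timeout are illustrative only, not part of the actual change):

```rust
use std::time::Duration;

#[tokio::test]
async fn completion_is_signalled_without_sleep() {
    // Given: a oneshot channel that the task uses to signal completion.
    let (sender, receiver) = tokio::sync::oneshot::channel();

    // When: the spawned task sends on the channel instead of the test sleeping.
    tokio::spawn(async move {
        sender.send(()).unwrap();
    });

    // Then: await the signal under a generous timeout, so the test finishes
    // as soon as the task does, yet can never hang forever.
    tokio::time::timeout(Duration::from_secs(5), receiver)
        .await
        .expect("timed out waiting for the task")
        .expect("the task dropped the sender");
}
```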

### Before requesting review
- [X] I have reviewed the code myself
rafal-ch authored Jan 22, 2025
1 parent b0773e0 commit 2031bea
Showing 1 changed file with 62 additions and 43 deletions.
crates/services/src/async_processor.rs — 105 changes: 62 additions & 43 deletions
@@ -126,46 +126,45 @@ mod tests {
};
use tokio::time::Instant;

-#[test]
-fn one_spawn_single_tasks_works() {
+#[tokio::test]
+async fn one_spawn_single_tasks_works() {
// Given
-let number_of_pending_tasks = 1;
+const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
-AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
+AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

// When
-let (sender, mut receiver) = tokio::sync::oneshot::channel();
+let (sender, receiver) = tokio::sync::oneshot::channel();
let result = heavy_task_processor.try_spawn(async move {
sender.send(()).unwrap();
});

// Then
result.expect("Expected Ok result");
-sleep(Duration::from_secs(1));
-receiver.try_recv().unwrap();
+tokio::time::timeout(Duration::from_secs(5), receiver)
+.await
+.unwrap()
+.unwrap();
}

#[tokio::test]
async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
// Given
-let number_of_threads = 10;
-let number_of_pending_tasks = 10000;
+const MAX_NUMBER_OF_THREADS: usize = 10;
+const NUMBER_OF_PENDING_TASKS: usize = 10000;
let heavy_task_processor =
-AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
+AsyncProcessor::new("Test", MAX_NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();
let main_handler = tokio::spawn(async move { std::thread::current().id() });
let main_id = main_handler.await.unwrap();

// When
let futures = iter::repeat_with(|| {
heavy_task_processor
-.try_spawn(async move {
-tokio::time::sleep(Duration::from_secs(1)).await;
-std::thread::current().id()
-})
+.try_spawn(async move { std::thread::current().id() })
.unwrap()
})
-.take(number_of_pending_tasks)
+.take(NUMBER_OF_PENDING_TASKS)
.collect::<Vec<_>>();

// Then
@@ -175,16 +174,20 @@ mod tests {
.map(|r| r.unwrap())
.collect::<HashSet<_>>();

+// Main thread was not used.
assert!(!unique_thread_ids.contains(&main_id));
-assert_eq!(unique_thread_ids.len(), number_of_threads);
+// There's been at least one worker thread used.
+assert!(!unique_thread_ids.is_empty());
+// There were no more worker threads above the threshold.
+assert!(unique_thread_ids.len() <= MAX_NUMBER_OF_THREADS);
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
-let number_of_pending_tasks = 1;
+const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
-AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
+AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
@@ -196,15 +199,15 @@ mod tests {
});

// Then
-let err = second_spawn_result.expect_err("Expected Ok result");
+let err = second_spawn_result.expect_err("Should error");
assert_eq!(err, OutOfCapacity);
}

#[test]
fn second_spawn_works_when_first_is_finished() {
-let number_of_pending_tasks = 1;
+const NUMBER_OF_PENDING_TASKS: usize = 1;
let heavy_task_processor =
-AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
+AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

// Given
let (sender, receiver) = tokio::sync::oneshot::channel();
@@ -229,11 +232,11 @@ mod tests {
#[test]
fn can_spawn_10_tasks_when_limit_is_10() {
// Given
-let number_of_pending_tasks = 10;
+const NUMBER_OF_PENDING_TASKS: usize = 10;
let heavy_task_processor =
-AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap();
+AsyncProcessor::new("Test", 1, NUMBER_OF_PENDING_TASKS).unwrap();

-for _ in 0..number_of_pending_tasks {
+for _ in 0..NUMBER_OF_PENDING_TASKS {
// When
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -245,19 +248,19 @@ mod tests {
}

#[tokio::test]
-async fn executes_10_tasks_for_10_seconds_with_one_thread() {
+async fn executes_5_tasks_for_5_seconds_with_one_thread() {
// Given
-let number_of_pending_tasks = 10;
-let number_of_threads = 1;
+const NUMBER_OF_PENDING_TASKS: usize = 5;
+const NUMBER_OF_THREADS: usize = 1;
let heavy_task_processor =
-AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
+AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
-for _ in 0..number_of_pending_tasks {
+for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
@@ -269,29 +272,36 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
-assert!(instant.elapsed() >= Duration::from_secs(10));
+// 5 tasks running on 1 thread, each task taking 1 second,
+// should complete in approximately 5 seconds overall.
+// Allowing some LEEWAY to account for runtime overhead.
+const LEEWAY: Duration = Duration::from_millis(300);
+assert!(instant.elapsed() < Duration::from_secs(5) + LEEWAY);
+// Make sure that the tasks were not executed in parallel.
+assert!(instant.elapsed() >= Duration::from_secs(5));
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
-assert_eq!(duration.as_secs(), 10);
+assert_eq!(duration.as_secs(), 5);
let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get());
assert_eq!(duration.as_secs(), 0);
}

#[tokio::test]
-async fn executes_10_tasks_for_2_seconds_with_10_thread() {
+async fn executes_10_blocking_tasks_for_1_second_with_10_threads__records_busy_time()
+{
// Given
-let number_of_pending_tasks = 10;
-let number_of_threads = 10;
+const NUMBER_OF_PENDING_TASKS: usize = 10;
+const NUMBER_OF_THREADS: usize = 10;
let heavy_task_processor =
-AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
+AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
-for _ in 0..number_of_pending_tasks {
+for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
@@ -303,7 +313,11 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
-assert!(instant.elapsed() <= Duration::from_secs(2));
+// 10 blocking tasks running on 10 threads, each task taking 1 second,
+// should complete in approximately 1 second overall.
+// Allowing some LEEWAY to account for runtime overhead.
+const LEEWAY: Duration = Duration::from_millis(300);
+assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
@@ -313,19 +327,20 @@ mod tests {
}

#[tokio::test]
-async fn executes_10_tasks_for_2_seconds_with_1_thread() {
+async fn executes_10_non_blocking_tasks_for_1_second_with_10_threads__records_idle_time(
+) {
// Given
-let number_of_pending_tasks = 10;
-let number_of_threads = 10;
+const NUMBER_OF_PENDING_TASKS: usize = 10;
+const NUMBER_OF_THREADS: usize = 10;
let heavy_task_processor =
-AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
+AsyncProcessor::new("Test", NUMBER_OF_THREADS, NUMBER_OF_PENDING_TASKS)
.unwrap();

// When
let (broadcast_sender, mut broadcast_receiver) =
tokio::sync::broadcast::channel(1024);
let instant = Instant::now();
-for _ in 0..number_of_pending_tasks {
+for _ in 0..NUMBER_OF_PENDING_TASKS {
let broadcast_sender = broadcast_sender.clone();
let result = heavy_task_processor.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -337,7 +352,11 @@ mod tests {

// Then
while broadcast_receiver.recv().await.is_ok() {}
-assert!(instant.elapsed() <= Duration::from_secs(2));
+// 10 non-blocking tasks running on 10 threads, each task taking 1 second,
+// should complete in approximately 1 second overall.
+// Allowing some LEEWAY to account for runtime overhead.
+const LEEWAY: Duration = Duration::from_millis(300);
+assert!(instant.elapsed() <= Duration::from_secs(1) + LEEWAY);
// Wait for the metrics to be updated.
tokio::time::sleep(Duration::from_secs(1)).await;
let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get());
