Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limited the number of threads used by the GraphQL service #2350

Merged
merged 2 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Added a new CLI flag `graphql-number-of-threads` to limit the number of threads used by the GraphQL service. The default value is `2`, `0` enables the old behavior.

### Changed

- [2334](https://github.com/FuelLabs/fuel-core/pull/2334): Prepare the GraphQL service for the switching to `async` methods.
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Limited the number of threads used by the GraphQL service.

## [Version 0.39.0]

Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ impl Command {
let config = Config {
graphql_config: GraphQLConfig {
addr,
number_of_threads: graphql.graphql_number_of_threads,
max_queries_depth: graphql.graphql_max_depth,
max_queries_complexity: graphql.graphql_max_complexity,
max_queries_recursive_depth: graphql.graphql_max_recursive_depth,
Expand Down
4 changes: 4 additions & 0 deletions bin/fuel-core/src/cli/run/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pub struct GraphQLArgs {
#[clap(long = "port", default_value = "4000", env)]
pub port: u16,

/// The number of threads to use for the GraphQL service.
#[clap(long = "graphql-number-of-threads", default_value = "2", env)]
pub graphql_number_of_threads: usize,

/// The max depth of GraphQL queries.
#[clap(long = "graphql-max-depth", default_value = "16", env)]
pub graphql_max_depth: usize,
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod worker_service;
#[derive(Clone, Debug)]
pub struct ServiceConfig {
pub addr: SocketAddr,
pub number_of_threads: usize,
pub max_queries_depth: usize,
pub max_queries_complexity: usize,
pub max_queries_recursive_depth: usize,
Expand Down
39 changes: 26 additions & 13 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ use axum::{
Json,
Router,
};
use fuel_core_metrics::futures::{
metered_future::MeteredFuture,
FuturesMetrics,
};
use fuel_core_services::{
AsyncProcessor,
RunnableService,
RunnableTask,
StateWatcher,
Expand All @@ -79,6 +76,7 @@ use std::{
TcpListener,
},
pin::Pin,
sync::Arc,
};
use tokio_stream::StreamExt;
use tower::limit::ConcurrencyLimitLayer;
Expand Down Expand Up @@ -115,6 +113,7 @@ pub struct GraphqlService {
pub struct ServerParams {
router: Router,
listener: TcpListener,
number_of_threads: usize,
}

pub struct Task {
Expand All @@ -124,7 +123,7 @@ pub struct Task {

#[derive(Clone)]
struct ExecutorWithMetrics {
metric: FuturesMetrics,
processor: Arc<AsyncProcessor>,
}

impl<F> Executor<F> for ExecutorWithMetrics
Expand All @@ -133,9 +132,7 @@ where
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
let future = MeteredFuture::new(fut, self.metric.clone());

tokio::task::spawn(future);
let _ = self.processor.try_spawn(fut);
}
}

Expand All @@ -159,14 +156,25 @@ impl RunnableService for GraphqlService {
params: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let mut state = state.clone();
let ServerParams { router, listener } = params;
let metric = ExecutorWithMetrics {
metric: FuturesMetrics::obtain_futures_metrics("GraphQLFutures"),
let ServerParams {
router,
listener,
number_of_threads,
} = params;

let processor = AsyncProcessor::new(
"GraphQLFutures",
number_of_threads,
tokio::sync::Semaphore::MAX_PERMITS,
)?;

let executor = ExecutorWithMetrics {
processor: Arc::new(processor),
};

let server = axum::Server::from_tcp(listener)
.unwrap()
.executor(metric)
.executor(executor)
.serve(router.into_make_service())
.with_graceful_shutdown(async move {
state
Expand Down Expand Up @@ -228,6 +236,7 @@ where
let body_limit = config.config.request_body_bytes_limit;
let max_queries_resolver_recursive_depth =
config.config.max_queries_resolver_recursive_depth;
let number_of_threads = config.config.number_of_threads;

let schema = schema
.limit_complexity(config.config.max_queries_complexity)
Expand Down Expand Up @@ -292,7 +301,11 @@ where

Ok(Service::new_with_params(
GraphqlService { bound_address },
ServerParams { router, listener },
ServerParams {
router,
listener,
number_of_threads,
},
))
}

Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl Config {
std::net::Ipv4Addr::new(127, 0, 0, 1).into(),
0,
),
number_of_threads: 0,
max_queries_depth: 16,
max_queries_complexity: 80000,
max_queries_recursive_depth: 16,
Expand Down
84 changes: 65 additions & 19 deletions crates/services/src/async_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::{
OwnedSemaphorePermit,
Semaphore,
},
task::JoinHandle,
};

/// A processor that can execute async tasks with a limit on the number of tasks that can be
Expand Down Expand Up @@ -76,40 +77,50 @@ impl AsyncProcessor {
}

/// Spawn a task with a reservation.
pub fn spawn_reserved<F>(&self, reservation: AsyncReservation, future: F)
pub fn spawn_reserved<F>(
&self,
reservation: AsyncReservation,
future: F,
) -> JoinHandle<F::Output>
where
F: Future<Output = ()> + Send + 'static,
F: Future + Send + 'static,
F::Output: Send,
{
let permit = reservation.0;
let future = async move {
let permit = permit;
future.await;
drop(permit)
let result = future.await;
drop(permit);
result
};
let metered_future = MeteredFuture::new(future, self.metric.clone());
if let Some(runtime) = &self.thread_pool {
runtime.spawn(metered_future);
runtime.spawn(metered_future)
} else {
tokio::spawn(metered_future);
tokio::spawn(metered_future)
}
}

/// Tries to spawn a task. If the task cannot be spawned, returns an error.
pub fn try_spawn<F>(&self, future: F) -> Result<(), OutOfCapacity>
pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
where
F: Future<Output = ()> + Send + 'static,
F: Future + Send + 'static,
F::Output: Send,
{
let reservation = self.reserve()?;
self.spawn_reserved(reservation, future);
Ok(())
Ok(self.spawn_reserved(reservation, future))
}
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
#[allow(non_snake_case)]
mod tests {
use super::*;
use futures::future::join_all;
use std::{
collections::HashSet,
iter,
thread::sleep,
time::Duration,
};
Expand All @@ -129,11 +140,45 @@ mod tests {
});

// Then
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
sleep(Duration::from_secs(1));
receiver.try_recv().unwrap();
}

#[tokio::test]
async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
// Given
let number_of_threads = 10;
let number_of_pending_tasks = 10000;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
.unwrap();
let main_handler = tokio::spawn(async move { std::thread::current().id() });
let main_id = main_handler.await.unwrap();

// When
let futures = iter::repeat_with(|| {
heavy_task_processor
.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
std::thread::current().id()
})
.unwrap()
})
.take(number_of_pending_tasks)
.collect::<Vec<_>>();

// Then
let thread_ids = join_all(futures).await;
let unique_thread_ids = thread_ids
.into_iter()
.map(|r| r.unwrap())
.collect::<HashSet<_>>();

assert!(!unique_thread_ids.contains(&main_id));
assert_eq!(unique_thread_ids.len(), number_of_threads);
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
Expand All @@ -143,15 +188,16 @@ mod tests {
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
assert_eq!(first_spawn_result, Ok(()));
first_spawn_result.expect("Expected Ok result");

// When
let second_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});

// Then
assert_eq!(second_spawn_result, Err(OutOfCapacity));
let err = second_spawn_result.expect_err("Expected Ok result");
assert_eq!(err, OutOfCapacity);
}

#[test]
Expand All @@ -166,7 +212,7 @@ mod tests {
sleep(Duration::from_secs(1));
sender.send(()).unwrap();
});
assert_eq!(first_spawn, Ok(()));
first_spawn.expect("Expected Ok result");
futures::executor::block_on(async move {
receiver.await.unwrap();
});
Expand All @@ -177,7 +223,7 @@ mod tests {
});

// Then
assert_eq!(second_spawn, Ok(()));
second_spawn.expect("Expected Ok result");
}

#[test]
Expand All @@ -194,7 +240,7 @@ mod tests {
});

// Then
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
}

Expand All @@ -217,7 +263,7 @@ mod tests {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down Expand Up @@ -249,7 +295,7 @@ mod tests {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down Expand Up @@ -281,7 +327,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(1)).await;
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down
Loading