Skip to content

Commit

Permalink
Merge branch 'master' into feature/async-pagination-queries
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx authored Oct 14, 2024
2 parents d5d7934 + a35827d commit 981b690
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 32 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]
### Added
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Added a new CLI flag `graphql-number-of-threads` to limit the number of threads used by the GraphQL service. The default value is `2`, `0` enables the old behavior.

### Changed

Expand All @@ -13,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

#### Breaking
- [2341](https://github.com/FuelLabs/fuel-core/pull/2341): The maximum number of processed coins from the `coins_to_spend` query is limited to `max_inputs`.
- [2350](https://github.com/FuelLabs/fuel-core/pull/2350): Limited the number of threads used by the GraphQL service.

## [Version 0.39.0]

Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ impl Command {
let config = Config {
graphql_config: GraphQLConfig {
addr,
number_of_threads: graphql.graphql_number_of_threads,
max_queries_depth: graphql.graphql_max_depth,
max_queries_complexity: graphql.graphql_max_complexity,
max_queries_recursive_depth: graphql.graphql_max_recursive_depth,
Expand Down
4 changes: 4 additions & 0 deletions bin/fuel-core/src/cli/run/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pub struct GraphQLArgs {
#[clap(long = "port", default_value = "4000", env)]
pub port: u16,

/// The number of threads to use for the GraphQL service.
#[clap(long = "graphql-number-of-threads", default_value = "2", env)]
pub graphql_number_of_threads: usize,

/// The max depth of GraphQL queries.
#[clap(long = "graphql-max-depth", default_value = "16", env)]
pub graphql_max_depth: usize,
Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod worker_service;
#[derive(Clone, Debug)]
pub struct ServiceConfig {
pub addr: SocketAddr,
pub number_of_threads: usize,
pub max_queries_depth: usize,
pub max_queries_complexity: usize,
pub max_queries_recursive_depth: usize,
Expand Down
39 changes: 26 additions & 13 deletions crates/fuel-core/src/graphql_api/api_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,8 @@ use axum::{
Json,
Router,
};
use fuel_core_metrics::futures::{
metered_future::MeteredFuture,
FuturesMetrics,
};
use fuel_core_services::{
AsyncProcessor,
RunnableService,
RunnableTask,
StateWatcher,
Expand All @@ -79,6 +76,7 @@ use std::{
TcpListener,
},
pin::Pin,
sync::Arc,
};
use tokio_stream::StreamExt;
use tower::limit::ConcurrencyLimitLayer;
Expand Down Expand Up @@ -115,6 +113,7 @@ pub struct GraphqlService {
pub struct ServerParams {
router: Router,
listener: TcpListener,
number_of_threads: usize,
}

pub struct Task {
Expand All @@ -124,7 +123,7 @@ pub struct Task {

#[derive(Clone)]
struct ExecutorWithMetrics {
metric: FuturesMetrics,
processor: Arc<AsyncProcessor>,
}

impl<F> Executor<F> for ExecutorWithMetrics
Expand All @@ -133,9 +132,7 @@ where
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
let future = MeteredFuture::new(fut, self.metric.clone());

tokio::task::spawn(future);
let _ = self.processor.try_spawn(fut);
}
}

Expand All @@ -159,14 +156,25 @@ impl RunnableService for GraphqlService {
params: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
let mut state = state.clone();
let ServerParams { router, listener } = params;
let metric = ExecutorWithMetrics {
metric: FuturesMetrics::obtain_futures_metrics("GraphQLFutures"),
let ServerParams {
router,
listener,
number_of_threads,
} = params;

let processor = AsyncProcessor::new(
"GraphQLFutures",
number_of_threads,
tokio::sync::Semaphore::MAX_PERMITS,
)?;

let executor = ExecutorWithMetrics {
processor: Arc::new(processor),
};

let server = axum::Server::from_tcp(listener)
.unwrap()
.executor(metric)
.executor(executor)
.serve(router.into_make_service())
.with_graceful_shutdown(async move {
state
Expand Down Expand Up @@ -228,6 +236,7 @@ where
let body_limit = config.config.request_body_bytes_limit;
let max_queries_resolver_recursive_depth =
config.config.max_queries_resolver_recursive_depth;
let number_of_threads = config.config.number_of_threads;

let schema = schema
.limit_complexity(config.config.max_queries_complexity)
Expand Down Expand Up @@ -292,7 +301,11 @@ where

Ok(Service::new_with_params(
GraphqlService { bound_address },
ServerParams { router, listener },
ServerParams {
router,
listener,
number_of_threads,
},
))
}

Expand Down
1 change: 1 addition & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl Config {
std::net::Ipv4Addr::new(127, 0, 0, 1).into(),
0,
),
number_of_threads: 0,
max_queries_depth: 16,
max_queries_complexity: 80000,
max_queries_recursive_depth: 16,
Expand Down
84 changes: 65 additions & 19 deletions crates/services/src/async_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tokio::{
OwnedSemaphorePermit,
Semaphore,
},
task::JoinHandle,
};

/// A processor that can execute async tasks with a limit on the number of tasks that can be
Expand Down Expand Up @@ -76,40 +77,50 @@ impl AsyncProcessor {
}

/// Spawn a task with a reservation.
pub fn spawn_reserved<F>(&self, reservation: AsyncReservation, future: F)
pub fn spawn_reserved<F>(
&self,
reservation: AsyncReservation,
future: F,
) -> JoinHandle<F::Output>
where
F: Future<Output = ()> + Send + 'static,
F: Future + Send + 'static,
F::Output: Send,
{
let permit = reservation.0;
let future = async move {
let permit = permit;
future.await;
drop(permit)
let result = future.await;
drop(permit);
result
};
let metered_future = MeteredFuture::new(future, self.metric.clone());
if let Some(runtime) = &self.thread_pool {
runtime.spawn(metered_future);
runtime.spawn(metered_future)
} else {
tokio::spawn(metered_future);
tokio::spawn(metered_future)
}
}

/// Tries to spawn a task. If the task cannot be spawned, returns an error.
pub fn try_spawn<F>(&self, future: F) -> Result<(), OutOfCapacity>
pub fn try_spawn<F>(&self, future: F) -> Result<JoinHandle<F::Output>, OutOfCapacity>
where
F: Future<Output = ()> + Send + 'static,
F: Future + Send + 'static,
F::Output: Send,
{
let reservation = self.reserve()?;
self.spawn_reserved(reservation, future);
Ok(())
Ok(self.spawn_reserved(reservation, future))
}
}

#[cfg(test)]
#[allow(clippy::bool_assert_comparison)]
#[allow(non_snake_case)]
mod tests {
use super::*;
use futures::future::join_all;
use std::{
collections::HashSet,
iter,
thread::sleep,
time::Duration,
};
Expand All @@ -129,11 +140,45 @@ mod tests {
});

// Then
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
sleep(Duration::from_secs(1));
receiver.try_recv().unwrap();
}

#[tokio::test]
async fn one_spawn_single_tasks_works__thread_id_is_different_than_main() {
// Given
let number_of_threads = 10;
let number_of_pending_tasks = 10000;
let heavy_task_processor =
AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks)
.unwrap();
let main_handler = tokio::spawn(async move { std::thread::current().id() });
let main_id = main_handler.await.unwrap();

// When
let futures = iter::repeat_with(|| {
heavy_task_processor
.try_spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
std::thread::current().id()
})
.unwrap()
})
.take(number_of_pending_tasks)
.collect::<Vec<_>>();

// Then
let thread_ids = join_all(futures).await;
let unique_thread_ids = thread_ids
.into_iter()
.map(|r| r.unwrap())
.collect::<HashSet<_>>();

assert!(!unique_thread_ids.contains(&main_id));
assert_eq!(unique_thread_ids.len(), number_of_threads);
}

#[test]
fn second_spawn_fails_when_limit_is_one_and_first_in_progress() {
// Given
Expand All @@ -143,15 +188,16 @@ mod tests {
let first_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});
assert_eq!(first_spawn_result, Ok(()));
first_spawn_result.expect("Expected Ok result");

// When
let second_spawn_result = heavy_task_processor.try_spawn(async move {
sleep(Duration::from_secs(1));
});

// Then
assert_eq!(second_spawn_result, Err(OutOfCapacity));
let err = second_spawn_result.expect_err("Expected Ok result");
assert_eq!(err, OutOfCapacity);
}

#[test]
Expand All @@ -166,7 +212,7 @@ mod tests {
sleep(Duration::from_secs(1));
sender.send(()).unwrap();
});
assert_eq!(first_spawn, Ok(()));
first_spawn.expect("Expected Ok result");
futures::executor::block_on(async move {
receiver.await.unwrap();
});
Expand All @@ -177,7 +223,7 @@ mod tests {
});

// Then
assert_eq!(second_spawn, Ok(()));
second_spawn.expect("Expected Ok result");
}

#[test]
Expand All @@ -194,7 +240,7 @@ mod tests {
});

// Then
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
}

Expand All @@ -217,7 +263,7 @@ mod tests {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down Expand Up @@ -249,7 +295,7 @@ mod tests {
sleep(Duration::from_secs(1));
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down Expand Up @@ -281,7 +327,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(1)).await;
broadcast_sender.send(()).unwrap();
});
assert_eq!(result, Ok(()));
result.expect("Expected Ok result");
}
drop(broadcast_sender);

Expand Down

0 comments on commit 981b690

Please sign in to comment.