Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[api] Allow customizing reqwest client #1414 #1415

Merged
merged 1 commit into from
Dec 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agdb_api/rust/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ impl ReqwestClient {
client: reqwest::Client::new(),
}
}

pub fn with_client(client: reqwest::Client) -> Self {
Self { client }
}
}

#[cfg(feature = "reqwest")]
Expand Down
14 changes: 9 additions & 5 deletions agdb_server/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ClusterNodeImpl {
address: &str,
token: &str,
responses: UnboundedSender<(Request<ClusterAction>, Response)>,
) -> Self {
) -> ServerResult<Self> {
let base = if address.starts_with("http") || address.starts_with("https") {
address.to_string()
} else {
Expand All @@ -91,15 +91,19 @@ impl ClusterNodeImpl {

let (requests_sender, requests_receiver) = tokio::sync::mpsc::unbounded_channel();

Self {
client: ReqwestClient::new(),
Ok(Self {
client: ReqwestClient::with_client(
reqwest::Client::builder()
.connect_timeout(Duration::from_secs(60))
.build()?,
),
url: format!("{base}api/v1/cluster"),
base_url: base.trim_end_matches("/").to_string(),
token: Some(token.to_string()),
requests_sender,
requests_receiver: RwLock::new(requests_receiver),
responses,
}
})
}

fn bad_request(message: &str) -> AxumResponse {
Expand Down Expand Up @@ -194,7 +198,7 @@ pub(crate) async fn new(config: &Config, db: &ServerDb, db_pool: &DbPool) -> Ser
node.as_str(),
&config.cluster_token,
requests.clone(),
)));
)?));
}

Some(RwLock::new(responses))
Expand Down
30 changes: 27 additions & 3 deletions agdb_server/tests/routes/cluster_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,35 @@ use agdb_api::DbUserRole;
use agdb_api::ReqwestClient;
use assert_cmd::cargo::CommandCargoExt;
use std::process::Command;
use std::time::Duration;

#[tokio::test]
async fn rebalance() -> anyhow::Result<()> {
let mut servers = create_cluster(3).await?;
let mut leader = AgdbApi::new(ReqwestClient::new(), &servers[0].address);
let mut leader = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&servers[0].address,
);
leader.user_login(ADMIN, ADMIN).await?;
leader.admin_shutdown().await?;
assert!(servers[0].process.wait()?.success());

let mut statuses = Vec::with_capacity(servers.len());

for server in &servers[1..] {
let status = wait_for_leader(&AgdbApi::new(ReqwestClient::new(), &server.address)).await?;
let status = wait_for_leader(&AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
))
.await?;
statuses.push(status);
}

Expand All @@ -43,7 +59,15 @@ async fn rebalance() -> anyhow::Result<()> {
statuses.clear();

for server in &servers {
let status = wait_for_leader(&AgdbApi::new(ReqwestClient::new(), &server.address)).await?;
let status = wait_for_leader(&AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
))
.await?;
statuses.push(status);
}

Expand Down
55 changes: 49 additions & 6 deletions agdb_server/tests/routes/misc_routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use reqwest::StatusCode;
use std::collections::HashMap;
use std::path::Path;
use std::process::Command;
use std::time::Duration;

#[tokio::test]
async fn missing() -> anyhow::Result<()> {
Expand Down Expand Up @@ -73,7 +74,14 @@ async fn openapi() -> anyhow::Result<()> {
#[tokio::test]
async fn config_reuse() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);
client.user_login(ADMIN, ADMIN).await?;
client.admin_shutdown().await?;
assert!(server.process.wait()?.success());
Expand All @@ -87,7 +95,14 @@ async fn config_reuse() -> anyhow::Result<()> {
#[tokio::test]
async fn db_list_after_shutdown() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -117,7 +132,14 @@ async fn db_list_after_shutdown() -> anyhow::Result<()> {
#[tokio::test]
async fn db_list_after_shutdown_corrupted_data() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -168,7 +190,14 @@ async fn basepath_test() -> anyhow::Result<()> {
#[tokio::test]
async fn location_change_after_restart() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -213,7 +242,14 @@ async fn location_change_after_restart() -> anyhow::Result<()> {
#[tokio::test]
async fn reset_admin_password() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);

{
client.user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -245,7 +281,14 @@ async fn reset_admin_password() -> anyhow::Result<()> {
#[tokio::test]
async fn memory_db_from_backup() -> anyhow::Result<()> {
let mut server = TestServerImpl::new().await?;
let mut client = AgdbApi::new(ReqwestClient::new(), &server.address);
let mut client = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);
let owner = "user1";
let db = "db1";

Expand Down
40 changes: 35 additions & 5 deletions agdb_server/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ impl TestServerImpl {
};

let mut process = Command::cargo_bin(BINARY)?.current_dir(&dir).spawn()?;
let api = AgdbApi::new(ReqwestClient::new(), &api_address);
let api = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&api_address,
);

for _ in 0..RETRY_ATTEMPS {
match api.status().await {
Expand Down Expand Up @@ -202,7 +209,14 @@ impl TestServer {
let server = server_guard.as_ref().unwrap();

Ok(Self {
api: AgdbApi::new(ReqwestClient::new(), &server.address),
api: AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
),
dir: server.dir.clone(),
data_dir: server.data_dir.clone(),
})
Expand Down Expand Up @@ -241,8 +255,17 @@ impl TestCluster {
.unwrap()
.0
.iter()
.map(|s| AgdbApi::new(ReqwestClient::new(), &s.address))
.collect(),
.map(|s| {
Ok(AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&s.address,
))
})
.collect::<anyhow::Result<Vec<AgdbApi<ReqwestClient>>>>()?,
};

cluster.apis[1].cluster_user_login(ADMIN, ADMIN).await?;
Expand Down Expand Up @@ -318,7 +341,14 @@ pub async fn create_cluster(nodes: usize) -> anyhow::Result<Vec<TestServerImpl>>
.map(|c| tokio::spawn(async move { TestServerImpl::with_config(c).await }))
{
let server = server.await??;
let api = AgdbApi::new(ReqwestClient::new(), &server.address);
let api = AgdbApi::new(
ReqwestClient::with_client(
reqwest::Client::builder()
.timeout(Duration::from_secs(10))
.build()?,
),
&server.address,
);
servers.push((server, api));
}

Expand Down
Loading