Skip to content

Commit

Permalink
fix(torii): erc graphql image path & http ratelimiting (#3032)
Browse files Browse the repository at this point in the history
* fix(torii): erc graphql image path & http ratelimiting

* fmt
  • Loading branch information
Larkooo authored Feb 14, 2025
1 parent b7a4414 commit de7eddd
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 46 deletions.
10 changes: 3 additions & 7 deletions crates/torii/graphql/src/object/erc/erc_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,8 @@ impl ResolvableObject for TokenObject {
v.to_string().trim_matches('"').to_string()
});

let contract_address: String =
row.get("contract_address");
let image_path = format!("{}/image", contract_address);

let image_path =
format!("{}/image", id.replace(":", "/"));
(
metadata_str,
metadata_name,
Expand Down Expand Up @@ -501,9 +499,7 @@ fn create_token_metadata_from_row(row: &SqliteRow) -> sqlx::Result<ErcTokenType>
let metadata_attributes =
metadata.get("attributes").map(|v| v.to_string().trim_matches('"').to_string());

let contract_address: String = row.get("contract_address");
let image_path = format!("{}/image", contract_address);

let image_path = format!("{}/image", id.replace(":", "/"));
(metadata_str, metadata_name, metadata_description, metadata_attributes, image_path)
};

Expand Down
25 changes: 7 additions & 18 deletions crates/torii/server/src/artifacts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ use camino::Utf8PathBuf;
use data_url::mime::Mime;
use data_url::DataUrl;
use image::{DynamicImage, ImageFormat};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use tokio::fs;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::broadcast::Receiver;
use torii_sqlite::constants::TOKENS_TABLE;
use torii_sqlite::utils::fetch_content_from_ipfs;
use torii_sqlite::utils::{fetch_content_from_http, fetch_content_from_ipfs};
use tracing::{debug, error, trace};
use warp::http::Response;
use warp::path::Tail;
Expand Down Expand Up @@ -59,10 +58,7 @@ async fn serve_static_file(

let token_id = format!("{}:{}", parts[0], parts[1]);
if !token_image_dir.exists() {
match fetch_and_process_image(&artifacts_dir, &token_id, pool)
.await
.context(format!("Failed to fetch and process image for token_id: {}", token_id))
{
match fetch_and_process_image(&artifacts_dir, &token_id, pool).await {
Ok(path) => path,
Err(e) => {
error!(error = %e, "Failed to fetch and process image for token_id: {}", token_id);
Expand Down Expand Up @@ -177,15 +173,8 @@ async fn fetch_and_process_image(
uri if uri.starts_with("http") || uri.starts_with("https") => {
debug!(image_uri = %uri, "Fetching image from http/https URL");
// Fetch image from HTTP/HTTPS URL
let client = Client::new();
let response = client
.get(uri)
.send()
.await
.context("Failed to fetch image from URL")?
.bytes()
.await
.context("Failed to read image bytes from response")?;
let response =
fetch_content_from_http(&uri).await.context("Failed to fetch image from URL")?;

// svg files typically start with <svg or <?xml
if response.starts_with(b"<svg") || response.starts_with(b"<?xml") {
Expand All @@ -195,7 +184,7 @@ async fn fetch_and_process_image(
format!("Unknown file format for token_id: {}, data: {:?}", token_id, &response)
})?;
ErcImageType::DynamicImage((
image::load_from_memory(&response)
image::load_from_memory_with_format(&response, format)
.context("Failed to load image from bytes")?,
format,
))
Expand All @@ -218,7 +207,7 @@ async fn fetch_and_process_image(
)
})?;
ErcImageType::DynamicImage((
image::load_from_memory(&response)
image::load_from_memory_with_format(&response, format)
.context("Failed to load image from bytes")?,
format,
))
Expand All @@ -239,7 +228,7 @@ async fn fetch_and_process_image(
let format = image::guess_format(&decoded.0)
.with_context(|| format!("Unknown file format for token_id: {}", token_id))?;
ErcImageType::DynamicImage((
image::load_from_memory(&decoded.0)
image::load_from_memory_with_format(&decoded.0, format)
.context("Failed to load image from bytes")?,
format,
))
Expand Down
2 changes: 1 addition & 1 deletion crates/torii/sqlite/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ pub const TOKEN_TRANSFER_TABLE: &str = "token_transfers";
pub const TOKENS_TABLE: &str = "tokens";
pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const SQL_FELT_DELIMITER: &str = "/";
pub const REQ_MAX_RETRIES: u8 = 3;

pub const IPFS_URL: &str = "https://ipfs.io/ipfs/";
pub const IPFS_CLIENT_MAX_RETRY: u8 = 3;

pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001";
pub const IPFS_CLIENT_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA";
Expand Down
81 changes: 61 additions & 20 deletions crates/torii/sqlite/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri};
use once_cell::sync::Lazy;
use reqwest::Client;
use starknet::core::types::U256;
use starknet_crypto::Felt;
use tokio_util::bytes::Bytes;
use tracing::debug;

use crate::constants::{
IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME,
IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, REQ_MAX_RETRIES,
SQL_FELT_DELIMITER,
};

Expand Down Expand Up @@ -105,32 +107,71 @@ pub fn sanitize_json_string(s: &str) -> String {
result
}

// Global clients
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(90))
.build()
.expect("Failed to create HTTP client")
});

static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
IpfsClient::from_str(IPFS_CLIENT_URL)
.expect("Failed to create IPFS client")
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD)
});

const INITIAL_BACKOFF: Duration = Duration::from_millis(100);

/// Fetch content from HTTP URL with retries
pub async fn fetch_content_from_http(url: &str) -> Result<Bytes> {
let mut retries = 0;
let mut backoff = INITIAL_BACKOFF;

loop {
match HTTP_CLIENT.get(url).send().await {
Ok(response) => {
if !response.status().is_success() {
return Err(anyhow::anyhow!(
"HTTP request failed with status: {}",
response.status()
));
}
return response.bytes().await.map_err(Into::into);
}
Err(e) => {
if retries >= REQ_MAX_RETRIES {
return Err(anyhow::anyhow!("HTTP request failed: {}", e));
}
debug!(error = %e, retry = retries, "Request failed, retrying after backoff");
tokio::time::sleep(backoff).await;
retries += 1;
backoff *= 2;
}
}
}
}

/// Fetch content from IPFS with retries
pub async fn fetch_content_from_ipfs(cid: &str) -> Result<Bytes> {
let mut retries = IPFS_CLIENT_MAX_RETRY;
let client = IpfsClient::from_str(IPFS_CLIENT_URL)?
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD);
let mut retries = 0;
let mut backoff = INITIAL_BACKOFF;

while retries > 0 {
let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await;
match response {
loop {
match IPFS_CLIENT.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await {
Ok(stream) => return Ok(Bytes::from(stream)),
Err(e) => {
retries -= 1;
debug!(
error = %e,
remaining_attempts = retries,
cid = cid,
"Failed to fetch content from IPFS, retrying after delay"
);
tokio::time::sleep(Duration::from_secs(3)).await;
if retries >= REQ_MAX_RETRIES {
return Err(anyhow::anyhow!("IPFS request failed: {}", e));
}
debug!(error = %e, retry = retries, "Request failed, retrying after backoff");
tokio::time::sleep(backoff).await;
retries += 1;
backoff *= 2;
}
}
}

Err(anyhow::anyhow!(format!(
"Failed to pull data from IPFS after {} attempts, cid: {}",
IPFS_CLIENT_MAX_RETRY, cid
)))
}

// type used to do calculation on inmemory balances
Expand Down

0 comments on commit de7eddd

Please sign in to comment.