Skip to content

Commit

Permalink
fix(torii): http metadata ratelimiting & image static
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Feb 12, 2025
1 parent f0c1e0b commit 7b8380e
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 53 deletions.
2 changes: 1 addition & 1 deletion crates/torii/sqlite/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ pub const TOKEN_TRANSFER_TABLE: &str = "token_transfers";
pub const TOKENS_TABLE: &str = "tokens";
pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const SQL_FELT_DELIMITER: &str = "/";
pub const REQ_MAX_RETRIES: u8 = 3;

pub const IPFS_URL: &str = "https://ipfs.io/ipfs/";
pub const IPFS_CLIENT_MAX_RETRY: u8 = 3;

pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001";
pub const IPFS_CLIENT_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA";
Expand Down
48 changes: 17 additions & 31 deletions crates/torii/sqlite/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use anyhow::{Context, Result};
use cainome::cairo_serde::{ByteArray, CairoSerde};
use data_url::mime::Mime;
use data_url::DataUrl;
use reqwest::Client;
use starknet::core::types::{BlockId, BlockTag, FunctionCall, U256};
use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;
Expand All @@ -18,8 +17,7 @@ use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::types::{ContractType, Token, TokenBalance};
use crate::utils::{
felt_to_sql_string, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256,
u256_to_sql_string, I256,
felt_to_sql_string, fetch_content_from_http, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256, u256_to_sql_string, I256
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -76,7 +74,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
apply_balance_diff.block_id,
)
.await
.with_context(|| "Failed to apply balance diff in apply_cache_diff")?;
.context("Failed to apply balance diff in apply_cache_diff")?;
}
ContractType::ERC20 => {
// account_address/contract_address/ => ERC20
Expand All @@ -95,7 +93,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
apply_balance_diff.block_id,
)
.await
.with_context(|| "Failed to apply balance diff in apply_cache_diff")?;
.context("Failed to apply balance diff in apply_cache_diff")?;
}
}
}
Expand Down Expand Up @@ -143,7 +141,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
block_id,
)
.await
.with_context(|| format!("Failed to fetch balance for id: {}", id))?;
.context(format!("Failed to fetch balance for id: {}", id))?;

let current_balance =
cainome::cairo_serde::U256::cairo_deserialize(&current_balance, 0).unwrap();
Expand Down Expand Up @@ -255,6 +253,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
warn!(
contract_address = format!("{:#x}", register_erc721_token.contract_address),
token_id = %register_erc721_token.actual_token_id,
token_uri = %token_uri,
"Error fetching metadata, empty metadata will be used instead.",
);
"".to_string()
Expand All @@ -268,39 +267,25 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
// given a uri which can be either http/https url or data uri, fetch the metadata erc721
// metadata json schema
pub async fn fetch_metadata(token_uri: &str) -> Result<serde_json::Value> {
// Parse the token_uri

match token_uri {
uri if uri.starts_with("http") || uri.starts_with("https") => {
// Fetch metadata from HTTP/HTTPS URL
debug!(token_uri = %token_uri, "Fetching metadata from http/https URL");
let client = Client::new();
let response = client
.get(token_uri)
.send()
let bytes = fetch_content_from_http(token_uri)
.await
.context("Failed to fetch metadata from URL")?;

let bytes = response.bytes().await.context("Failed to read response bytes")?;
let json: serde_json::Value = serde_json::from_slice(&bytes)
.context(format!("Failed to parse metadata JSON from response: {:?}", bytes))?;
.context(format!("Failed to fetch metadata from URL: {}", token_uri))?;

Ok(json)
serde_json::from_slice(&bytes)
.context(format!("Failed to parse metadata JSON from response: {:?}", bytes))
}
uri if uri.starts_with("ipfs") => {
let cid = uri.strip_prefix("ipfs://").unwrap();
debug!(cid = %cid, "Fetching metadata from IPFS");
let response = fetch_content_from_ipfs(cid)
let bytes = fetch_content_from_ipfs(cid)
.await
.context("Failed to fetch metadata from IPFS")?;

let json: serde_json::Value =
serde_json::from_slice(&response).context(format!(
"Failed to parse metadata JSON from IPFS: {:?}, data: {:?}",
cid, &response
))?;

Ok(json)
serde_json::from_slice(&bytes)
.context(format!("Failed to parse metadata JSON from IPFS: {:?}, data: {:?}", cid, bytes))
}
uri if uri.starts_with("data") => {
// Parse and decode data URI
Expand All @@ -326,9 +311,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
let sanitized_json = sanitize_json_string(&decoded_str);

let json: serde_json::Value =
serde_json::from_str(&sanitized_json).with_context(|| {
format!("Failed to parse metadata JSON from data URI: {}", &uri)
})?;
serde_json::from_str(&sanitized_json).context(format!(
"Failed to parse metadata JSON from data URI: {}",
&uri
))?;

Ok(json)
}
Expand All @@ -354,7 +340,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
let token = query
.fetch_optional(&mut *self.transaction)
.await
.with_context(|| format!("Failed to execute721Token query: {:?}", result))?;
.context(format!("Failed to execute721Token query: {:?}", result))?;

if let Some(token) = token {
self.publish_queue.push(BrokerMessage::TokenRegistered(token));
Expand Down
80 changes: 59 additions & 21 deletions crates/torii/sqlite/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri};
use once_cell::sync::Lazy;
use reqwest::Client;
use starknet::core::types::U256;
use starknet_crypto::Felt;
use tokio_util::bytes::Bytes;
use tracing::warn;
use tracing::debug;

use crate::constants::{
IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME,
IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, REQ_MAX_RETRIES,
SQL_FELT_DELIMITER,
};

Expand Down Expand Up @@ -105,32 +107,68 @@ pub fn sanitize_json_string(s: &str) -> String {
result
}

// Global clients
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(90))
.build()
.expect("Failed to create HTTP client")
});

static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
IpfsClient::from_str(IPFS_CLIENT_URL)
.expect("Failed to create IPFS client")
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD)
});

const INITIAL_BACKOFF: Duration = Duration::from_millis(100);

/// Fetch content from HTTP URL with retries
pub async fn fetch_content_from_http(url: &str) -> Result<Bytes> {
let mut retries = 0;
let mut backoff = INITIAL_BACKOFF;

loop {
match HTTP_CLIENT.get(url).send().await {
Ok(response) => {
if !response.status().is_success() {
return Err(anyhow::anyhow!("HTTP request failed with status: {}", response.status()));
}
return response.bytes().await.map_err(Into::into);
}
Err(e) => {
if retries >= REQ_MAX_RETRIES {
return Err(anyhow::anyhow!("HTTP request failed: {}", e));
}
debug!(error = %e, retry = retries, "Request failed, retrying after backoff");
tokio::time::sleep(backoff).await;
retries += 1;
backoff *= 2;
}
}
}
}

/// Fetch content from IPFS with retries
pub async fn fetch_content_from_ipfs(cid: &str) -> Result<Bytes> {
let mut retries = IPFS_CLIENT_MAX_RETRY;
let client = IpfsClient::from_str(IPFS_CLIENT_URL)?
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD);
let mut retries = 0;
let mut backoff = INITIAL_BACKOFF;

while retries > 0 {
let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await;
match response {
loop {
match IPFS_CLIENT.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await {
Ok(stream) => return Ok(Bytes::from(stream)),
Err(e) => {
retries -= 1;
warn!(
error = %e,
remaining_attempts = retries,
cid = cid,
"Failed to fetch content from IPFS, retrying after delay"
);
tokio::time::sleep(Duration::from_secs(3)).await;
if retries >= REQ_MAX_RETRIES {
return Err(anyhow::anyhow!("IPFS request failed: {}", e));
}
debug!(error = %e, retry = retries, "Request failed, retrying after backoff");
tokio::time::sleep(backoff).await;
retries += 1;
backoff *= 2;
}
}
}

Err(anyhow::anyhow!(format!(
"Failed to pull data from IPFS after {} attempts, cid: {}",
IPFS_CLIENT_MAX_RETRY, cid
)))
}

// type used to do calculation on inmemory balances
Expand Down

0 comments on commit 7b8380e

Please sign in to comment.