Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii-erc): add avif support #3011

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/torii/sqlite/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ pub const TOKEN_TRANSFER_TABLE: &str = "token_transfers";
pub const TOKENS_TABLE: &str = "tokens";
pub const WORLD_CONTRACT_TYPE: &str = "WORLD";
pub const SQL_FELT_DELIMITER: &str = "/";
pub const REQ_MAX_RETRIES: u8 = 3;

pub const IPFS_URL: &str = "https://ipfs.io/ipfs/";
pub const IPFS_CLIENT_MAX_RETRY: u8 = 3;

pub const IPFS_CLIENT_URL: &str = "https://ipfs.infura.io:5001";
pub const IPFS_CLIENT_USERNAME: &str = "2EBrzr7ZASQZKH32sl2xWauXPSA";
Expand Down
48 changes: 17 additions & 31 deletions crates/torii/sqlite/src/executor/erc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use anyhow::{Context, Result};
use cainome::cairo_serde::{ByteArray, CairoSerde};
use data_url::mime::Mime;
use data_url::DataUrl;
use reqwest::Client;
use starknet::core::types::{BlockId, BlockTag, FunctionCall, U256};
use starknet::core::utils::{get_selector_from_name, parse_cairo_short_string};
use starknet::providers::Provider;
Expand All @@ -18,8 +17,7 @@ use crate::executor::LOG_TARGET;
use crate::simple_broker::SimpleBroker;
use crate::types::{ContractType, Token, TokenBalance};
use crate::utils::{
felt_to_sql_string, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256,
u256_to_sql_string, I256,
felt_to_sql_string, fetch_content_from_http, fetch_content_from_ipfs, sanitize_json_string, sql_string_to_u256, u256_to_sql_string, I256
};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -76,7 +74,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
apply_balance_diff.block_id,
)
.await
.with_context(|| "Failed to apply balance diff in apply_cache_diff")?;
.context("Failed to apply balance diff in apply_cache_diff")?;
}
ContractType::ERC20 => {
// account_address/contract_address/ => ERC20
Expand All @@ -95,7 +93,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
apply_balance_diff.block_id,
)
.await
.with_context(|| "Failed to apply balance diff in apply_cache_diff")?;
.context("Failed to apply balance diff in apply_cache_diff")?;
}
}
}
Expand Down Expand Up @@ -143,7 +141,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
block_id,
)
.await
.with_context(|| format!("Failed to fetch balance for id: {}", id))?;
.context(format!("Failed to fetch balance for id: {}", id))?;

let current_balance =
cainome::cairo_serde::U256::cairo_deserialize(&current_balance, 0).unwrap();
Expand Down Expand Up @@ -255,6 +253,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
warn!(
contract_address = format!("{:#x}", register_erc721_token.contract_address),
token_id = %register_erc721_token.actual_token_id,
token_uri = %token_uri,
"Error fetching metadata, empty metadata will be used instead.",
);
"".to_string()
Expand All @@ -268,39 +267,25 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
// given a uri which can be either http/https url or data uri, fetch the metadata erc721
// metadata json schema
pub async fn fetch_metadata(token_uri: &str) -> Result<serde_json::Value> {
// Parse the token_uri

match token_uri {
uri if uri.starts_with("http") || uri.starts_with("https") => {
// Fetch metadata from HTTP/HTTPS URL
debug!(token_uri = %token_uri, "Fetching metadata from http/https URL");
let client = Client::new();
let response = client
.get(token_uri)
.send()
let bytes = fetch_content_from_http(token_uri)
.await
.context("Failed to fetch metadata from URL")?;

let bytes = response.bytes().await.context("Failed to read response bytes")?;
let json: serde_json::Value = serde_json::from_slice(&bytes)
.context(format!("Failed to parse metadata JSON from response: {:?}", bytes))?;
.context(format!("Failed to fetch metadata from URL: {}", token_uri))?;

Ok(json)
serde_json::from_slice(&bytes)
.context(format!("Failed to parse metadata JSON from response: {:?}", bytes))
}
uri if uri.starts_with("ipfs") => {
let cid = uri.strip_prefix("ipfs://").unwrap();
debug!(cid = %cid, "Fetching metadata from IPFS");
let response = fetch_content_from_ipfs(cid)
let bytes = fetch_content_from_ipfs(cid)
.await
.context("Failed to fetch metadata from IPFS")?;

let json: serde_json::Value =
serde_json::from_slice(&response).context(format!(
"Failed to parse metadata JSON from IPFS: {:?}, data: {:?}",
cid, &response
))?;

Ok(json)
serde_json::from_slice(&bytes)
.context(format!("Failed to parse metadata JSON from IPFS: {:?}, data: {:?}", cid, bytes))
}
uri if uri.starts_with("data") => {
// Parse and decode data URI
Expand All @@ -326,9 +311,10 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
let sanitized_json = sanitize_json_string(&decoded_str);

let json: serde_json::Value =
serde_json::from_str(&sanitized_json).with_context(|| {
format!("Failed to parse metadata JSON from data URI: {}", &uri)
})?;
serde_json::from_str(&sanitized_json).context(format!(
"Failed to parse metadata JSON from data URI: {}",
&uri
))?;

Ok(json)
}
Expand All @@ -354,7 +340,7 @@ impl<'c, P: Provider + Sync + Send + 'static> Executor<'c, P> {
let token = query
.fetch_optional(&mut *self.transaction)
.await
.with_context(|| format!("Failed to execute721Token query: {:?}", result))?;
.context(format!("Failed to execute721Token query: {:?}", result))?;

if let Some(token) = token {
self.publish_queue.push(BrokerMessage::TokenRegistered(token));
Expand Down
80 changes: 59 additions & 21 deletions crates/torii/sqlite/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ use anyhow::Result;
use chrono::{DateTime, Utc};
use futures_util::TryStreamExt;
use ipfs_api_backend_hyper::{IpfsApi, IpfsClient, TryFromUri};
use once_cell::sync::Lazy;
use reqwest::Client;
use starknet::core::types::U256;
use starknet_crypto::Felt;
use tokio_util::bytes::Bytes;
use tracing::warn;
use tracing::debug;

use crate::constants::{
IPFS_CLIENT_MAX_RETRY, IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME,
IPFS_CLIENT_PASSWORD, IPFS_CLIENT_URL, IPFS_CLIENT_USERNAME, REQ_MAX_RETRIES,
SQL_FELT_DELIMITER,
};

Expand Down Expand Up @@ -105,32 +107,68 @@ pub fn sanitize_json_string(s: &str) -> String {
result
}

// Global clients
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
Client::builder()
.timeout(Duration::from_secs(10))
.pool_idle_timeout(Duration::from_secs(90))
.build()
.expect("Failed to create HTTP client")
});

static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
IpfsClient::from_str(IPFS_CLIENT_URL)
.expect("Failed to create IPFS client")
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD)
});
Comment on lines +119 to +123
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei, storing credentials as constants is risky.
Hardcoding credentials can lead to accidental exposure and security vulnerabilities. Consider moving these credentials to environment variables or another secure store.

-static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
-    IpfsClient::from_str(IPFS_CLIENT_URL)
-        .expect("Failed to create IPFS client")
-        .with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD)
-});
+// Example approach using environment variables
+static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
+    let url = std::env::var("IPFS_CLIENT_URL").expect("Missing IPFS_CLIENT_URL env var");
+    let username = std::env::var("IPFS_CLIENT_USERNAME").unwrap_or_default();
+    let password = std::env::var("IPFS_CLIENT_PASSWORD").unwrap_or_default();
+
+    IpfsClient::from_str(&url)
+        .expect("Failed to create IPFS client")
+        .with_credentials(&username, &password)
+});
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
IpfsClient::from_str(IPFS_CLIENT_URL)
.expect("Failed to create IPFS client")
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD)
});
// Example approach using environment variables
static IPFS_CLIENT: Lazy<IpfsClient> = Lazy::new(|| {
let url = std::env::var("IPFS_CLIENT_URL").expect("Missing IPFS_CLIENT_URL env var");
let username = std::env::var("IPFS_CLIENT_USERNAME").unwrap_or_default();
let password = std::env::var("IPFS_CLIENT_PASSWORD").unwrap_or_default();
IpfsClient::from_str(&url)
.expect("Failed to create IPFS client")
.with_credentials(&username, &password)
});


const INITIAL_BACKOFF: Duration = Duration::from_millis(100);

/// Fetch content from HTTP URL with retries
pub async fn fetch_content_from_http(url: &str) -> Result<Bytes> {
let mut retries = 0;
let mut backoff = INITIAL_BACKOFF;

loop {
match HTTP_CLIENT.get(url).send().await {
Ok(response) => {
if !response.status().is_success() {
return Err(anyhow::anyhow!("HTTP request failed with status: {}", response.status()));
}
return response.bytes().await.map_err(Into::into);
}
Err(e) => {
if retries >= REQ_MAX_RETRIES {
return Err(anyhow::anyhow!("HTTP request failed: {}", e));
}
debug!(error = %e, retry = retries, "Request failed, retrying after backoff");
tokio::time::sleep(backoff).await;
retries += 1;
backoff *= 2;
}
}
}
}

/// Fetch content from IPFS with retries
pub async fn fetch_content_from_ipfs(cid: &str) -> Result<Bytes> {
let mut retries = IPFS_CLIENT_MAX_RETRY;
let client = IpfsClient::from_str(IPFS_CLIENT_URL)?
.with_credentials(IPFS_CLIENT_USERNAME, IPFS_CLIENT_PASSWORD);
let mut retries = 0;
let mut backoff = INITIAL_BACKOFF;

while retries > 0 {
let response = client.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await;
match response {
loop {
match IPFS_CLIENT.cat(cid).map_ok(|chunk| chunk.to_vec()).try_concat().await {
Ok(stream) => return Ok(Bytes::from(stream)),
Err(e) => {
retries -= 1;
warn!(
error = %e,
remaining_attempts = retries,
cid = cid,
"Failed to fetch content from IPFS, retrying after delay"
);
tokio::time::sleep(Duration::from_secs(3)).await;
if retries >= REQ_MAX_RETRIES {
return Err(anyhow::anyhow!("IPFS request failed: {}", e));
}
debug!(error = %e, retry = retries, "Request failed, retrying after backoff");
tokio::time::sleep(backoff).await;
retries += 1;
backoff *= 2;
}
}
}

Err(anyhow::anyhow!(format!(
"Failed to pull data from IPFS after {} attempts, cid: {}",
IPFS_CLIENT_MAX_RETRY, cid
)))
}

// type used to do calculation on inmemory balances
Expand Down
Loading