diff --git a/Cargo.lock b/Cargo.lock index c07a009fa9..232195d103 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15320,6 +15320,7 @@ dependencies = [ "camino", "chrono", "clap", + "clap_complete", "clap_config", "ctrlc", "dojo-metrics", diff --git a/bin/torii/Cargo.toml b/bin/torii/Cargo.toml index 2e652a89ca..efa4339e0a 100644 --- a/bin/torii/Cargo.toml +++ b/bin/torii/Cargo.toml @@ -12,6 +12,7 @@ base64.workspace = true camino.workspace = true chrono.workspace = true clap.workspace = true +clap_complete.workspace = true ctrlc = { version = "3.4", features = [ "termination" ] } dojo-metrics.workspace = true dojo-types.workspace = true diff --git a/bin/torii/src/cli.rs b/bin/torii/src/cli.rs new file mode 100644 index 0000000000..1ebc62d347 --- /dev/null +++ b/bin/torii/src/cli.rs @@ -0,0 +1,14 @@ +//! CLI for Torii. +//! +//! Use a `Cli` struct to parse the CLI arguments +//! and to have flexibility in the future to add more commands +//! that may not start Torii directly. +use clap::Parser; +use torii_cli::ToriiArgs; + +#[derive(Parser)] +#[command(name = "torii", author, version, about, long_about = None)] +pub struct Cli { + #[command(flatten)] + pub args: ToriiArgs, +} diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index 0131f28fa0..6634b964e2 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -18,6 +18,7 @@ use std::time::Duration; use camino::Utf8PathBuf; use clap::Parser; +use cli::Cli; use dojo_metrics::exporters::prometheus::PrometheusRecorder; use dojo_world::contracts::world::WorldContractReader; use sqlx::sqlite::{ @@ -30,7 +31,6 @@ use tempfile::{NamedTempFile, TempDir}; use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; -use torii_cli::ToriiArgs; use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors}; use torii_core::executor::Executor; use torii_core::processors::store_transaction::StoreTransactionProcessor; @@ -46,9 +46,11 @@ use url::{form_urlencoded, Url}; pub(crate) const LOG_TARGET: &str = "torii::cli"; +mod cli; + #[tokio::main] async fn main() -> anyhow::Result<()> { - let mut args = ToriiArgs::parse().with_config_file()?; + let mut args = Cli::parse().args.with_config_file()?; let world_address = if let Some(world_address) = args.world_address { world_address @@ -56,7 +58,6 @@ async fn main() -> anyhow::Result<()> { return Err(anyhow::anyhow!("Please specify a world address.")); }; - // let mut contracts = parse_erc_contracts(&args.contracts)?; args.indexing.contracts.push(Contract { address: world_address, r#type: ContractType::WORLD }); let filter_layer = EnvFilter::try_from_default_env() diff --git a/crates/katana/cli/src/options.rs b/crates/katana/cli/src/options.rs index 9e9795207e..64dc2ad8a7 100644 --- a/crates/katana/cli/src/options.rs +++ b/crates/katana/cli/src/options.rs @@ -55,6 +55,7 @@ pub struct MetricsOptions { pub metrics_port: u16, } +#[cfg(feature = "server")] impl Default for MetricsOptions { fn default() -> Self { MetricsOptions { @@ -366,6 +367,7 @@ fn default_max_connections() -> u32 { DEFAULT_RPC_MAX_CONNECTIONS } +#[cfg(feature = "server")] fn default_page_size() -> u64 { DEFAULT_RPC_MAX_EVENT_PAGE_SIZE } diff --git a/crates/katana/monitoring/docker-compose.yml b/crates/katana/monitoring/docker-compose.yml index 4f012b0a08..09eada50b6 100644 --- a/crates/katana/monitoring/docker-compose.yml +++ b/crates/katana/monitoring/docker-compose.yml @@ -10,8 +10,8 @@ services: command: > katana --db-dir .katana/data-dir - --metrics 0.0.0.0:9001 - --host 0.0.0.0 --port 5050 + --http.addr 0.0.0.0 --http.port 5050 + --metrics --metrics.addr 0.0.0.0 --metrics.port 9001 prometheus: restart: unless-stopped diff --git a/crates/torii/cli/src/args.rs b/crates/torii/cli/src/args.rs index ce0b1bbf3f..13c43655b0 100644 --- a/crates/torii/cli/src/args.rs +++ b/crates/torii/cli/src/args.rs @@ -14,7 +14,7 @@ pub const DEFAULT_RPC_URL: &str = "http://0.0.0.0:5050"; /// Dojo World Indexer #[derive(Parser, Debug, Clone, serde::Serialize, serde::Deserialize)] -#[command(name = "torii", author, version, about, long_about = None)] +#[command(name = "torii", author, about, long_about = None)] #[command(next_help_heading = "Torii general options")] pub struct ToriiArgs { /// The world to index diff --git a/crates/torii/cli/src/options.rs b/crates/torii/cli/src/options.rs index f8d482c8f6..697ac5f623 100644 --- a/crates/torii/cli/src/options.rs +++ b/crates/torii/cli/src/options.rs @@ -3,6 +3,7 @@ use std::str::FromStr; use anyhow::Context; use clap::ArgAction; +use serde::ser::SerializeSeq; use serde::{Deserialize, Serialize}; use starknet::core::types::Felt; use torii_core::types::{Contract, ContractType}; @@ -141,6 +142,7 @@ pub struct IndexingOptions { help = "ERC contract addresses to index. You may only specify ERC20 or ERC721 contracts." )] #[serde(deserialize_with = "deserialize_contracts")] + #[serde(serialize_with = "serialize_contracts")] #[serde(default)] pub contracts: Vec, @@ -327,6 +329,19 @@ where contracts.iter().map(|s| parse_erc_contract(s).map_err(serde::de::Error::custom)).collect() } +fn serialize_contracts(contracts: &Vec, serializer: S) -> Result +where + S: serde::Serializer, +{ + let mut seq = serializer.serialize_seq(Some(contracts.len()))?; + + for contract in contracts { + seq.serialize_element(&contract.to_string())?; + } + + seq.end() +} + // ** Default functions to setup serde of the configuration file ** fn default_http_addr() -> IpAddr { DEFAULT_HTTP_ADDR diff --git a/crates/torii/client/src/client/mod.rs b/crates/torii/client/src/client/mod.rs index 391cad9ebb..7fa96c17ab 100644 --- a/crates/torii/client/src/client/mod.rs +++ b/crates/torii/client/src/client/mod.rs @@ -11,9 +11,12 @@ use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; use tokio::sync::RwLock as AsyncRwLock; use torii_grpc::client::{EntityUpdateStreaming, EventUpdateStreaming, IndexerUpdateStreaming}; -use torii_grpc::proto::world::{RetrieveEntitiesResponse, RetrieveEventsResponse}; +use torii_grpc::proto::world::{ + RetrieveEntitiesResponse, RetrieveEventsResponse, RetrieveTokenBalancesResponse, + RetrieveTokensResponse, +}; use torii_grpc::types::schema::Entity; -use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query}; +use torii_grpc::types::{EntityKeysClause, Event, EventQuery, Query, Token, TokenBalance}; use torii_relay::client::EventLoop; use torii_relay::types::Message; @@ -85,6 +88,26 @@ impl Client { self.metadata.read() } + /// Retrieves tokens matching contract addresses. + pub async fn tokens(&self, contract_addresses: Vec) -> Result, Error> { + let mut grpc_client = self.inner.write().await; + let RetrieveTokensResponse { tokens } = + grpc_client.retrieve_tokens(contract_addresses).await?; + Ok(tokens.into_iter().map(TryInto::try_into).collect::, _>>()?) + } + + /// Retrieves token balances for account addresses and contract addresses. + pub async fn token_balances( + &self, + account_addresses: Vec, + contract_addresses: Vec, + ) -> Result, Error> { + let mut grpc_client = self.inner.write().await; + let RetrieveTokenBalancesResponse { balances } = + grpc_client.retrieve_token_balances(account_addresses, contract_addresses).await?; + Ok(balances.into_iter().map(TryInto::try_into).collect::, _>>()?) + } + /// Retrieves entities matching query parameter. /// /// The query param includes an optional clause for filtering. Without clause, it fetches ALL diff --git a/crates/torii/core/src/types.rs b/crates/torii/core/src/types.rs index be120e50ad..fef378f162 100644 --- a/crates/torii/core/src/types.rs +++ b/crates/torii/core/src/types.rs @@ -121,6 +121,28 @@ pub struct Event { pub executed_at: DateTime, pub created_at: DateTime, } + +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Token { + pub id: String, + pub contract_address: String, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub metadata: String, +} + +#[derive(FromRow, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct TokenBalance { + pub id: String, + pub balance: String, + pub account_address: String, + pub contract_address: String, + pub token_id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq)] pub struct Contract { pub address: Felt, @@ -134,6 +156,12 @@ pub enum ContractType { ERC721, } +impl std::fmt::Display for Contract { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{:#x}", self.r#type, self.address) + } +} + impl FromStr for ContractType { type Err = anyhow::Error; diff --git a/crates/torii/grpc/proto/types.proto b/crates/torii/grpc/proto/types.proto index 6b2abb2ba1..d474065f63 100644 --- a/crates/torii/grpc/proto/types.proto +++ b/crates/torii/grpc/proto/types.proto @@ -149,4 +149,19 @@ enum ComparisonOperator { GTE = 3; LT = 4; LTE = 5; +} + +message Token { + string contract_address = 2; + string name = 3; + string symbol = 4; + uint32 decimals = 5; + string metadata = 6; +} + +message TokenBalance { + string balance = 1; + string account_address = 2; + string contract_address = 3; + string token_id = 4; } \ No newline at end of file diff --git a/crates/torii/grpc/proto/world.proto b/crates/torii/grpc/proto/world.proto index 4898c44b8e..2c7e7b1270 100644 --- a/crates/torii/grpc/proto/world.proto +++ b/crates/torii/grpc/proto/world.proto @@ -42,6 +42,36 @@ service World { // Subscribe to events rpc SubscribeEvents (SubscribeEventsRequest) returns (stream SubscribeEventsResponse); + + // Retrieve tokens + rpc RetrieveTokens (RetrieveTokensRequest) returns (RetrieveTokensResponse); + + // Retrieve token balances + rpc RetrieveTokenBalances (RetrieveTokenBalancesRequest) returns (RetrieveTokenBalancesResponse); +} + +// A request to retrieve tokens +message RetrieveTokensRequest { + // The list of contract addresses to retrieve tokens for + repeated bytes contract_addresses = 1; +} + +// A response containing tokens +message RetrieveTokensResponse { + repeated types.Token tokens = 1; +} + +// A request to retrieve token balances +message RetrieveTokenBalancesRequest { + // The account addresses to retrieve balances for + repeated bytes account_addresses = 1; + // The list of token contract addresses to retrieve balances for + repeated bytes contract_addresses = 2; +} + +// A response containing token balances +message RetrieveTokenBalancesResponse { + repeated types.TokenBalance balances = 1; } // A request to subscribe to indexer updates. diff --git a/crates/torii/grpc/src/client.rs b/crates/torii/grpc/src/client.rs index 3cbd3cbe75..f24d5f5f7e 100644 --- a/crates/torii/grpc/src/client.rs +++ b/crates/torii/grpc/src/client.rs @@ -11,11 +11,13 @@ use tonic::transport::Endpoint; use crate::proto::world::{ world_client, RetrieveEntitiesRequest, RetrieveEntitiesResponse, RetrieveEventMessagesRequest, - RetrieveEventsRequest, RetrieveEventsResponse, SubscribeEntitiesRequest, - SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsRequest, - SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, - SubscribeModelsRequest, SubscribeModelsResponse, UpdateEntitiesSubscriptionRequest, - UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, + RetrieveEventsRequest, RetrieveEventsResponse, RetrieveTokenBalancesRequest, + RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, + SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, + SubscribeEventsRequest, SubscribeEventsResponse, SubscribeIndexerRequest, + SubscribeIndexerResponse, SubscribeModelsRequest, SubscribeModelsResponse, + UpdateEntitiesSubscriptionRequest, UpdateEventMessagesSubscriptionRequest, + WorldMetadataRequest, }; use crate::types::schema::{Entity, SchemaError}; use crate::types::{EntityKeysClause, Event, EventQuery, IndexerUpdate, ModelKeysClause, Query}; @@ -90,6 +92,43 @@ impl WorldClient { .and_then(|metadata| metadata.try_into().map_err(Error::ParseStr)) } + pub async fn retrieve_tokens( + &mut self, + contract_addresses: Vec, + ) -> Result { + self.inner + .retrieve_tokens(RetrieveTokensRequest { + contract_addresses: contract_addresses + .into_iter() + .map(|c| c.to_bytes_be().to_vec()) + .collect(), + }) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner()) + } + + pub async fn retrieve_token_balances( + &mut self, + account_addresses: Vec, + contract_addresses: Vec, + ) -> Result { + self.inner + .retrieve_token_balances(RetrieveTokenBalancesRequest { + account_addresses: account_addresses + .into_iter() + .map(|a| a.to_bytes_be().to_vec()) + .collect(), + contract_addresses: contract_addresses + .into_iter() + .map(|c| c.to_bytes_be().to_vec()) + .collect(), + }) + .await + .map_err(Error::Grpc) + .map(|res| res.into_inner()) + } + pub async fn retrieve_entities( &mut self, query: Query, diff --git a/crates/torii/grpc/src/server/mod.rs b/crates/torii/grpc/src/server/mod.rs index 858c4c523c..a0dee77df9 100644 --- a/crates/torii/grpc/src/server/mod.rs +++ b/crates/torii/grpc/src/server/mod.rs @@ -43,6 +43,7 @@ use torii_core::error::{Error, ParseError, QueryError}; use torii_core::model::{build_sql_query, map_row_to_ty}; use torii_core::sql::cache::ModelCache; use torii_core::sql::utils::sql_string_to_felts; +use torii_core::types::{Token, TokenBalance}; use tower_http::cors::{AllowOrigin, CorsLayer}; use self::subscriptions::entity::EntityManager; @@ -53,10 +54,11 @@ use crate::proto::types::member_value::ValueType; use crate::proto::types::LogicalOperator; use crate::proto::world::world_server::WorldServer; use crate::proto::world::{ - RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, SubscribeEntitiesRequest, - SubscribeEntityResponse, SubscribeEventMessagesRequest, SubscribeEventsResponse, - SubscribeIndexerRequest, SubscribeIndexerResponse, UpdateEventMessagesSubscriptionRequest, - WorldMetadataRequest, WorldMetadataResponse, + RetrieveEntitiesStreamingResponse, RetrieveEventMessagesRequest, RetrieveTokenBalancesRequest, + RetrieveTokenBalancesResponse, RetrieveTokensRequest, RetrieveTokensResponse, + SubscribeEntitiesRequest, SubscribeEntityResponse, SubscribeEventMessagesRequest, + SubscribeEventsResponse, SubscribeIndexerRequest, SubscribeIndexerResponse, + UpdateEventMessagesSubscriptionRequest, WorldMetadataRequest, WorldMetadataResponse, }; use crate::proto::{self}; use crate::types::schema::SchemaError; @@ -87,6 +89,29 @@ impl From for Error { } } +impl From for proto::types::Token { + fn from(value: Token) -> Self { + Self { + contract_address: value.contract_address, + name: value.name, + symbol: value.symbol, + decimals: value.decimals as u32, + metadata: value.metadata, + } + } +} + +impl From for proto::types::TokenBalance { + fn from(value: TokenBalance) -> Self { + Self { + balance: value.balance, + account_address: value.account_address, + contract_address: value.contract_address, + token_id: value.token_id, + } + } +} + #[derive(Debug, Clone)] pub struct DojoWorld { pool: Pool, @@ -789,6 +814,74 @@ impl DojoWorld { }) } + async fn retrieve_tokens( + &self, + contract_addresses: Vec, + ) -> Result { + let query = if contract_addresses.is_empty() { + "SELECT * FROM tokens".to_string() + } else { + format!( + "SELECT * FROM tokens WHERE contract_address IN ({})", + contract_addresses + .iter() + .map(|address| format!("{:#x}", address)) + .collect::>() + .join(", ") + ) + }; + + let tokens: Vec = sqlx::query_as(&query) + .fetch_all(&self.pool) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let tokens = tokens.iter().map(|token| token.clone().into()).collect(); + Ok(RetrieveTokensResponse { tokens }) + } + + async fn retrieve_token_balances( + &self, + account_addresses: Vec, + contract_addresses: Vec, + ) -> Result { + let mut query = "SELECT * FROM token_balances".to_string(); + + let mut conditions = Vec::new(); + if !account_addresses.is_empty() { + conditions.push(format!( + "account_address IN ({})", + account_addresses + .iter() + .map(|address| format!("{:#x}", address)) + .collect::>() + .join(", ") + )); + } + if !contract_addresses.is_empty() { + conditions.push(format!( + "contract_address IN ({})", + contract_addresses + .iter() + .map(|address| format!("{:#x}", address)) + .collect::>() + .join(", ") + )); + } + + if !conditions.is_empty() { + query += &format!(" WHERE {}", conditions.join(" AND ")); + } + + let balances: Vec = sqlx::query_as(&query) + .fetch_all(&self.pool) + .await + .map_err(|e| Status::internal(e.to_string()))?; + + let balances = balances.iter().map(|balance| balance.clone().into()).collect(); + Ok(RetrieveTokenBalancesResponse { balances }) + } + async fn subscribe_indexer( &self, contract_address: Felt, @@ -1165,6 +1258,45 @@ impl proto::world::world_server::World for DojoWorld { Ok(Response::new(WorldMetadataResponse { metadata })) } + async fn retrieve_tokens( + &self, + request: Request, + ) -> Result, Status> { + let RetrieveTokensRequest { contract_addresses } = request.into_inner(); + let contract_addresses = contract_addresses + .iter() + .map(|address| Felt::from_bytes_be_slice(address)) + .collect::>(); + + let tokens = self + .retrieve_tokens(contract_addresses) + .await + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(tokens)) + } + + async fn retrieve_token_balances( + &self, + request: Request, + ) -> Result, Status> { + let RetrieveTokenBalancesRequest { account_addresses, contract_addresses } = + request.into_inner(); + let account_addresses = account_addresses + .iter() + .map(|address| Felt::from_bytes_be_slice(address)) + .collect::>(); + let contract_addresses = contract_addresses + .iter() + .map(|address| Felt::from_bytes_be_slice(address)) + .collect::>(); + + let balances = self + .retrieve_token_balances(account_addresses, contract_addresses) + .await + .map_err(|e| Status::internal(e.to_string()))?; + Ok(Response::new(balances)) + } + async fn subscribe_indexer( &self, request: Request, diff --git a/crates/torii/grpc/src/types/mod.rs b/crates/torii/grpc/src/types/mod.rs index b4162f30b6..0b1c18430c 100644 --- a/crates/torii/grpc/src/types/mod.rs +++ b/crates/torii/grpc/src/types/mod.rs @@ -2,9 +2,11 @@ use core::fmt; use std::collections::HashMap; use std::str::FromStr; +use crypto_bigint::U256; use dojo_types::primitive::Primitive; use dojo_types::schema::Ty; use dojo_world::contracts::naming; +use schema::SchemaError; use serde::{Deserialize, Serialize}; use starknet::core::types::{ ContractStorageDiffItem, Felt, FromStrError, StateDiff, StateUpdate, StorageEntry, @@ -16,6 +18,48 @@ use crate::proto::{self}; pub mod schema; +#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] +pub struct Token { + pub contract_address: Felt, + pub name: String, + pub symbol: String, + pub decimals: u8, + pub metadata: String, +} + +impl TryFrom for Token { + type Error = SchemaError; + fn try_from(value: proto::types::Token) -> Result { + Ok(Self { + contract_address: Felt::from_str(&value.contract_address)?, + name: value.name, + symbol: value.symbol, + decimals: value.decimals as u8, + metadata: value.metadata, + }) + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] +pub struct TokenBalance { + pub balance: U256, + pub account_address: Felt, + pub contract_address: Felt, + pub token_id: String, +} + +impl TryFrom for TokenBalance { + type Error = SchemaError; + fn try_from(value: proto::types::TokenBalance) -> Result { + Ok(Self { + balance: U256::from_be_hex(&value.balance), + account_address: Felt::from_str(&value.account_address)?, + contract_address: Felt::from_str(&value.contract_address)?, + token_id: value.token_id, + }) + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Hash, Eq, Clone)] pub struct IndexerUpdate { pub head: i64,