diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 991cd3dea7..40cb52e86c 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -7,6 +7,7 @@ use std::time::Duration; use anyhow::Result; use bitflags::bitflags; use dojo_world::contracts::world::WorldContractReader; +use futures_util::future::try_join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, @@ -17,7 +18,6 @@ use starknet::providers::Provider; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; -use tokio::task::JoinSet; use tokio::time::{sleep, Instant}; use tracing::{debug, error, info, trace, warn}; @@ -500,14 +500,14 @@ impl Engine

{ let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); // Run all tasks concurrently - let mut set = JoinSet::new(); + let mut handles = Vec::new(); for (task_id, events) in self.tasks.drain() { let db = self.db.clone(); let world = self.world.clone(); let processors = self.processors.clone(); let semaphore = semaphore.clone(); - set.spawn(async move { + handles.push(tokio::spawn(async move { let _permit = semaphore.acquire().await.unwrap(); let mut local_db = db.clone(); for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { @@ -523,11 +523,11 @@ impl Engine

{ } } Ok::<_, anyhow::Error>(local_db) - }); + })); } // Join all tasks - while let Some(_) = set.join_next().await {} + try_join_all(handles).await?; Ok(()) } diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs index 76237f0a73..599f594c1b 100644 --- a/crates/torii/core/src/executor.rs +++ b/crates/torii/core/src/executor.rs @@ -52,6 +52,7 @@ pub enum QueryType { Other, } +#[derive(Debug)] pub struct Executor<'c> { pool: Pool, transaction: Transaction<'c, Sqlite>, @@ -60,6 +61,7 @@ pub struct Executor<'c> { shutdown_rx: Receiver<()>, } +#[derive(Debug, Clone)] pub struct QueryMessage { pub statement: String, pub arguments: Vec, diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index dd98a7b916..ec5433cb15 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -124,6 +124,19 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + // move + let tx = &account + .execute_v1(vec![Call { + to: actions_address, + selector: get_selector_from_name("move").unwrap(), + calldata: vec![Felt::ONE], + }]) + .send() + .await + .unwrap(); + + TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); let (shutdown_tx, _) = broadcast::channel(1); @@ -186,6 +199,7 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { assert_eq!(unpacked_size, 0); assert_eq!(count_table("entities", &pool).await, 2); + assert_eq!(count_table("event_messages", &pool).await, 1); let (id, keys): (String, String) = sqlx::query_as( format!( diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs index 964b656f2f..08b52f8ca6 100644 --- a/crates/torii/libp2p/src/tests.rs +++ b/crates/torii/libp2p/src/tests.rs @@ -12,15 +12,9 @@ mod test { use crypto_bigint::U256; use dojo_types::primitive::Primitive; use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty}; - use dojo_world::contracts::abi::model::Layout; - use futures::StreamExt; use katana_runner::KatanaRunner; use serde_json::Number; - use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use starknet::core::types::Felt; - use torii_core::simple_broker::SimpleBroker; - use torii_core::sql::Sql; - use torii_core::types::EventMessage; #[cfg(target_arch = "wasm32")] use wasm_bindgen_test::*; @@ -693,57 +687,6 @@ mod test { } } - // Test to verify that setting an entity message in the SQL database - // triggers a publish event on the broker - #[tokio::test] - async fn test_entity_message_trigger_publish() -> Result<(), Box> { - let _ = tracing_subscriber::fmt() - .with_env_filter("torii::relay::client=debug,torii::relay::server=debug") - .try_init(); - - let options = ::from_str("sqlite::memory:") - .unwrap() - .create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - - let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap(); - let mut broker = SimpleBroker::::subscribe(); - - let entity = Ty::Struct(Struct { name: "Message".to_string(), children: vec![] }); - db.register_model( - "test_namespace", - entity.clone(), - Layout::Fixed(vec![]), - Felt::ZERO, - Felt::ZERO, - 0, - 0, - 0, - ) - .await?; - - // FIXME: register_model and set_event_message handle the name and namespace of entity type - // differently. - let entity = - Ty::Struct(Struct { name: "test_namespace-Message".to_string(), children: vec![] }); - - // Set the event message in the database - db.set_event_message(entity, "some_entity_id", 0).await?; - db.query_queue.execute_all().await?; - - // Check if a message was published to the broker - tokio::select! { - Some(message) = broker.next() => { - println!("Received message: {:?}", message); - Ok(()) - }, - _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => { - Err("Timeout: No message received".into()) - } - } - } - #[cfg(target_arch = "wasm32")] #[wasm_bindgen_test] async fn test_client_connection_wasm() -> Result<(), Box> {