Skip to content

Commit

Permalink
fix: tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 25, 2024
1 parent 388ba1e commit b7acef5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 62 deletions.
10 changes: 5 additions & 5 deletions crates/torii/core/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::time::Duration;
use anyhow::Result;
use bitflags::bitflags;
use dojo_world::contracts::world::WorldContractReader;
use futures_util::future::try_join_all;
use hashlink::LinkedHashMap;
use starknet::core::types::{
BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts,
Expand All @@ -17,7 +18,6 @@ use starknet::providers::Provider;
use tokio::sync::broadcast::Sender;
use tokio::sync::mpsc::Sender as BoundedSender;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tokio::time::{sleep, Instant};
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -500,14 +500,14 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks));

// Run all tasks concurrently
let mut set = JoinSet::new();
let mut handles = Vec::new();
for (task_id, events) in self.tasks.drain() {
let db = self.db.clone();
let world = self.world.clone();
let processors = self.processors.clone();
let semaphore = semaphore.clone();

set.spawn(async move {
handles.push(tokio::spawn(async move {
let _permit = semaphore.acquire().await.unwrap();
let mut local_db = db.clone();
for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events {
Expand All @@ -523,11 +523,11 @@ impl<P: Provider + Send + Sync + std::fmt::Debug + 'static> Engine<P> {
}
}
Ok::<_, anyhow::Error>(local_db)
});
}));
}

// Join all tasks
while let Some(_) = set.join_next().await {}
try_join_all(handles).await?;

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ pub enum QueryType {
Other,
}

#[derive(Debug)]
pub struct Executor<'c> {
pool: Pool<Sqlite>,
transaction: Transaction<'c, Sqlite>,
Expand All @@ -60,6 +61,7 @@ pub struct Executor<'c> {
shutdown_rx: Receiver<()>,
}

#[derive(Debug, Clone)]
pub struct QueryMessage {
pub statement: String,
pub arguments: Vec<Argument>,
Expand Down
14 changes: 14 additions & 0 deletions crates/torii/core/src/sql_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) {

TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap();

// move
let tx = &account
.execute_v1(vec![Call {
to: actions_address,
selector: get_selector_from_name("move").unwrap(),
calldata: vec![Felt::ONE],
}])
.send()
.await
.unwrap();

TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap();

let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider));

let (shutdown_tx, _) = broadcast::channel(1);
Expand Down Expand Up @@ -186,6 +199,7 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) {
assert_eq!(unpacked_size, 0);

assert_eq!(count_table("entities", &pool).await, 2);
assert_eq!(count_table("event_messages", &pool).await, 1);

let (id, keys): (String, String) = sqlx::query_as(
format!(
Expand Down
57 changes: 0 additions & 57 deletions crates/torii/libp2p/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,9 @@ mod test {
use crypto_bigint::U256;
use dojo_types::primitive::Primitive;
use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty};
use dojo_world::contracts::abi::model::Layout;
use futures::StreamExt;
use katana_runner::KatanaRunner;
use serde_json::Number;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
use starknet::core::types::Felt;
use torii_core::simple_broker::SimpleBroker;
use torii_core::sql::Sql;
use torii_core::types::EventMessage;
#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::*;

Expand Down Expand Up @@ -693,57 +687,6 @@ mod test {
}
}

// Test to verify that setting an entity message in the SQL database
// triggers a publish event on the broker
#[tokio::test]
async fn test_entity_message_trigger_publish() -> Result<(), Box<dyn Error>> {
let _ = tracing_subscriber::fmt()
.with_env_filter("torii::relay::client=debug,torii::relay::server=debug")
.try_init();

let options = <SqliteConnectOptions as std::str::FromStr>::from_str("sqlite::memory:")
.unwrap()
.create_if_missing(true);
let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
sqlx::migrate!("../migrations").run(&pool).await.unwrap();

let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
let mut broker = SimpleBroker::<EventMessage>::subscribe();

let entity = Ty::Struct(Struct { name: "Message".to_string(), children: vec![] });
db.register_model(
"test_namespace",
entity.clone(),
Layout::Fixed(vec![]),
Felt::ZERO,
Felt::ZERO,
0,
0,
0,
)
.await?;

// FIXME: register_model and set_event_message handle the name and namespace of entity type
// differently.
let entity =
Ty::Struct(Struct { name: "test_namespace-Message".to_string(), children: vec![] });

// Set the event message in the database
db.set_event_message(entity, "some_entity_id", 0).await?;
db.query_queue.execute_all().await?;

// Check if a message was published to the broker
tokio::select! {
Some(message) = broker.next() => {
println!("Received message: {:?}", message);
Ok(())
},
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
Err("Timeout: No message received".into())
}
}
}

#[cfg(target_arch = "wasm32")]
#[wasm_bindgen_test]
async fn test_client_connection_wasm() -> Result<(), Box<dyn Error>> {
Expand Down

0 comments on commit b7acef5

Please sign in to comment.