Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(torii): queries for them to not block #2391

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::collections::VecDeque;

use anyhow::{Context, Result};
use dojo_types::schema::Ty;
use sqlx::{Executor, Pool, Sqlite};
use starknet::core::types::Felt;

Expand Down Expand Up @@ -33,11 +35,22 @@
// publishes that are related to queries in the queue, they should be sent
// after the queries are executed
pub publish_queue: VecDeque<BrokerMessage>,
pub publish_queries: VecDeque<(String, Vec<Argument>, QueryType)>,
}

#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
}

impl QueryQueue {
pub fn new(pool: Pool<Sqlite>) -> Self {
QueryQueue { pool, queue: VecDeque::new(), publish_queue: VecDeque::new() }
QueryQueue {
pool,
queue: VecDeque::new(),
publish_queue: VecDeque::new(),
publish_queries: VecDeque::new(),
}
}

pub fn enqueue<S: Into<String>>(&mut self, statement: S, arguments: Vec<Argument>) {
Expand All @@ -52,7 +65,16 @@
self.publish_queue.push_back(value);
}

pub async fn execute_all(&mut self) -> sqlx::Result<u64> {
pub fn push_publish_query(
&mut self,
statement: String,
arguments: Vec<Argument>,
query_type: QueryType,
) {
self.publish_queries.push_back((statement, arguments, query_type));
}

pub async fn execute_all(&mut self) -> Result<u64> {
let mut total_affected = 0_u64;
let mut tx = self.pool.begin().await?;

Expand All @@ -69,20 +91,54 @@
}
}

total_affected += tx.execute(query).await?.rows_affected();
total_affected += tx
.execute(query)
.await
.with_context(|| format!("Failed to execute query: {}", statement))?
.rows_affected();
}

tx.commit().await?;

while let Some(message) = self.publish_queue.pop_front() {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
send_broker_message(message);
}

while let Some((statement, arguments, query_type)) = self.publish_queries.pop_front() {
let mut query = sqlx::query_as(&statement);
for arg in &arguments {
query = match arg {
Argument::Null => query.bind(None::<String>),
Argument::Int(integer) => query.bind(integer),
Argument::Bool(bool) => query.bind(bool),

Check warning on line 113 in crates/torii/core/src/query_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/query_queue.rs#L111-L113

Added lines #L111 - L113 were not covered by tests
Argument::String(string) => query.bind(string),
Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)),

Check warning on line 115 in crates/torii/core/src/query_queue.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/query_queue.rs#L115

Added line #L115 was not covered by tests
}
}

let broker_message = match query_type {
QueryType::SetEntity(entity) => {
let mut result: EntityUpdated = query
.fetch_one(&self.pool)
.await
.with_context(|| format!("Failed to fetch entity: {}", statement))?;
result.updated_model = Some(entity);
result.deleted = false;
BrokerMessage::EntityUpdated(result)
}
};
send_broker_message(broker_message);
}

Ok(total_affected)
}
}

fn send_broker_message(message: BrokerMessage) {
match message {
BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model),
BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity),
BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event),
BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event),
}
}
66 changes: 35 additions & 31 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use tracing::debug;

use crate::cache::{Model, ModelCache};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue};
use crate::query_queue::{Argument, BrokerMessage, QueryQueue, QueryType};
use crate::types::{
Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated,
Model as ModelRegistered,
Expand Down Expand Up @@ -193,29 +193,28 @@
let entity_id = format!("{:#x}", poseidon_hash_many(&keys));
let model_id = format!("{:#x}", compute_selector_from_names(model_namespace, model_name));

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
);

let keys_str = felts_sql_string(&keys);

let insert_entities = "INSERT INTO entities (id, keys, event_id, executed_at) VALUES (?, \
?, ?, ?) ON CONFLICT(id) DO UPDATE SET \
updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \
event_id=EXCLUDED.event_id RETURNING *";
// if timeout doesn't work
// fetch to get entity
// if not available, insert into queue
let mut entity_updated: EntityUpdated = sqlx::query_as(insert_entities)
.bind(&entity_id)
.bind(&keys_str)
.bind(event_id)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.fetch_one(&self.pool)
.await?;
event_id=EXCLUDED.event_id";

entity_updated.updated_model = Some(entity.clone());
self.query_queue.enqueue(
insert_entities,
vec![
Argument::String(entity_id.clone()),
Argument::String(keys_str.clone()),
Argument::String(event_id.to_string()),
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
],
);

self.query_queue.enqueue(
"INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
model_id) DO NOTHING",
vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
);

let path = vec![namespaced_name];
self.build_set_entity_queries_recursive(
Expand All @@ -227,7 +226,12 @@
&vec![],
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(entity_updated));
let query_entities_for_publish = "SELECT * FROM entities WHERE id = ?";
self.query_queue.push_publish_query(
query_entities_for_publish.to_string(),
vec![Argument::String(entity_id.clone())],
QueryType::SetEntity(entity.clone()),
);

Ok(())
}
Expand Down Expand Up @@ -316,18 +320,18 @@
);
self.execute().await?;

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(entity_id)
.fetch_one(&self.pool)
.await?;
let update_query = "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \
event_id=? WHERE id = ? RETURNING *";

Check warning on line 324 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L323-L324

Added lines #L323 - L324 were not covered by tests

update_entity.updated_model = Some(wrapped_ty);
self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
self.query_queue.push_publish_query(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should ensure that set_model_member also works when parallelized

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason it wouldn't work?

update_query.to_string(),
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::SetEntity(wrapped_ty),
);

Check warning on line 334 in crates/torii/core/src/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/core/src/sql.rs#L326-L334

Added lines #L326 - L334 were not covered by tests

Ok(())
}
Expand Down
2 changes: 2 additions & 0 deletions crates/torii/libp2p/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@
error = %e,
"Setting message."
);
self.db.execute().await.unwrap();

Check warning on line 286 in crates/torii/libp2p/src/server/mod.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/libp2p/src/server/mod.rs#L286

Added line #L286 was not covered by tests
continue;
} else {
info!(
Expand All @@ -291,6 +292,7 @@
peer_id = %peer_id,
"Message set."
);
self.db.execute().await.unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also execute the queries for the set_entity that is further down. Can we refactor the logic here to reuse the same self.db.execute() everywhere? We can add a function that sets the entity and directrly executes the db queries

continue;
}
}
Expand Down
Loading