Skip to content

Commit

Permalink
refactor(torii-core): correctly queue entity deletions
Browse files Browse the repository at this point in the history
  • Loading branch information
Larkooo committed Sep 14, 2024
1 parent ed2aa83 commit a80c181
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 41 deletions.
25 changes: 25 additions & 0 deletions crates/torii/core/src/query_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct QueryQueue {
#[derive(Debug, Clone)]
pub enum QueryType {
SetEntity(Ty),
DeleteEntity(Ty),
Other,
}

Expand Down Expand Up @@ -97,6 +98,30 @@ impl QueryQueue {
let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::DeleteEntity(entity) => {
let row = query.fetch_one(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
})?;
let mut entity_updated = EntityUpdated::from_row(&row)?;
entity_updated.updated_model = Some(entity);

let count = sqlx::query_scalar::<_, i64>("SELECT count(*) FROM entity_model WHERE entity_id = ?")
.bind(entity_updated.id.clone())
.fetch_one(&mut *tx)
.await?;
entity_updated.deleted = count == 0;

// Delete entity if all of its models are deleted
if entity_updated.deleted {
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(entity_updated.id.clone())
.execute(&mut *tx)
.await?;
}

let broker_message = BrokerMessage::EntityUpdated(entity_updated);
self.push_publish(broker_message);
}
QueryType::Other => {
query.execute(&mut *tx).await.with_context(|| {
format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
Expand Down
63 changes: 22 additions & 41 deletions crates/torii/core/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,49 +324,26 @@ impl Sql {
let path = vec![entity.name()];
// delete entity models data
self.build_delete_entity_queries_recursive(path, &entity_id, &entity);
self.execute().await?;

let deleted_entity_model =
sqlx::query("DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?")
.bind(&entity_id)
.bind(format!("{:#x}", compute_selector_from_tag(&entity.name())))
.execute(&self.pool)
.await?;
if deleted_entity_model.rows_affected() == 0 {
// fail silently. we have no entity-model relation to delete.
// this can happen if a entity model that doesnt exist
// got deleted
return Ok(());
}
self.query_queue.enqueue(
"DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?",
vec![
Argument::String(entity_id.clone()),
Argument::String(format!("{:#x}", compute_selector_from_tag(&entity.name()))),
],
QueryType::Other,
);

let mut update_entity = sqlx::query_as::<_, EntityUpdated>(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id \
= ? RETURNING *",
)
.bind(utc_dt_string_from_timestamp(block_timestamp))
.bind(event_id)
.bind(&entity_id)
.fetch_one(&self.pool)
.await?;
update_entity.updated_model = Some(entity.clone());

let models_count =
sqlx::query_scalar::<_, u32>("SELECT count(*) FROM entity_model WHERE entity_id = ?")
.bind(&entity_id)
.fetch_one(&self.pool)
.await?;

if models_count == 0 {
// delete entity
sqlx::query("DELETE FROM entities WHERE id = ?")
.bind(&entity_id)
.execute(&self.pool)
.await?;

update_entity.deleted = true;
}
self.query_queue.enqueue(
"UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? WHERE id = ? RETURNING *",
vec![
Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
Argument::String(event_id.to_string()),
Argument::String(entity_id.clone()),
],
QueryType::DeleteEntity(entity.clone()),
);

self.query_queue.push_publish(BrokerMessage::EntityUpdated(update_entity));
Ok(())
}

Expand Down Expand Up @@ -758,7 +735,11 @@ impl Sql {
Ty::Enum(e) => {
if e.options.iter().all(
|o| {
if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }
if let Ty::Tuple(t) = &o.ty {
t.is_empty()
} else {
false
}
},
) {
return;
Expand Down

0 comments on commit a80c181

Please sign in to comment.