Skip to content

Commit

Permalink
graph,store: add database optimization
Browse files Browse the repository at this point in the history
rename variables to be more meaningful
  • Loading branch information
gusinacio committed Mar 15, 2023
1 parent f3e1ef5 commit 3d9a5b0
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 30 deletions.
4 changes: 2 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ impl EntityCache {
) -> Result<Vec<Entity>, anyhow::Error> {
let (base_type, field) = self.schema.get_type_for_field(eref)?;

let key = DerivedEntityQuery {
let query = DerivedEntityQuery {
entity_type: EntityType::new(base_type.to_string()),
entity_field: field.into(),
value: eref.entity_id.clone(),
causality_region: eref.causality_region,
};

let entities = self.store.get_derived(&key)?;
let entities = self.store.get_derived(&query)?;
entities.iter().for_each(|(key, e)| {
self.current.insert(key.clone(), Some(e.clone()));
});
Expand Down
2 changes: 1 addition & 1 deletion graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ pub trait ReadStore: Send + Sync + 'static {
/// Reverse lookup
fn get_derived(
&self,
entity_derived: &DerivedEntityQuery,
query_derived: &DerivedEntityQuery,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError>;

fn input_schema(&self) -> Arc<Schema>;
Expand Down
5 changes: 3 additions & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,12 +1088,13 @@ impl DeploymentStore {
pub(crate) fn get_derived(
&self,
site: Arc<Site>,
key: &DerivedEntityQuery,
derived_query: &DerivedEntityQuery,
block: BlockNumber,
excluded_keys: &Option<Vec<EntityKey>>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
let conn = self.get_conn()?;
let layout = self.layout(&conn, site)?;
layout.find_derived(&conn, key, block)
layout.find_derived(&conn, derived_query, block, excluded_keys)
}

pub(crate) fn get_changes(
Expand Down
7 changes: 4 additions & 3 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,12 @@ impl Layout {
pub fn find_derived(
&self,
conn: &PgConnection,
key: &DerivedEntityQuery,
derived_query: &DerivedEntityQuery,
block: BlockNumber,
excluded_keys: &Option<Vec<EntityKey>>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
let table = self.table_for_entity(&key.entity_type)?;
let query = FindDerivedQuery::new(table, key, block);
let table = self.table_for_entity(&derived_query.entity_type)?;
let query = FindDerivedQuery::new(table, derived_query, block, excluded_keys);

let mut entities = BTreeMap::new();

Expand Down
18 changes: 16 additions & 2 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1670,8 +1670,9 @@ impl<'a, Conn> RunQueryDsl<Conn> for FindManyQuery<'a> {}
#[derive(Debug, Clone, Constructor)]
pub struct FindDerivedQuery<'a> {
table: &'a Table,
key: &'a DerivedEntityQuery,
derived_query: &'a DerivedEntityQuery,
block: BlockNumber,
excluded_keys: &'a Option<Vec<EntityKey>>,
}

impl<'a> QueryFragment<Pg> for FindDerivedQuery<'a> {
Expand All @@ -1683,7 +1684,7 @@ impl<'a> QueryFragment<Pg> for FindDerivedQuery<'a> {
entity_field,
value: entity_id,
causality_region,
} = self.key;
} = self.derived_query;

// Generate
// select '..' as entity, to_jsonb(e.*) as data
Expand All @@ -1694,6 +1695,19 @@ impl<'a> QueryFragment<Pg> for FindDerivedQuery<'a> {
out.push_sql(" from ");
out.push_sql(self.table.qualified_name.as_str());
out.push_sql(" e\n where ");

if let Some(keys) = self.excluded_keys {
let primary_key = self.table.primary_key();
out.push_identifier(primary_key.name.as_str())?;
out.push_sql(" not in (");
for (i, value) in keys.iter().enumerate() {
if i > 0 {
out.push_sql(", ");
}
out.push_bind_param::<Text, _>(&value.entity_id.as_str())?;
}
out.push_sql(") and ");
}
out.push_identifier(entity_field.as_str())?;
out.push_sql(" = ");
out.push_bind_param::<Text, _>(&entity_id.as_str())?;
Expand Down
41 changes: 21 additions & 20 deletions store/postgres/src/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,11 @@ impl SyncStore {
&self,
key: &DerivedEntityQuery,
block: BlockNumber,
excluded_keys: Option<Vec<EntityKey>>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
self.retry("get_derived", || {
self.writable
.get_derived(self.site.cheap_clone(), key, block)
.get_derived(self.site.cheap_clone(), key, block, &excluded_keys)
})
}

Expand Down Expand Up @@ -809,9 +810,9 @@ impl Queue {
let mut tracker = BlockTracker::new();

// Get entities from entries in the queue
let (entities_in_queue, entities_removed) = self.queue.fold(
(BTreeMap::new(), Vec::new()),
|(mut map, mut remove_list): (BTreeMap<EntityKey, Entity>, Vec<EntityKey>), req| {
let entities_in_queue = self.queue.fold(
BTreeMap::new(),
|mut map: BTreeMap<EntityKey, Option<Entity>>, req| {
tracker.update(req.as_ref());
match req.as_ref() {
Request::Write {
Expand All @@ -820,33 +821,33 @@ impl Queue {
if tracker.visible(block_ptr) {
for emod in mods {
let key = emod.entity_ref();
// The key must be removed to avoid overwriting it with a stale value.
// we select only the entities that match the query
if key_derived.entity_type == key.entity_type {
match emod.entity() {
Some(entity) => {
map.insert(key.clone(), entity.clone());
}
None => {
remove_list.push(key.clone());
}
}
map.insert(key.clone(), emod.entity().cloned());
}
}
}
}
Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ }
}
(map, remove_list)
map
},
);
// We should filter this in the future to only get the entities that are needed
let mut items_from_database = self.store.get_derived(key_derived, tracker.query_block())?;
// Remove any entities that were removed in the queue
items_from_database.retain(|key, _item| !entities_removed.contains(key));

let excluded_keys: Vec<EntityKey> = entities_in_queue.keys().cloned().collect();

// We filter to exclude the entities ids that we already have from the queue
let mut items_from_database =
self.store
.get_derived(key_derived, tracker.query_block(), Some(excluded_keys))?;

// Extend the store results with the entities from the queue.
// This overwrites any entitiy from the database with the same key from queue
items_from_database.extend(entities_in_queue);
let items_from_queue: BTreeMap<EntityKey, Entity> = entities_in_queue
.into_iter()
.filter_map(|(key, entity)| entity.map(|entity| (key, entity)))
.collect();
items_from_database.extend(items_from_queue);

Ok(items_from_database)
}
Expand Down Expand Up @@ -1010,7 +1011,7 @@ impl Writer {
key: &DerivedEntityQuery,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
match self {
Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX),
Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, None),
Writer::Async(queue) => queue.get_derived(key),
}
}
Expand Down

0 comments on commit 3d9a5b0

Please sign in to comment.