From 0519b205c411c79f4e8a6bc67b96a5287d377df0 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 7 Mar 2023 21:18:47 -0300 Subject: [PATCH 01/22] graph,core,runtime,store: add store.getWhere --- core/src/subgraph/state.rs | 4 +- graph/src/components/store/entity_cache.rs | 142 +++++++++++++++++---- graph/src/components/store/mod.rs | 40 ++++++ graph/src/components/store/traits.rs | 7 + graph/src/components/subgraph/instance.rs | 4 +- graph/src/runtime/gas/size_of.rs | 19 ++- graph/src/util/cache_weight.rs | 19 ++- runtime/wasm/src/host_exports.rs | 24 +++- runtime/wasm/src/module/mod.rs | 30 +++++ store/postgres/src/deployment_store.rs | 13 +- store/postgres/src/relational.rs | 13 +- store/postgres/src/writable.rs | 29 ++++- 12 files changed, 309 insertions(+), 35 deletions(-) diff --git a/core/src/subgraph/state.rs b/core/src/subgraph/state.rs index 0d5edd84b65..19eab13b2a0 100644 --- a/core/src/subgraph/state.rs +++ b/core/src/subgraph/state.rs @@ -1,5 +1,5 @@ use graph::{ - components::store::EntityKey, + components::store::EntityMultiKey, prelude::Entity, util::{backoff::ExponentialBackoff, lfu_cache::LfuCache}, }; @@ -18,5 +18,5 @@ pub struct IndexingState { /// - The time THRESHOLD is passed /// - Or the subgraph has triggers for the block pub skip_ptr_updates_timer: Instant, - pub entity_lfu_cache: LfuCache>, + pub entity_lfu_cache: LfuCache>, } diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 538af72ff18..375000d242b 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -3,10 +3,12 @@ use std::collections::HashMap; use std::fmt::{self, Debug}; use std::sync::Arc; -use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOperation}; +use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOperation, Value}; use crate::prelude::{Schema, ENV_VARS}; use crate::util::lfu_cache::LfuCache; +use super::{EntityDerived, EntityMultiKey}; + /// A cache for entities from the store that provides the basic functionality /// needed for the store interactions in the host exports. This struct tracks /// how entities are modified, and caches all entities looked up from the @@ -17,7 +19,7 @@ use crate::util::lfu_cache::LfuCache; pub struct EntityCache { /// The state of entities in the store. An entry of `None` /// means that the entity is not present in the store - current: LfuCache>, + current: LfuCache>, /// The accumulated changes to an entity. updates: HashMap, @@ -45,7 +47,7 @@ impl Debug for EntityCache { pub struct ModificationsAndCache { pub modifications: Vec, - pub entity_lfu_cache: LfuCache>, + pub entity_lfu_cache: LfuCache>, } impl EntityCache { @@ -62,7 +64,7 @@ impl EntityCache { pub fn with_current( store: Arc, - current: LfuCache>, + current: LfuCache>, ) -> EntityCache { EntityCache { current, @@ -99,7 +101,9 @@ impl EntityCache { pub fn get(&mut self, eref: &EntityKey) -> Result, s::QueryExecutionError> { // Get the current entity, apply any updates from `updates`, then // from `handler_updates`. - let mut entity = self.current.get_entity(&*self.store, eref)?; + let mut entity = self + .current + .get_entity(&*self.store, &EntityMultiKey::Equal(eref.clone()))?; // Always test the cache consistency in debug mode. debug_assert!(entity == self.store.get(eref).unwrap()); @@ -110,9 +114,55 @@ impl EntityCache { if let Some(op) = self.handler_updates.get(eref).cloned() { entity = op.apply_to(entity) } + match entity { + Some(ref e) => { + let list = e.clone().sorted(); + let list = list.iter(); + list.for_each(|(k, v)| println!("{}: {:?}", k, v)); + } + None => println!("No entity found for {:?}", eref), + } Ok(entity) } + pub fn get_where( + &mut self, + eref: &EntityDerived, + ) -> Result, s::QueryExecutionError> { + println!("HELLO WORLD, YOU TRIGERED store.getWhere"); + self.current + .get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; + let entity = self + .current + .get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; + let entities = match entity { + Some(e) => { + // retrieve the list from the cache + let mut entities = Vec::new(); + if let Some(Value::List(list)) = e.get(eref.entity_field.as_str()) { + for id in list.iter() { + // we just created + if let Value::String(id) = id { + let key = EntityKey::from(id, eref); + match self.get(&key) { + Ok(Some(value)) => entities.push(value), + _ => (), + } + } + } + } + entities + } + None => { + println!("No entity found for {:?}", eref); + Vec::new() + } + }; + // self.store.get_where() + // todo!(); + Ok(entities) + } + pub fn remove(&mut self, key: EntityKey) { self.entity_op(key, EntityOp::Remove); } @@ -151,6 +201,7 @@ impl EntityCache { } } + // check the validate for derived fields let is_valid = entity.validate(&self.schema, &key).is_ok(); self.entity_op(key.clone(), EntityOp::Update(entity)); @@ -221,10 +272,11 @@ impl EntityCache { // The first step is to make sure all entities being set are in `self.current`. // For each subgraph, we need a map of entity type to missing entity ids. - let missing = self - .updates - .keys() - .filter(|key| !self.current.contains_key(key)); + let missing = self.updates.keys().filter(|key| { + !self + .current + .contains_key(&EntityMultiKey::Equal((*key).clone())) + }); // For immutable types, we assume that the subgraph is well-behaved, // and all updated immutable entities are in fact new, and skip @@ -236,12 +288,14 @@ impl EntityCache { let missing = missing.filter(|key| !self.schema.is_immutable(&key.entity_type)); for (entity_key, entity) in self.store.get_many(missing.cloned().collect())? { - self.current.insert(entity_key, Some(entity)); + self.current + .insert(EntityMultiKey::Equal(entity_key), Some(entity)); } let mut mods = Vec::new(); - for (key, update) in self.updates { + for (entity_key, update) in self.updates { use s::EntityModification::*; + let key = EntityMultiKey::Equal(entity_key.clone()); let current = self.current.remove(&key).and_then(|entity| entity); let modification = match (current, update) { @@ -251,7 +305,10 @@ impl EntityCache { let mut data = Entity::new(); data.merge_remove_null_fields(updates); self.current.insert(key.clone(), Some(data.clone())); - Some(Insert { key, data }) + Some(Insert { + key: entity_key, + data, + }) } // Entity may have been changed (Some(current), EntityOp::Update(updates)) => { @@ -259,7 +316,10 @@ impl EntityCache { data.merge_remove_null_fields(updates); self.current.insert(key.clone(), Some(data.clone())); if current != data { - Some(Overwrite { key, data }) + Some(Overwrite { + key: entity_key, + data, + }) } else { None } @@ -268,7 +328,10 @@ impl EntityCache { (Some(current), EntityOp::Overwrite(data)) => { self.current.insert(key.clone(), Some(data.clone())); if current != data { - Some(Overwrite { key, data }) + Some(Overwrite { + key: entity_key, + data, + }) } else { None } @@ -276,7 +339,7 @@ impl EntityCache { // Existing entity was deleted (Some(_), EntityOp::Remove) => { self.current.insert(key.clone(), None); - Some(Remove { key }) + Some(Remove { key: entity_key }) } // Entity was deleted, but it doesn't exist in the store (None, EntityOp::Remove) => None, @@ -294,23 +357,52 @@ impl EntityCache { } } -impl LfuCache> { +impl LfuCache> { // Helper for cached lookup of an entity. fn get_entity( &mut self, store: &(impl s::ReadStore + ?Sized), - key: &EntityKey, + key: &EntityMultiKey, ) -> Result, s::QueryExecutionError> { match self.get(key) { - None => { - let mut entity = store.get(key)?; - if let Some(entity) = &mut entity { - // `__typename` is for queries not for mappings. - entity.remove("__typename"); + None => match key { + EntityMultiKey::Equal(store_key) => { + let mut entity = store.get(store_key)?; + if let Some(entity) = &mut entity { + // `__typename` is for queries not for mappings. + entity.remove("__typename"); + } + self.insert(key.clone(), entity.clone()); + Ok(entity) } - self.insert(key.clone(), entity.clone()); - Ok(entity) - } + EntityMultiKey::All(derived) => { + // we get all entities with the derived field + let mut entities = store.get_where(derived)?; + // we asssume that derived fields contains ids + let entities = entities + .iter_mut() + .filter(|entity| entity.contains_key("id")) + .map(|entity| { + entity.remove("__typename"); + + // we insert each entity into the cache with the id key + let key = EntityKey::from(&entity.id().unwrap().into(), derived); + self.insert(EntityMultiKey::Equal(key), Some(entity.clone())); + // return the value to save + Value::String(entity.id().unwrap()) + }); + let entities = entities.collect(); + + // create entity with the list of ids + let mut entity = Entity::new(); + entity.insert(derived.entity_field.to_string(), Value::List(entities)); + + // insert to cache the list of ids + self.insert(key.clone(), Some(entity.clone())); + + Ok(Some(entity.clone())) + } + }, Some(data) => Ok(data.clone()), } } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index a1c42d5f1bf..f8304fd3f14 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -138,6 +138,33 @@ pub struct EntityKey { pub causality_region: CausalityRegion, } +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct EntityDerived { + /// Name of the entity type. + pub entity_type: EntityType, + + pub entity_field: Word, + + /// ID of the individual entity. + pub entity_id: Word, + + /// This is the causality region of the data source that created the entity. + /// + /// In the case of an entity lookup, this is the causality region of the data source that is + /// doing the lookup. So if the entity exists but was created on a different causality region, + /// the lookup will return empty. + pub causality_region: CausalityRegion, +} + +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum EntityMultiKey { + /// A filter that matches all entities of a given type. + All(EntityDerived), + + /// A filter that matches a specific entity. + Equal(EntityKey), +} + impl EntityKey { // For use in tests only #[cfg(debug_assertions)] @@ -148,6 +175,15 @@ impl EntityKey { causality_region: CausalityRegion::ONCHAIN, } } + + pub fn from(id: &String, entity_derived: &EntityDerived) -> Self { + let clone = entity_derived.clone(); + Self { + entity_id: id.clone().into(), + entity_type: clone.entity_type, + causality_region: clone.causality_region, + } + } } #[derive(Clone, Debug, PartialEq)] @@ -1127,6 +1163,10 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } + fn get_where(&self, _query: &EntityDerived) -> Result, StoreError> { + Ok(vec![]) + } + fn input_schema(&self) -> Arc { self.schema.cheap_clone() } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 04bc36aa1e6..c897dfb1178 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -186,6 +186,9 @@ pub trait ReadStore: Send + Sync + 'static { keys: BTreeSet, ) -> Result, StoreError>; + /// Reverse lookup + fn get_where(&self, entity_derived: &EntityDerived) -> Result, StoreError>; + fn input_schema(&self) -> Arc; } @@ -202,6 +205,10 @@ impl ReadStore for Arc { (**self).get_many(keys) } + fn get_where(&self, entity_derived: &EntityDerived) -> Result, StoreError> { + (**self).get_where(entity_derived) + } + fn input_schema(&self) -> Arc { (**self).input_schema() } diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index f3df2c672e4..b7f2a1efe19 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -1,6 +1,6 @@ use crate::{ blockchain::Blockchain, - components::store::{EntityKey, ReadStore, StoredDynamicDataSource}, + components::store::{EntityMultiKey, ReadStore, StoredDynamicDataSource}, data::subgraph::schema::SubgraphError, data_source::DataSourceTemplate, prelude::*, @@ -35,7 +35,7 @@ pub struct BlockState { } impl BlockState { - pub fn new(store: impl ReadStore, lfu_cache: LfuCache>) -> Self { + pub fn new(store: impl ReadStore, lfu_cache: LfuCache>) -> Self { BlockState { entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache), deterministic_errors: Vec::new(), diff --git a/graph/src/runtime/gas/size_of.rs b/graph/src/runtime/gas/size_of.rs index 49bb60b1215..4f1218955af 100644 --- a/graph/src/runtime/gas/size_of.rs +++ b/graph/src/runtime/gas/size_of.rs @@ -1,7 +1,7 @@ //! Various implementations of GasSizeOf; use crate::{ - components::store::{EntityKey, EntityType}, + components::store::{EntityDerived, EntityKey, EntityMultiKey, EntityType}, data::store::{scalar::Bytes, Value}, prelude::{BigDecimal, BigInt}, }; @@ -168,6 +168,23 @@ impl GasSizeOf for EntityKey { } } +impl GasSizeOf for EntityDerived { + fn gas_size_of(&self) -> Gas { + self.entity_type.gas_size_of() + + self.entity_id.gas_size_of() + + self.entity_field.gas_size_of() + } +} + +impl GasSizeOf for EntityMultiKey { + fn gas_size_of(&self) -> Gas { + match self { + EntityMultiKey::Equal(key) => key.gas_size_of(), + EntityMultiKey::All(key) => key.gas_size_of(), + } + } +} + impl GasSizeOf for EntityType { fn gas_size_of(&self) -> Gas { self.as_str().gas_size_of() diff --git a/graph/src/util/cache_weight.rs b/graph/src/util/cache_weight.rs index af15a82b25d..790e5a80f8e 100644 --- a/graph/src/util/cache_weight.rs +++ b/graph/src/util/cache_weight.rs @@ -1,5 +1,5 @@ use crate::{ - components::store::{EntityKey, EntityType}, + components::store::{EntityDerived, EntityKey, EntityMultiKey, EntityType}, data::value::Word, prelude::{q, BigDecimal, BigInt, Value}, }; @@ -127,6 +127,23 @@ impl CacheWeight for EntityKey { } } +impl CacheWeight for EntityDerived { + fn indirect_weight(&self) -> usize { + self.entity_id.indirect_weight() + + self.entity_type.indirect_weight() + + self.entity_field.indirect_weight() + } +} + +impl CacheWeight for EntityMultiKey { + fn indirect_weight(&self) -> usize { + match self { + EntityMultiKey::All(derived) => derived.indirect_weight(), + EntityMultiKey::Equal(key) => key.indirect_weight(), + } + } +} + impl CacheWeight for [u8; 32] { fn indirect_weight(&self) -> usize { 0 diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index ded1d7193d6..e514314f509 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -9,7 +9,7 @@ use wasmtime::Trap; use web3::types::H160; use graph::blockchain::Blockchain; -use graph::components::store::EnsLookup; +use graph::components::store::{EnsLookup, EntityDerived}; use graph::components::store::{EntityKey, EntityType}; use graph::components::subgraph::{ PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing, @@ -239,6 +239,28 @@ impl HostExports { Ok(result) } + pub(crate) fn store_get_where( + &self, + state: &mut BlockState, + entity_type: String, + entity_field: String, + entity_id: String, + gas: &GasCounter, + ) -> Result, anyhow::Error> { + let store_key = EntityDerived { + entity_type: EntityType::new(entity_type), + entity_id: entity_id.into(), + entity_field: entity_field.into(), + causality_region: self.data_source_causality_region, + }; + self.check_entity_type_access(&store_key.entity_type)?; + + let result = state.entity_cache.get_where(&store_key)?; + gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?; + + Ok(result) + } + /// Prints the module of `n` in hex. /// Integers are encoded using the least amount of digits (no leading zero digits). /// Their encoding may be of uneven length. The number zero encodes as "0x0". diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index c7ac94175ac..28dbf3fc124 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -527,6 +527,14 @@ impl WasmInstance { link!("abort", abort, message_ptr, file_name_ptr, line, column); link!("store.get", store_get, "host_export_store_get", entity, id); + link!( + "store.getWhere", + store_get_where, + "host_export_store_get_where", + entity, + field, + id + ); link!( "store.set", store_set, @@ -1059,6 +1067,28 @@ impl WasmInstanceContext { Ok(ret) } + /// function store.getWhere(entity: string, field: string, id: string): Entity[] | null + pub fn store_get_where( + &mut self, + gas: &GasCounter, + entity_ptr: AscPtr, + field_ptr: AscPtr, + id_ptr: AscPtr, + ) -> Result>>, HostExportError> { + let entity_type: String = asc_get(self, entity_ptr, gas)?; + let field: String = asc_get(self, field_ptr, gas)?; + let id: String = asc_get(self, id_ptr, gas)?; + println!("store_get_where: {} {} {}", entity_type, field, id); + let entity_option = self.ctx.host_exports.store_get_where( + &mut self.ctx.state, + entity_type.clone(), + field.clone(), + id.clone(), + gas, + )?; + Ok(AscPtr::null()) + } + /// function typeConversion.bytesToString(bytes: Bytes): string pub fn bytes_to_string( &mut self, diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d40e48c68ef..7545a1d0891 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -6,7 +6,7 @@ use diesel::r2d2::{ConnectionManager, PooledConnection}; use graph::anyhow::Context; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{ - EntityKey, EntityType, PrunePhase, PruneReporter, PruneRequest, PruningStrategy, + EntityDerived, EntityKey, EntityType, PrunePhase, PruneReporter, PruneRequest, PruningStrategy, StoredDynamicDataSource, VersionStats, }; use graph::components::versions::VERSIONS; @@ -1119,6 +1119,17 @@ impl DeploymentStore { layout.find_many(&conn, ids_for_type, block) } + pub(crate) fn get_where( + &self, + site: Arc, + key: &EntityDerived, + block: BlockNumber, + ) -> Result, StoreError> { + let conn = self.get_conn()?; + let layout = self.layout(&conn, site)?; + layout.find_where(&conn, key, block) + } + pub(crate) fn get_changes( &self, site: Arc, diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 26cc5ca304b..e9cb5a7ca84 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -49,7 +49,7 @@ use crate::{ FilterQuery, FindManyQuery, FindQuery, InsertQuery, RevertClampQuery, RevertRemoveQuery, }, }; -use graph::components::store::{EntityKey, EntityType}; +use graph::components::store::{EntityDerived, EntityKey, EntityType}; use graph::data::graphql::ext::{DirectiveFinder, DocumentExt, ObjectTypeExt}; use graph::data::schema::{FulltextConfig, FulltextDefinition, Schema, SCHEMA_TYPE_NAME}; use graph::data::store::BYTES_SCALAR; @@ -558,6 +558,17 @@ impl Layout { Ok(entities) } + pub fn find_where( + &self, + conn: &PgConnection, + key: &EntityDerived, + block: BlockNumber, + ) -> Result, StoreError> { + let entities = Vec::new(); + println!("find_where: {:?} {:?}", key, block); + Ok(entities) + } + pub fn find_changes( &self, conn: &PgConnection, diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 62b47b57097..363faf9f5ad 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -5,7 +5,7 @@ use std::{collections::BTreeMap, sync::Arc}; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::ReadStore; -use graph::components::store::{DeploymentCursorTracker, EntityKey}; +use graph::components::store::{DeploymentCursorTracker, EntityDerived, EntityKey}; use graph::data::subgraph::schema; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -251,6 +251,16 @@ impl SyncStore { }) } + fn get_where( + &self, + key: &EntityDerived, + block: BlockNumber, + ) -> Result, StoreError> { + retry::forever(&self.logger, "get_where", || { + self.writable.get_where(self.site.cheap_clone(), key, block) + }) + } + async fn is_deployment_synced(&self) -> Result { retry::forever_async(&self.logger, "is_deployment_synced", || async { self.writable @@ -746,6 +756,12 @@ impl Queue { Ok(map) } + fn get_where(&self, key: &EntityDerived) -> Result, StoreError> { + let tracker = BlockTracker::new(); + // TODO implement the whole async + self.store.get_where(key, tracker.query_block()) + } + /// Load dynamic data sources by looking at both the queue and the store async fn load_dynamic_data_sources( &self, @@ -904,6 +920,13 @@ impl Writer { } } + fn get_where(&self, key: &EntityDerived) -> Result, StoreError> { + match self { + Writer::Sync(store) => store.get_where(key, BLOCK_NUMBER_MAX), + Writer::Async(queue) => queue.get_where(key), + } + } + async fn load_dynamic_data_sources( &self, manifest_idx_and_name: Vec<(u32, String)>, @@ -993,6 +1016,10 @@ impl ReadStore for WritableStore { self.writer.get_many(keys) } + fn get_where(&self, key: &EntityDerived) -> Result, StoreError> { + self.writer.get_where(key) + } + fn input_schema(&self) -> Arc { self.store.input_schema() } From 9661088b2995b0c9c70a0e955d27b9dbaf51d8ac Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 00:06:46 -0300 Subject: [PATCH 02/22] add asc to array of entity --- graph/src/runtime/mod.rs | 1 + runtime/wasm/src/asc_abi/class.rs | 4 ++++ runtime/wasm/src/module/mod.rs | 9 +++++++-- runtime/wasm/src/to_from/external.rs | 13 +++++++++++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/graph/src/runtime/mod.rs b/graph/src/runtime/mod.rs index 74007b96cef..917f4d85d40 100644 --- a/graph/src/runtime/mod.rs +++ b/graph/src/runtime/mod.rs @@ -260,6 +260,7 @@ pub enum IndexForAscTypeId { Log = 1001, ArrayH256 = 1002, ArrayLog = 1003, + ArrayTypedMapStringStoreValue = 1004, // Continue to add more Ethereum type IDs here. // e.g.: // NextEthereumType = 1004, diff --git a/runtime/wasm/src/asc_abi/class.rs b/runtime/wasm/src/asc_abi/class.rs index 0fdac204847..95c14b1bfb3 100644 --- a/runtime/wasm/src/asc_abi/class.rs +++ b/runtime/wasm/src/asc_abi/class.rs @@ -609,6 +609,10 @@ impl AscIndexId for AscTypedMap> { const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::TypedMapStringStoreValue; } +impl AscIndexId for Array> { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArrayTypedMapStringStoreValue; +} + impl AscIndexId for AscTypedMap> { const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::TypedMapStringJsonValue; } diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 28dbf3fc124..d2c9a268647 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -1079,14 +1079,19 @@ impl WasmInstanceContext { let field: String = asc_get(self, field_ptr, gas)?; let id: String = asc_get(self, id_ptr, gas)?; println!("store_get_where: {} {} {}", entity_type, field, id); - let entity_option = self.ctx.host_exports.store_get_where( + let entities = self.ctx.host_exports.store_get_where( &mut self.ctx.state, entity_type.clone(), field.clone(), id.clone(), gas, )?; - Ok(AscPtr::null()) + + let entities: Vec> = entities.iter().map(|entity| entity.clone().sorted()).collect(); + // .map(|name| asc_new(self, &*name, gas)) + // ..collect(); + let ret = asc_new(self, &entities, gas)?; + Ok(ret) } /// function typeConversion.bytesToString(bytes: Bytes): string diff --git a/runtime/wasm/src/to_from/external.rs b/runtime/wasm/src/to_from/external.rs index 69532fbf237..426d224074d 100644 --- a/runtime/wasm/src/to_from/external.rs +++ b/runtime/wasm/src/to_from/external.rs @@ -333,6 +333,19 @@ impl ToAscObj for Vec<(String, store::Value)> { } } +impl ToAscObj>> for Vec> { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result>, DeterministicHostError> + { + let content: Result, _> = self.iter().map(|x| asc_new(heap, &x, gas)).collect(); + let content = content?; + Array::new(&content, heap, gas) + } +} + impl ToAscObj> for serde_json::Value { fn to_asc_obj( &self, From 357d2e91ac516f4c0a73d4af8cfd912820a558c6 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 00:13:48 -0300 Subject: [PATCH 03/22] store: fix lint --- runtime/wasm/src/module/mod.rs | 5 ++++- runtime/wasm/src/to_from/external.rs | 3 +-- store/postgres/src/writable.rs | 3 +-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index d2c9a268647..93aaf482910 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -1087,7 +1087,10 @@ impl WasmInstanceContext { gas, )?; - let entities: Vec> = entities.iter().map(|entity| entity.clone().sorted()).collect(); + let entities: Vec> = entities + .iter() + .map(|entity| entity.clone().sorted()) + .collect(); // .map(|name| asc_new(self, &*name, gas)) // ..collect(); let ret = asc_new(self, &entities, gas)?; diff --git a/runtime/wasm/src/to_from/external.rs b/runtime/wasm/src/to_from/external.rs index 426d224074d..fd8a2e2ad16 100644 --- a/runtime/wasm/src/to_from/external.rs +++ b/runtime/wasm/src/to_from/external.rs @@ -338,8 +338,7 @@ impl ToAscObj>> for Vec> { &self, heap: &mut H, gas: &GasCounter, - ) -> Result>, DeterministicHostError> - { + ) -> Result>, HostExportError> { let content: Result, _> = self.iter().map(|x| asc_new(heap, &x, gas)).collect(); let content = content?; Array::new(&content, heap, gas) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 363faf9f5ad..758d87657ec 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -4,8 +4,7 @@ use std::sync::Mutex; use std::{collections::BTreeMap, sync::Arc}; use graph::blockchain::block_stream::FirehoseCursor; -use graph::components::store::ReadStore; -use graph::components::store::{DeploymentCursorTracker, EntityDerived, EntityKey}; +use graph::components::store::{DeploymentCursorTracker, EntityDerived, EntityKey, ReadStore}; use graph::data::subgraph::schema; use graph::data_source::CausalityRegion; use graph::prelude::{ From f32ac45c2eabf4be0cad929f919f6ab91cd8b934 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 11:51:56 -0300 Subject: [PATCH 04/22] store: add FindDerivedQuery --- store/postgres/src/relational.rs | 11 ++++- store/postgres/src/relational_queries.rs | 58 +++++++++++++++++++++++- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index e9cb5a7ca84..b6cdcfc3674 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -564,8 +564,15 @@ impl Layout { key: &EntityDerived, block: BlockNumber, ) -> Result, StoreError> { - let entities = Vec::new(); - println!("find_where: {:?} {:?}", key, block); + let table = self.table_for_entity(&key.entity_type)?; + let query = FindDerivedQuery::new(table, key, block); + + let mut entities = Vec::new(); + + for data in query.load::(conn)? { + let entity_data: Entity = data.deserialize_with_layout(self, None, true)?; + entities.push(entity_data); + } Ok(entities) } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index bb60880935c..a07c654bf8a 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -12,7 +12,7 @@ use diesel::result::{Error as DieselError, QueryResult}; use diesel::sql_types::{Array, BigInt, Binary, Bool, Integer, Jsonb, Text}; use diesel::Connection; -use graph::components::store::EntityKey; +use graph::components::store::{EntityKey, EntityDerived}; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -1668,6 +1668,62 @@ impl<'a> LoadQuery for FindManyQuery<'a> { impl<'a, Conn> RunQueryDsl for FindManyQuery<'a> {} +/// A query that finds an entity by key. Used during indexing. +/// See also `FindManyQuery`. +#[derive(Debug, Clone, Constructor)] +pub struct FindDerivedQuery<'a> { + table: &'a Table, + key: &'a EntityDerived, + block: BlockNumber, +} + +impl<'a> QueryFragment for FindDerivedQuery<'a> { + fn walk_ast(&self, mut out: AstPass) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + + let EntityDerived { + entity_type: _, + entity_field, + entity_id, + causality_region, + } = self.key; + + // Generate + // select '..' as entity, to_jsonb(e.*) as data + // from schema.table e where id = $1 + out.push_sql("select "); + out.push_bind_param::(&self.table.object.as_str())?; + out.push_sql(" as entity, to_jsonb(e.*) as data\n"); + out.push_sql(" from "); + out.push_sql(self.table.qualified_name.as_str()); + out.push_sql(" e\n where "); + out.push_identifier(entity_field.as_str())?; + out.push_sql(" = "); + out.push_bind_param::(&entity_id.as_str())?; + out.push_sql(" and "); + if self.table.has_causality_region { + out.push_sql("causality_region = "); + out.push_bind_param::(causality_region)?; + out.push_sql(" and "); + } + BlockRangeColumn::new(self.table, "e.", self.block).contains(&mut out) + } +} + +impl<'a> QueryId for FindDerivedQuery<'a> { + type QueryId = (); + + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a> LoadQuery for FindDerivedQuery<'a> { + fn internal_load(self, conn: &PgConnection) -> QueryResult> { + conn.query_by_name(&self) + } +} + +impl<'a, Conn> RunQueryDsl for FindDerivedQuery<'a> {} + #[derive(Debug)] pub struct InsertQuery<'a> { table: &'a Table, From ef355141a2fc719bed8f6fdb33ba480b97e7d9ed Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 11:56:18 -0300 Subject: [PATCH 05/22] store,runtime,graph: refact where to derived --- graph/src/components/store/entity_cache.rs | 21 +++++++++------------ graph/src/components/store/mod.rs | 5 ++++- graph/src/components/store/traits.rs | 12 +++++++++--- runtime/wasm/src/host_exports.rs | 5 +++-- runtime/wasm/src/module/mod.rs | 14 +++++++------- store/postgres/src/deployment_store.rs | 4 ++-- store/postgres/src/relational.rs | 6 ++++-- store/postgres/src/writable.rs | 16 ++++++++-------- 8 files changed, 46 insertions(+), 37 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 375000d242b..1db87565d47 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -120,21 +120,17 @@ impl EntityCache { let list = list.iter(); list.for_each(|(k, v)| println!("{}: {:?}", k, v)); } - None => println!("No entity found for {:?}", eref), + None => println!("get: No entity found for {:?}", eref), } Ok(entity) } - pub fn get_where( + pub fn get_derived( &mut self, eref: &EntityDerived, ) -> Result, s::QueryExecutionError> { - println!("HELLO WORLD, YOU TRIGERED store.getWhere"); - self.current - .get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; - let entity = self - .current - .get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; + self.current.get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; + let entity = self.current.get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; let entities = match entity { Some(e) => { // retrieve the list from the cache @@ -154,11 +150,11 @@ impl EntityCache { entities } None => { - println!("No entity found for {:?}", eref); + println!("get_derived: No entity found for {:?}", eref); Vec::new() } }; - // self.store.get_where() + // self.store.get_derived() // todo!(); Ok(entities) } @@ -377,7 +373,7 @@ impl LfuCache> { } EntityMultiKey::All(derived) => { // we get all entities with the derived field - let mut entities = store.get_where(derived)?; + let mut entities = store.get_derived(derived)?; // we asssume that derived fields contains ids let entities = entities .iter_mut() @@ -398,7 +394,8 @@ impl LfuCache> { entity.insert(derived.entity_field.to_string(), Value::List(entities)); // insert to cache the list of ids - self.insert(key.clone(), Some(entity.clone())); + // todo remove this comment + // self.insert(key.clone(), Some(entity.clone())); Ok(Some(entity.clone())) } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index f8304fd3f14..6ff6aa0a733 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1163,7 +1163,10 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } - fn get_where(&self, _query: &EntityDerived) -> Result, StoreError> { + fn get_derived( + &self, + _query: &EntityDerived, + ) -> Result, StoreError> { Ok(vec![]) } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index c897dfb1178..b33bd1ce5f2 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -187,7 +187,10 @@ pub trait ReadStore: Send + Sync + 'static { ) -> Result, StoreError>; /// Reverse lookup - fn get_where(&self, entity_derived: &EntityDerived) -> Result, StoreError>; + fn get_derived( + &self, + entity_derived: &EntityDerived + ) -> Result, StoreError>; fn input_schema(&self) -> Arc; } @@ -205,8 +208,11 @@ impl ReadStore for Arc { (**self).get_many(keys) } - fn get_where(&self, entity_derived: &EntityDerived) -> Result, StoreError> { - (**self).get_where(entity_derived) + fn get_derived( + &self, + entity_derived: &EntityDerived + ) -> Result, StoreError> { + (**self).get_derived(entity_derived) } fn input_schema(&self) -> Arc { diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index e514314f509..6bb2f89f9ce 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -239,7 +239,8 @@ impl HostExports { Ok(result) } - pub(crate) fn store_get_where( + + pub(crate) fn store_get_derived( &self, state: &mut BlockState, entity_type: String, @@ -255,7 +256,7 @@ impl HostExports { }; self.check_entity_type_access(&store_key.entity_type)?; - let result = state.entity_cache.get_where(&store_key)?; + let result = state.entity_cache.get_derived(&store_key)?; gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?; Ok(result) diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 93aaf482910..795070b4ddd 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -528,9 +528,9 @@ impl WasmInstance { link!("store.get", store_get, "host_export_store_get", entity, id); link!( - "store.getWhere", - store_get_where, - "host_export_store_get_where", + "store.getDerived", + store_get_derived, + "host_export_store_get_derived", entity, field, id @@ -1067,8 +1067,8 @@ impl WasmInstanceContext { Ok(ret) } - /// function store.getWhere(entity: string, field: string, id: string): Entity[] | null - pub fn store_get_where( + /// function store.getDerived(entity: string, field: string, id: string): Entity[] + pub fn store_get_derived( &mut self, gas: &GasCounter, entity_ptr: AscPtr, @@ -1078,8 +1078,8 @@ impl WasmInstanceContext { let entity_type: String = asc_get(self, entity_ptr, gas)?; let field: String = asc_get(self, field_ptr, gas)?; let id: String = asc_get(self, id_ptr, gas)?; - println!("store_get_where: {} {} {}", entity_type, field, id); - let entities = self.ctx.host_exports.store_get_where( + println!("store_get_derived: {} {} {}", entity_type, field, id); + let entities = self.ctx.host_exports.store_get_derived( &mut self.ctx.state, entity_type.clone(), field.clone(), diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 7545a1d0891..08471490443 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1119,7 +1119,7 @@ impl DeploymentStore { layout.find_many(&conn, ids_for_type, block) } - pub(crate) fn get_where( + pub(crate) fn get_derived( &self, site: Arc, key: &EntityDerived, @@ -1127,7 +1127,7 @@ impl DeploymentStore { ) -> Result, StoreError> { let conn = self.get_conn()?; let layout = self.layout(&conn, site)?; - layout.find_where(&conn, key, block) + layout.find_derived(&conn, key, block) } pub(crate) fn get_changes( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index b6cdcfc3674..4cd5e97cea3 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -41,7 +41,7 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use crate::relational_queries::{FindChangesQuery, FindPossibleDeletionsQuery}; +use crate::relational_queries::{FindChangesQuery, FindPossibleDeletionsQuery, FindDerivedQuery}; use crate::{ primary::{Namespace, Site}, relational_queries::{ @@ -558,7 +558,7 @@ impl Layout { Ok(entities) } - pub fn find_where( + pub fn find_derived( &self, conn: &PgConnection, key: &EntityDerived, @@ -731,6 +731,8 @@ impl Layout { query.query_id, &self.site, )?; + println!("{}", debug_query(&query).to_string()); + let query_clone = query.clone(); let start = Instant::now(); diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 758d87657ec..5d587c1cd1f 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -250,7 +250,7 @@ impl SyncStore { }) } - fn get_where( + fn get_derived( &self, key: &EntityDerived, block: BlockNumber, @@ -755,10 +755,10 @@ impl Queue { Ok(map) } - fn get_where(&self, key: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, key: &EntityDerived) -> Result, StoreError> { let tracker = BlockTracker::new(); // TODO implement the whole async - self.store.get_where(key, tracker.query_block()) + self.store.get_derived(key, tracker.query_block()) } /// Load dynamic data sources by looking at both the queue and the store @@ -919,10 +919,10 @@ impl Writer { } } - fn get_where(&self, key: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, key: &EntityDerived) -> Result, StoreError> { match self { - Writer::Sync(store) => store.get_where(key, BLOCK_NUMBER_MAX), - Writer::Async(queue) => queue.get_where(key), + Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX), + Writer::Async(queue) => queue.get_derived(key), } } @@ -1015,8 +1015,8 @@ impl ReadStore for WritableStore { self.writer.get_many(keys) } - fn get_where(&self, key: &EntityDerived) -> Result, StoreError> { - self.writer.get_where(key) + fn get_derived(&self, key: &EntityDerived) -> Result, StoreError> { + self.writer.get_derived(key) } fn input_schema(&self) -> Arc { From c54887ba284a4ac2146ca8e5d836ad8a58b91fc9 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 12:02:31 -0300 Subject: [PATCH 06/22] graph: update EntityMultiKey enum --- graph/src/components/store/entity_cache.rs | 36 ++++++++-------------- graph/src/components/store/mod.rs | 9 ++---- graph/src/components/store/traits.rs | 10 ++---- graph/src/runtime/gas/size_of.rs | 4 +-- graph/src/util/cache_weight.rs | 4 +-- runtime/wasm/src/host_exports.rs | 1 - store/postgres/src/relational.rs | 4 +-- store/postgres/src/relational_queries.rs | 2 +- store/postgres/src/writable.rs | 3 +- 9 files changed, 27 insertions(+), 46 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 1db87565d47..29eaea5d616 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -103,7 +103,7 @@ impl EntityCache { // from `handler_updates`. let mut entity = self .current - .get_entity(&*self.store, &EntityMultiKey::Equal(eref.clone()))?; + .get_entity(&*self.store, &EntityMultiKey::Single(eref.clone()))?; // Always test the cache consistency in debug mode. debug_assert!(entity == self.store.get(eref).unwrap()); @@ -114,14 +114,6 @@ impl EntityCache { if let Some(op) = self.handler_updates.get(eref).cloned() { entity = op.apply_to(entity) } - match entity { - Some(ref e) => { - let list = e.clone().sorted(); - let list = list.iter(); - list.for_each(|(k, v)| println!("{}: {:?}", k, v)); - } - None => println!("get: No entity found for {:?}", eref), - } Ok(entity) } @@ -129,8 +121,11 @@ impl EntityCache { &mut self, eref: &EntityDerived, ) -> Result, s::QueryExecutionError> { - self.current.get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; - let entity = self.current.get_entity(&*self.store, &EntityMultiKey::All(eref.clone()))?; + self.current + .get_entity(&*self.store, &EntityMultiKey::Derived(eref.clone()))?; + let entity = self + .current + .get_entity(&*self.store, &EntityMultiKey::Derived(eref.clone()))?; let entities = match entity { Some(e) => { // retrieve the list from the cache @@ -149,13 +144,8 @@ impl EntityCache { } entities } - None => { - println!("get_derived: No entity found for {:?}", eref); - Vec::new() - } + None => Vec::new(), }; - // self.store.get_derived() - // todo!(); Ok(entities) } @@ -271,7 +261,7 @@ impl EntityCache { let missing = self.updates.keys().filter(|key| { !self .current - .contains_key(&EntityMultiKey::Equal((*key).clone())) + .contains_key(&EntityMultiKey::Single((*key).clone())) }); // For immutable types, we assume that the subgraph is well-behaved, @@ -285,13 +275,13 @@ impl EntityCache { for (entity_key, entity) in self.store.get_many(missing.cloned().collect())? { self.current - .insert(EntityMultiKey::Equal(entity_key), Some(entity)); + .insert(EntityMultiKey::Single(entity_key), Some(entity)); } let mut mods = Vec::new(); for (entity_key, update) in self.updates { use s::EntityModification::*; - let key = EntityMultiKey::Equal(entity_key.clone()); + let key = EntityMultiKey::Single(entity_key.clone()); let current = self.current.remove(&key).and_then(|entity| entity); let modification = match (current, update) { @@ -362,7 +352,7 @@ impl LfuCache> { ) -> Result, s::QueryExecutionError> { match self.get(key) { None => match key { - EntityMultiKey::Equal(store_key) => { + EntityMultiKey::Single(store_key) => { let mut entity = store.get(store_key)?; if let Some(entity) = &mut entity { // `__typename` is for queries not for mappings. @@ -371,7 +361,7 @@ impl LfuCache> { self.insert(key.clone(), entity.clone()); Ok(entity) } - EntityMultiKey::All(derived) => { + EntityMultiKey::Derived(derived) => { // we get all entities with the derived field let mut entities = store.get_derived(derived)?; // we asssume that derived fields contains ids @@ -383,7 +373,7 @@ impl LfuCache> { // we insert each entity into the cache with the id key let key = EntityKey::from(&entity.id().unwrap().into(), derived); - self.insert(EntityMultiKey::Equal(key), Some(entity.clone())); + self.insert(EntityMultiKey::Single(key), Some(entity.clone())); // return the value to save Value::String(entity.id().unwrap()) }); diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 6ff6aa0a733..7c35cb359ca 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -159,10 +159,10 @@ pub struct EntityDerived { #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub enum EntityMultiKey { /// A filter that matches all entities of a given type. - All(EntityDerived), + Derived(EntityDerived), /// A filter that matches a specific entity. - Equal(EntityKey), + Single(EntityKey), } impl EntityKey { @@ -1163,10 +1163,7 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } - fn get_derived( - &self, - _query: &EntityDerived, - ) -> Result, StoreError> { + fn get_derived(&self, _query: &EntityDerived) -> Result, StoreError> { Ok(vec![]) } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index b33bd1ce5f2..d100dfaa0d8 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -187,10 +187,7 @@ pub trait ReadStore: Send + Sync + 'static { ) -> Result, StoreError>; /// Reverse lookup - fn get_derived( - &self, - entity_derived: &EntityDerived - ) -> Result, StoreError>; + fn get_derived(&self, entity_derived: &EntityDerived) -> Result, StoreError>; fn input_schema(&self) -> Arc; } @@ -208,10 +205,7 @@ impl ReadStore for Arc { (**self).get_many(keys) } - fn get_derived( - &self, - entity_derived: &EntityDerived - ) -> Result, StoreError> { + fn get_derived(&self, entity_derived: &EntityDerived) -> Result, StoreError> { (**self).get_derived(entity_derived) } diff --git a/graph/src/runtime/gas/size_of.rs b/graph/src/runtime/gas/size_of.rs index 4f1218955af..b6b717c9979 100644 --- a/graph/src/runtime/gas/size_of.rs +++ b/graph/src/runtime/gas/size_of.rs @@ -179,8 +179,8 @@ impl GasSizeOf for EntityDerived { impl GasSizeOf for EntityMultiKey { fn gas_size_of(&self) -> Gas { match self { - EntityMultiKey::Equal(key) => key.gas_size_of(), - EntityMultiKey::All(key) => key.gas_size_of(), + EntityMultiKey::Single(key) => key.gas_size_of(), + EntityMultiKey::Derived(key) => key.gas_size_of(), } } } diff --git a/graph/src/util/cache_weight.rs b/graph/src/util/cache_weight.rs index 790e5a80f8e..6c1feaf9e83 100644 --- a/graph/src/util/cache_weight.rs +++ b/graph/src/util/cache_weight.rs @@ -138,8 +138,8 @@ impl CacheWeight for EntityDerived { impl CacheWeight for EntityMultiKey { fn indirect_weight(&self) -> usize { match self { - EntityMultiKey::All(derived) => derived.indirect_weight(), - EntityMultiKey::Equal(key) => key.indirect_weight(), + EntityMultiKey::Derived(derived) => derived.indirect_weight(), + EntityMultiKey::Single(key) => key.indirect_weight(), } } } diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 6bb2f89f9ce..a22f2395aae 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -239,7 +239,6 @@ impl HostExports { Ok(result) } - pub(crate) fn store_get_derived( &self, state: &mut BlockState, diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 4cd5e97cea3..a572e6a29f6 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -41,7 +41,7 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use crate::relational_queries::{FindChangesQuery, FindPossibleDeletionsQuery, FindDerivedQuery}; +use crate::relational_queries::{FindChangesQuery, FindDerivedQuery, FindPossibleDeletionsQuery}; use crate::{ primary::{Namespace, Site}, relational_queries::{ @@ -566,7 +566,7 @@ impl Layout { ) -> Result, StoreError> { let table = self.table_for_entity(&key.entity_type)?; let query = FindDerivedQuery::new(table, key, block); - + let mut entities = Vec::new(); for data in query.load::(conn)? { diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index a07c654bf8a..f7f40fa91c0 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -12,7 +12,7 @@ use diesel::result::{Error as DieselError, QueryResult}; use diesel::sql_types::{Array, BigInt, Binary, Bool, Integer, Jsonb, Text}; use diesel::Connection; -use graph::components::store::{EntityKey, EntityDerived}; +use graph::components::store::{EntityDerived, EntityKey}; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::prelude::{ diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 5d587c1cd1f..b259d460425 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -256,7 +256,8 @@ impl SyncStore { block: BlockNumber, ) -> Result, StoreError> { retry::forever(&self.logger, "get_where", || { - self.writable.get_where(self.site.cheap_clone(), key, block) + self.writable + .get_derived(self.site.cheap_clone(), key, block) }) } From f0b809101e1b6bee2e50c7474157cab9bfd7672b Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 12:07:26 -0300 Subject: [PATCH 07/22] store,runtime: cleanup code --- runtime/wasm/src/module/mod.rs | 7 +------ store/postgres/src/relational.rs | 1 - store/postgres/src/relational_queries.rs | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 795070b4ddd..5a1e9a14c82 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -1087,12 +1087,7 @@ impl WasmInstanceContext { gas, )?; - let entities: Vec> = entities - .iter() - .map(|entity| entity.clone().sorted()) - .collect(); - // .map(|name| asc_new(self, &*name, gas)) - // ..collect(); + let entities: Vec> = entities.iter().map(|entity| entity.clone().sorted()).collect(); let ret = asc_new(self, &entities, gas)?; Ok(ret) } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index a572e6a29f6..13c9bb90bcf 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -731,7 +731,6 @@ impl Layout { query.query_id, &self.site, )?; - println!("{}", debug_query(&query).to_string()); let query_clone = query.clone(); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index f7f40fa91c0..0bd28e6e3e5 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -1690,7 +1690,7 @@ impl<'a> QueryFragment for FindDerivedQuery<'a> { // Generate // select '..' as entity, to_jsonb(e.*) as data - // from schema.table e where id = $1 + // from schema.table e where field = $1 out.push_sql("select "); out.push_bind_param::(&self.table.object.as_str())?; out.push_sql(" as entity, to_jsonb(e.*) as data\n"); From cc16efb1e1f4188dd141a0e57b9a56279e4fc469 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 14:13:52 -0300 Subject: [PATCH 08/22] graph: fix missing get_derived on mock --- graph/tests/entity_cache.rs | 10 +++++++++- runtime/wasm/src/module/mod.rs | 1 - 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index bbec082ec3b..bbf0df9c13f 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use graph::components::store::{ DeploymentCursorTracker, EntityKey, EntityType, ReadStore, StoredDynamicDataSource, - WritableStore, + WritableStore, EntityDerived, }; use graph::{ components::store::{DeploymentId, DeploymentLocator}, @@ -60,6 +60,14 @@ impl ReadStore for MockStore { Ok(self.get_many_res.clone()) } + fn get_derived( + &self, + _key: &EntityDerived, + ) -> Result, StoreError> { + let values: Vec = self.get_many_res.clone().into_iter().map(|(_, v)| v).collect(); + Ok(values) + } + fn input_schema(&self) -> Arc { SCHEMA.clone() } diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 5a1e9a14c82..bd6d606ff30 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -1078,7 +1078,6 @@ impl WasmInstanceContext { let entity_type: String = asc_get(self, entity_ptr, gas)?; let field: String = asc_get(self, field_ptr, gas)?; let id: String = asc_get(self, id_ptr, gas)?; - println!("store_get_derived: {} {} {}", entity_type, field, id); let entities = self.ctx.host_exports.store_get_derived( &mut self.ctx.state, entity_type.clone(), From e2a796e4f33285106df0036c9d540c994a932e6e Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 14:16:31 -0300 Subject: [PATCH 09/22] all: cargo format --- graph/tests/entity_cache.rs | 16 +++++++++------- runtime/wasm/src/module/mod.rs | 5 ++++- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index bbf0df9c13f..7024db0de5e 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -10,8 +10,8 @@ use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use graph::components::store::{ - DeploymentCursorTracker, EntityKey, EntityType, ReadStore, StoredDynamicDataSource, - WritableStore, EntityDerived, + DeploymentCursorTracker, EntityDerived, EntityKey, EntityType, ReadStore, + StoredDynamicDataSource, WritableStore, }; use graph::{ components::store::{DeploymentId, DeploymentLocator}, @@ -60,11 +60,13 @@ impl ReadStore for MockStore { Ok(self.get_many_res.clone()) } - fn get_derived( - &self, - _key: &EntityDerived, - ) -> Result, StoreError> { - let values: Vec = self.get_many_res.clone().into_iter().map(|(_, v)| v).collect(); + fn get_derived(&self, _key: &EntityDerived) -> Result, StoreError> { + let values: Vec = self + .get_many_res + .clone() + .into_iter() + .map(|(_, v)| v) + .collect(); Ok(values) } diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index bd6d606ff30..27d6dd69693 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -1086,7 +1086,10 @@ impl WasmInstanceContext { gas, )?; - let entities: Vec> = entities.iter().map(|entity| entity.clone().sorted()).collect(); + let entities: Vec> = entities + .iter() + .map(|entity| entity.clone().sorted()) + .collect(); let ret = asc_new(self, &entities, gas)?; Ok(ret) } From 9ff225f0edbe96fed3bd53b762e1c11d79a37767 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 18:50:24 -0300 Subject: [PATCH 10/22] change to load_related, use entitykey in cache --- core/src/subgraph/state.rs | 4 +- graph/src/components/store/entity_cache.rs | 141 +++++++-------------- graph/src/components/store/mod.rs | 14 +- graph/src/components/subgraph/instance.rs | 4 +- graph/src/data/schema.rs | 39 +++++- graph/src/runtime/gas/size_of.rs | 11 +- graph/src/util/cache_weight.rs | 19 +-- runtime/wasm/src/host_exports.rs | 6 +- runtime/wasm/src/module/mod.rs | 24 ++-- 9 files changed, 104 insertions(+), 158 deletions(-) diff --git a/core/src/subgraph/state.rs b/core/src/subgraph/state.rs index 19eab13b2a0..0d5edd84b65 100644 --- a/core/src/subgraph/state.rs +++ b/core/src/subgraph/state.rs @@ -1,5 +1,5 @@ use graph::{ - components::store::EntityMultiKey, + components::store::EntityKey, prelude::Entity, util::{backoff::ExponentialBackoff, lfu_cache::LfuCache}, }; @@ -18,5 +18,5 @@ pub struct IndexingState { /// - The time THRESHOLD is passed /// - Or the subgraph has triggers for the block pub skip_ptr_updates_timer: Instant, - pub entity_lfu_cache: LfuCache>, + pub entity_lfu_cache: LfuCache>, } diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 29eaea5d616..1a84d2ab3ea 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -3,11 +3,11 @@ use std::collections::HashMap; use std::fmt::{self, Debug}; use std::sync::Arc; -use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOperation, Value}; +use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOperation}; use crate::prelude::{Schema, ENV_VARS}; use crate::util::lfu_cache::LfuCache; -use super::{EntityDerived, EntityMultiKey}; +use super::EntityDerived; /// A cache for entities from the store that provides the basic functionality /// needed for the store interactions in the host exports. This struct tracks @@ -19,7 +19,7 @@ use super::{EntityDerived, EntityMultiKey}; pub struct EntityCache { /// The state of entities in the store. An entry of `None` /// means that the entity is not present in the store - current: LfuCache>, + current: LfuCache>, /// The accumulated changes to an entity. updates: HashMap, @@ -47,7 +47,7 @@ impl Debug for EntityCache { pub struct ModificationsAndCache { pub modifications: Vec, - pub entity_lfu_cache: LfuCache>, + pub entity_lfu_cache: LfuCache>, } impl EntityCache { @@ -64,7 +64,7 @@ impl EntityCache { pub fn with_current( store: Arc, - current: LfuCache>, + current: LfuCache>, ) -> EntityCache { EntityCache { current, @@ -101,9 +101,7 @@ impl EntityCache { pub fn get(&mut self, eref: &EntityKey) -> Result, s::QueryExecutionError> { // Get the current entity, apply any updates from `updates`, then // from `handler_updates`. - let mut entity = self - .current - .get_entity(&*self.store, &EntityMultiKey::Single(eref.clone()))?; + let mut entity = self.current.get_entity(&*self.store, eref)?; // Always test the cache consistency in debug mode. debug_assert!(entity == self.store.get(eref).unwrap()); @@ -117,35 +115,24 @@ impl EntityCache { Ok(entity) } - pub fn get_derived( - &mut self, - eref: &EntityDerived, - ) -> Result, s::QueryExecutionError> { - self.current - .get_entity(&*self.store, &EntityMultiKey::Derived(eref.clone()))?; - let entity = self - .current - .get_entity(&*self.store, &EntityMultiKey::Derived(eref.clone()))?; - let entities = match entity { - Some(e) => { - // retrieve the list from the cache - let mut entities = Vec::new(); - if let Some(Value::List(list)) = e.get(eref.entity_field.as_str()) { - for id in list.iter() { - // we just created - if let Value::String(id) = id { - let key = EntityKey::from(id, eref); - match self.get(&key) { - Ok(Some(value)) => entities.push(value), - _ => (), - } - } - } - } - entities - } - None => Vec::new(), + pub fn load_related(&mut self, eref: &EntityDerived) -> Result, anyhow::Error> { + let (base_type, field) = self.schema.get_type_for_field(eref)?; + + let key = EntityDerived { + entity_id: eref.entity_id.clone(), + entity_field: field.into(), + entity_type: base_type.into(), + causality_region: eref.causality_region, }; + + let entities = self.store.get_derived(&key)?; + entities + .iter() + .filter(|e| e.contains_key("id")) + .for_each(|e| { + let key = EntityKey::from(&e.id().unwrap().into(), eref); + self.current.insert(key, Some(e.clone())); + }); Ok(entities) } @@ -258,11 +245,10 @@ impl EntityCache { // The first step is to make sure all entities being set are in `self.current`. // For each subgraph, we need a map of entity type to missing entity ids. - let missing = self.updates.keys().filter(|key| { - !self - .current - .contains_key(&EntityMultiKey::Single((*key).clone())) - }); + let missing = self + .updates + .keys() + .filter(|key| !self.current.contains_key(key)); // For immutable types, we assume that the subgraph is well-behaved, // and all updated immutable entities are in fact new, and skip @@ -274,14 +260,12 @@ impl EntityCache { let missing = missing.filter(|key| !self.schema.is_immutable(&key.entity_type)); for (entity_key, entity) in self.store.get_many(missing.cloned().collect())? { - self.current - .insert(EntityMultiKey::Single(entity_key), Some(entity)); + self.current.insert(entity_key, Some(entity)); } let mut mods = Vec::new(); - for (entity_key, update) in self.updates { + for (key, update) in self.updates { use s::EntityModification::*; - let key = EntityMultiKey::Single(entity_key.clone()); let current = self.current.remove(&key).and_then(|entity| entity); let modification = match (current, update) { @@ -291,10 +275,7 @@ impl EntityCache { let mut data = Entity::new(); data.merge_remove_null_fields(updates); self.current.insert(key.clone(), Some(data.clone())); - Some(Insert { - key: entity_key, - data, - }) + Some(Insert { key, data }) } // Entity may have been changed (Some(current), EntityOp::Update(updates)) => { @@ -302,10 +283,7 @@ impl EntityCache { data.merge_remove_null_fields(updates); self.current.insert(key.clone(), Some(data.clone())); if current != data { - Some(Overwrite { - key: entity_key, - data, - }) + Some(Overwrite { key, data }) } else { None } @@ -314,10 +292,7 @@ impl EntityCache { (Some(current), EntityOp::Overwrite(data)) => { self.current.insert(key.clone(), Some(data.clone())); if current != data { - Some(Overwrite { - key: entity_key, - data, - }) + Some(Overwrite { key, data }) } else { None } @@ -325,7 +300,7 @@ impl EntityCache { // Existing entity was deleted (Some(_), EntityOp::Remove) => { self.current.insert(key.clone(), None); - Some(Remove { key: entity_key }) + Some(Remove { key }) } // Entity was deleted, but it doesn't exist in the store (None, EntityOp::Remove) => None, @@ -343,53 +318,23 @@ impl EntityCache { } } -impl LfuCache> { +impl LfuCache> { // Helper for cached lookup of an entity. fn get_entity( &mut self, store: &(impl s::ReadStore + ?Sized), - key: &EntityMultiKey, + key: &EntityKey, ) -> Result, s::QueryExecutionError> { match self.get(key) { - None => match key { - EntityMultiKey::Single(store_key) => { - let mut entity = store.get(store_key)?; - if let Some(entity) = &mut entity { - // `__typename` is for queries not for mappings. - entity.remove("__typename"); - } - self.insert(key.clone(), entity.clone()); - Ok(entity) - } - EntityMultiKey::Derived(derived) => { - // we get all entities with the derived field - let mut entities = store.get_derived(derived)?; - // we asssume that derived fields contains ids - let entities = entities - .iter_mut() - .filter(|entity| entity.contains_key("id")) - .map(|entity| { - entity.remove("__typename"); - - // we insert each entity into the cache with the id key - let key = EntityKey::from(&entity.id().unwrap().into(), derived); - self.insert(EntityMultiKey::Single(key), Some(entity.clone())); - // return the value to save - Value::String(entity.id().unwrap()) - }); - let entities = entities.collect(); - - // create entity with the list of ids - let mut entity = Entity::new(); - entity.insert(derived.entity_field.to_string(), Value::List(entities)); - - // insert to cache the list of ids - // todo remove this comment - // self.insert(key.clone(), Some(entity.clone())); - - Ok(Some(entity.clone())) + None => { + let mut entity = store.get(key)?; + if let Some(entity) = &mut entity { + // `__typename` is for queries not for mappings. + entity.remove("__typename"); } - }, + self.insert(key.clone(), entity.clone()); + Ok(entity) + } Some(data) => Ok(data.clone()), } } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 7c35cb359ca..776592868e3 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -142,12 +142,11 @@ pub struct EntityKey { pub struct EntityDerived { /// Name of the entity type. pub entity_type: EntityType, - - pub entity_field: Word, - /// ID of the individual entity. pub entity_id: Word, + pub entity_field: Word, + /// This is the causality region of the data source that created the entity. /// /// In the case of an entity lookup, this is the causality region of the data source that is @@ -156,15 +155,6 @@ pub struct EntityDerived { pub causality_region: CausalityRegion, } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum EntityMultiKey { - /// A filter that matches all entities of a given type. - Derived(EntityDerived), - - /// A filter that matches a specific entity. - Single(EntityKey), -} - impl EntityKey { // For use in tests only #[cfg(debug_assertions)] diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index b7f2a1efe19..f3df2c672e4 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -1,6 +1,6 @@ use crate::{ blockchain::Blockchain, - components::store::{EntityMultiKey, ReadStore, StoredDynamicDataSource}, + components::store::{EntityKey, ReadStore, StoredDynamicDataSource}, data::subgraph::schema::SubgraphError, data_source::DataSourceTemplate, prelude::*, @@ -35,7 +35,7 @@ pub struct BlockState { } impl BlockState { - pub fn new(store: impl ReadStore, lfu_cache: LfuCache>) -> Self { + pub fn new(store: impl ReadStore, lfu_cache: LfuCache>) -> Self { BlockState { entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache), deterministic_errors: Vec::new(), diff --git a/graph/src/data/schema.rs b/graph/src/data/schema.rs index 899ef70fc93..216e7952229 100644 --- a/graph/src/data/schema.rs +++ b/graph/src/data/schema.rs @@ -1,5 +1,5 @@ use crate::cheap_clone::CheapClone; -use crate::components::store::{EntityKey, EntityType}; +use crate::components::store::{EntityDerived, EntityKey, EntityType, SubgraphStore}; use crate::data::graphql::ext::{DirectiveExt, DirectiveFinder, DocumentExt, TypeExt, ValueExt}; use crate::data::graphql::ObjectTypeExt; use crate::data::store::{self, ValueType}; @@ -539,6 +539,43 @@ impl Schema { } } + pub fn get_type_for_field(&self, key: &EntityDerived) -> Result<(&str, &str), Error> { + let field = self + .document + .get_object_type_definition(key.entity_type.as_str()) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown entity type `{}`", + key.entity_type, + key.entity_id, + key.entity_type, + ) + })? + .field(&key.entity_field) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown field `{}`", + key.entity_type, + key.entity_id, + key.entity_field, + ) + })?; + if field.is_derived() { + let derived_from = field.find_directive("derivedFrom").unwrap(); + let base_type = field.field_type.get_base_type(); + let field = derived_from.argument("field").unwrap(); + + Ok((base_type, field.as_str().unwrap())) + } else { + Err(anyhow!( + "Entity {}[{}]: field `{}` is not derived", + key.entity_type, + key.entity_id, + key.entity_field, + )) + } + } + pub fn is_immutable(&self, entity_type: &EntityType) -> bool { self.immutable_types.contains(entity_type) } diff --git a/graph/src/runtime/gas/size_of.rs b/graph/src/runtime/gas/size_of.rs index b6b717c9979..4466919dd01 100644 --- a/graph/src/runtime/gas/size_of.rs +++ b/graph/src/runtime/gas/size_of.rs @@ -1,7 +1,7 @@ //! Various implementations of GasSizeOf; use crate::{ - components::store::{EntityDerived, EntityKey, EntityMultiKey, EntityType}, + components::store::{EntityDerived, EntityKey, EntityType}, data::store::{scalar::Bytes, Value}, prelude::{BigDecimal, BigInt}, }; @@ -176,15 +176,6 @@ impl GasSizeOf for EntityDerived { } } -impl GasSizeOf for EntityMultiKey { - fn gas_size_of(&self) -> Gas { - match self { - EntityMultiKey::Single(key) => key.gas_size_of(), - EntityMultiKey::Derived(key) => key.gas_size_of(), - } - } -} - impl GasSizeOf for EntityType { fn gas_size_of(&self) -> Gas { self.as_str().gas_size_of() diff --git a/graph/src/util/cache_weight.rs b/graph/src/util/cache_weight.rs index 6c1feaf9e83..af15a82b25d 100644 --- a/graph/src/util/cache_weight.rs +++ b/graph/src/util/cache_weight.rs @@ -1,5 +1,5 @@ use crate::{ - components::store::{EntityDerived, EntityKey, EntityMultiKey, EntityType}, + components::store::{EntityKey, EntityType}, data::value::Word, prelude::{q, BigDecimal, BigInt, Value}, }; @@ -127,23 +127,6 @@ impl CacheWeight for EntityKey { } } -impl CacheWeight for EntityDerived { - fn indirect_weight(&self) -> usize { - self.entity_id.indirect_weight() - + self.entity_type.indirect_weight() - + self.entity_field.indirect_weight() - } -} - -impl CacheWeight for EntityMultiKey { - fn indirect_weight(&self) -> usize { - match self { - EntityMultiKey::Derived(derived) => derived.indirect_weight(), - EntityMultiKey::Single(key) => key.indirect_weight(), - } - } -} - impl CacheWeight for [u8; 32] { fn indirect_weight(&self) -> usize { 0 diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index a22f2395aae..0f695ff742e 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -239,12 +239,12 @@ impl HostExports { Ok(result) } - pub(crate) fn store_get_derived( + pub(crate) fn store_load_related( &self, state: &mut BlockState, entity_type: String, - entity_field: String, entity_id: String, + entity_field: String, gas: &GasCounter, ) -> Result, anyhow::Error> { let store_key = EntityDerived { @@ -255,7 +255,7 @@ impl HostExports { }; self.check_entity_type_access(&store_key.entity_type)?; - let result = state.entity_cache.get_derived(&store_key)?; + let result = state.entity_cache.load_related(&store_key)?; gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?; Ok(result) diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 27d6dd69693..b8e6a0cbc36 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -528,12 +528,12 @@ impl WasmInstance { link!("store.get", store_get, "host_export_store_get", entity, id); link!( - "store.getDerived", - store_get_derived, + "store.loadRelated", + store_load_related, "host_export_store_get_derived", entity, - field, - id + id, + field ); link!( "store.set", @@ -1067,22 +1067,22 @@ impl WasmInstanceContext { Ok(ret) } - /// function store.getDerived(entity: string, field: string, id: string): Entity[] - pub fn store_get_derived( + /// function store.loadRelated(entity_type: string, id: string, field: string): Array + pub fn store_load_related( &mut self, gas: &GasCounter, - entity_ptr: AscPtr, - field_ptr: AscPtr, + entity_type_ptr: AscPtr, id_ptr: AscPtr, + field_ptr: AscPtr, ) -> Result>>, HostExportError> { - let entity_type: String = asc_get(self, entity_ptr, gas)?; - let field: String = asc_get(self, field_ptr, gas)?; + let entity_type: String = asc_get(self, entity_type_ptr, gas)?; let id: String = asc_get(self, id_ptr, gas)?; - let entities = self.ctx.host_exports.store_get_derived( + let field: String = asc_get(self, field_ptr, gas)?; + let entities = self.ctx.host_exports.store_load_related( &mut self.ctx.state, entity_type.clone(), - field.clone(), id.clone(), + field.clone(), gas, )?; From 40b7b7333890eb69fd09cebab973e56141ffe0c5 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 18:55:30 -0300 Subject: [PATCH 11/22] graph: convert string to entity type --- graph/src/components/store/entity_cache.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 1a84d2ab3ea..a5ccd3381b3 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -7,7 +7,7 @@ use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOpe use crate::prelude::{Schema, ENV_VARS}; use crate::util::lfu_cache::LfuCache; -use super::EntityDerived; +use super::{EntityDerived, EntityType}; /// A cache for entities from the store that provides the basic functionality /// needed for the store interactions in the host exports. This struct tracks @@ -121,7 +121,7 @@ impl EntityCache { let key = EntityDerived { entity_id: eref.entity_id.clone(), entity_field: field.into(), - entity_type: base_type.into(), + entity_type: EntityType::new(base_type.to_string()), causality_region: eref.causality_region, }; From 6c46d052b23f0418a0a35e8eae56040bc0a60f80 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 8 Mar 2023 19:15:00 -0300 Subject: [PATCH 12/22] graph,graphql,runtime,store: refact structs --- graph/src/components/store/entity_cache.rs | 13 +++++++---- graph/src/components/store/mod.rs | 27 ++++++++++++++++++---- graph/src/components/store/traits.rs | 4 ++-- graph/src/data/schema.rs | 4 ++-- graph/src/runtime/gas/size_of.rs | 4 ++-- graph/tests/entity_cache.rs | 4 ++-- runtime/wasm/src/host_exports.rs | 4 ++-- store/postgres/src/deployment_store.rs | 2 +- store/postgres/src/relational.rs | 4 ++-- store/postgres/src/relational_queries.rs | 8 +++---- store/postgres/src/writable.rs | 10 ++++---- 11 files changed, 52 insertions(+), 32 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index a5ccd3381b3..fa06205983f 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -7,7 +7,7 @@ use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOpe use crate::prelude::{Schema, ENV_VARS}; use crate::util::lfu_cache::LfuCache; -use super::{EntityDerived, EntityType}; +use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest}; /// A cache for entities from the store that provides the basic functionality /// needed for the store interactions in the host exports. This struct tracks @@ -115,13 +115,16 @@ impl EntityCache { Ok(entity) } - pub fn load_related(&mut self, eref: &EntityDerived) -> Result, anyhow::Error> { + pub fn load_related( + &mut self, + eref: &LoadRelatedRequest, + ) -> Result, anyhow::Error> { let (base_type, field) = self.schema.get_type_for_field(eref)?; - let key = EntityDerived { - entity_id: eref.entity_id.clone(), - entity_field: field.into(), + let key = DerivedEntityQuery { entity_type: EntityType::new(base_type.to_string()), + entity_field: field.into(), + value: eref.entity_id.clone(), causality_region: eref.causality_region, }; diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 776592868e3..33a6d7cdfa8 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -138,14 +138,31 @@ pub struct EntityKey { pub causality_region: CausalityRegion, } -#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct EntityDerived { +#[derive(Debug, Clone)] +pub struct LoadRelatedRequest { /// Name of the entity type. pub entity_type: EntityType, /// ID of the individual entity. pub entity_id: Word, + /// Field the shall be loaded + pub entity_field: Word, + + /// This is the causality region of the data source that created the entity. + /// + /// In the case of an entity lookup, this is the causality region of the data source that is + /// doing the lookup. So if the entity exists but was created on a different causality region, + /// the lookup will return empty. + pub causality_region: CausalityRegion, +} +#[derive(Debug)] +pub struct DerivedEntityQuery { + /// Name of the entity to search + pub entity_type: EntityType, + /// The field to check pub entity_field: Word, + /// The value to compare against + pub value: Word, /// This is the causality region of the data source that created the entity. /// @@ -166,8 +183,8 @@ impl EntityKey { } } - pub fn from(id: &String, entity_derived: &EntityDerived) -> Self { - let clone = entity_derived.clone(); + pub fn from(id: &String, load_related_request: &LoadRelatedRequest) -> Self { + let clone = load_related_request.clone(); Self { entity_id: id.clone().into(), entity_type: clone.entity_type, @@ -1153,7 +1170,7 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } - fn get_derived(&self, _query: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, _query: &DerivedEntityQuery) -> Result, StoreError> { Ok(vec![]) } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index d100dfaa0d8..094be6fa373 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -187,7 +187,7 @@ pub trait ReadStore: Send + Sync + 'static { ) -> Result, StoreError>; /// Reverse lookup - fn get_derived(&self, entity_derived: &EntityDerived) -> Result, StoreError>; + fn get_derived(&self, entity_derived: &DerivedEntityQuery) -> Result, StoreError>; fn input_schema(&self) -> Arc; } @@ -205,7 +205,7 @@ impl ReadStore for Arc { (**self).get_many(keys) } - fn get_derived(&self, entity_derived: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, entity_derived: &DerivedEntityQuery) -> Result, StoreError> { (**self).get_derived(entity_derived) } diff --git a/graph/src/data/schema.rs b/graph/src/data/schema.rs index 216e7952229..3d3ef328d9f 100644 --- a/graph/src/data/schema.rs +++ b/graph/src/data/schema.rs @@ -1,5 +1,5 @@ use crate::cheap_clone::CheapClone; -use crate::components::store::{EntityDerived, EntityKey, EntityType, SubgraphStore}; +use crate::components::store::{EntityKey, EntityType, LoadRelatedRequest, SubgraphStore}; use crate::data::graphql::ext::{DirectiveExt, DirectiveFinder, DocumentExt, TypeExt, ValueExt}; use crate::data::graphql::ObjectTypeExt; use crate::data::store::{self, ValueType}; @@ -539,7 +539,7 @@ impl Schema { } } - pub fn get_type_for_field(&self, key: &EntityDerived) -> Result<(&str, &str), Error> { + pub fn get_type_for_field(&self, key: &LoadRelatedRequest) -> Result<(&str, &str), Error> { let field = self .document .get_object_type_definition(key.entity_type.as_str()) diff --git a/graph/src/runtime/gas/size_of.rs b/graph/src/runtime/gas/size_of.rs index 4466919dd01..8f4e535a1fd 100644 --- a/graph/src/runtime/gas/size_of.rs +++ b/graph/src/runtime/gas/size_of.rs @@ -1,7 +1,7 @@ //! Various implementations of GasSizeOf; use crate::{ - components::store::{EntityDerived, EntityKey, EntityType}, + components::store::{EntityKey, EntityType, LoadRelatedRequest}, data::store::{scalar::Bytes, Value}, prelude::{BigDecimal, BigInt}, }; @@ -168,7 +168,7 @@ impl GasSizeOf for EntityKey { } } -impl GasSizeOf for EntityDerived { +impl GasSizeOf for LoadRelatedRequest { fn gas_size_of(&self) -> Gas { self.entity_type.gas_size_of() + self.entity_id.gas_size_of() diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index 7024db0de5e..b3941f608c1 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -10,7 +10,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use graph::components::store::{ - DeploymentCursorTracker, EntityDerived, EntityKey, EntityType, ReadStore, + DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, ReadStore, StoredDynamicDataSource, WritableStore, }; use graph::{ @@ -60,7 +60,7 @@ impl ReadStore for MockStore { Ok(self.get_many_res.clone()) } - fn get_derived(&self, _key: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, _key: &DerivedEntityQuery) -> Result, StoreError> { let values: Vec = self .get_many_res .clone() diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index 0f695ff742e..e943749b332 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -9,7 +9,7 @@ use wasmtime::Trap; use web3::types::H160; use graph::blockchain::Blockchain; -use graph::components::store::{EnsLookup, EntityDerived}; +use graph::components::store::{EnsLookup, LoadRelatedRequest}; use graph::components::store::{EntityKey, EntityType}; use graph::components::subgraph::{ PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing, @@ -247,7 +247,7 @@ impl HostExports { entity_field: String, gas: &GasCounter, ) -> Result, anyhow::Error> { - let store_key = EntityDerived { + let store_key = LoadRelatedRequest { entity_type: EntityType::new(entity_type), entity_id: entity_id.into(), entity_field: entity_field.into(), diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 08471490443..1d1c82bfaa8 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1122,7 +1122,7 @@ impl DeploymentStore { pub(crate) fn get_derived( &self, site: Arc, - key: &EntityDerived, + key: &DerivedEntityQuery, block: BlockNumber, ) -> Result, StoreError> { let conn = self.get_conn()?; diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 13c9bb90bcf..3e3ed41ff59 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -49,7 +49,7 @@ use crate::{ FilterQuery, FindManyQuery, FindQuery, InsertQuery, RevertClampQuery, RevertRemoveQuery, }, }; -use graph::components::store::{EntityDerived, EntityKey, EntityType}; +use graph::components::store::{DerivedEntityQuery, EntityKey, EntityType}; use graph::data::graphql::ext::{DirectiveFinder, DocumentExt, ObjectTypeExt}; use graph::data::schema::{FulltextConfig, FulltextDefinition, Schema, SCHEMA_TYPE_NAME}; use graph::data::store::BYTES_SCALAR; @@ -561,7 +561,7 @@ impl Layout { pub fn find_derived( &self, conn: &PgConnection, - key: &EntityDerived, + key: &DerivedEntityQuery, block: BlockNumber, ) -> Result, StoreError> { let table = self.table_for_entity(&key.entity_type)?; diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 0bd28e6e3e5..e7f891d8fd8 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -12,7 +12,7 @@ use diesel::result::{Error as DieselError, QueryResult}; use diesel::sql_types::{Array, BigInt, Binary, Bool, Integer, Jsonb, Text}; use diesel::Connection; -use graph::components::store::{EntityDerived, EntityKey}; +use graph::components::store::{DerivedEntityQuery, EntityKey}; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -1673,7 +1673,7 @@ impl<'a, Conn> RunQueryDsl for FindManyQuery<'a> {} #[derive(Debug, Clone, Constructor)] pub struct FindDerivedQuery<'a> { table: &'a Table, - key: &'a EntityDerived, + key: &'a DerivedEntityQuery, block: BlockNumber, } @@ -1681,10 +1681,10 @@ impl<'a> QueryFragment for FindDerivedQuery<'a> { fn walk_ast(&self, mut out: AstPass) -> QueryResult<()> { out.unsafe_to_cache_prepared(); - let EntityDerived { + let DerivedEntityQuery { entity_type: _, entity_field, - entity_id, + value: entity_id, causality_region, } = self.key; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index b259d460425..d8b434be54c 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -4,7 +4,7 @@ use std::sync::Mutex; use std::{collections::BTreeMap, sync::Arc}; use graph::blockchain::block_stream::FirehoseCursor; -use graph::components::store::{DeploymentCursorTracker, EntityDerived, EntityKey, ReadStore}; +use graph::components::store::{DeploymentCursorTracker, DerivedEntityQuery, EntityKey, ReadStore}; use graph::data::subgraph::schema; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -252,7 +252,7 @@ impl SyncStore { fn get_derived( &self, - key: &EntityDerived, + key: &DerivedEntityQuery, block: BlockNumber, ) -> Result, StoreError> { retry::forever(&self.logger, "get_where", || { @@ -756,7 +756,7 @@ impl Queue { Ok(map) } - fn get_derived(&self, key: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, key: &DerivedEntityQuery) -> Result, StoreError> { let tracker = BlockTracker::new(); // TODO implement the whole async self.store.get_derived(key, tracker.query_block()) @@ -920,7 +920,7 @@ impl Writer { } } - fn get_derived(&self, key: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, key: &DerivedEntityQuery) -> Result, StoreError> { match self { Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX), Writer::Async(queue) => queue.get_derived(key), @@ -1016,7 +1016,7 @@ impl ReadStore for WritableStore { self.writer.get_many(keys) } - fn get_derived(&self, key: &EntityDerived) -> Result, StoreError> { + fn get_derived(&self, key: &DerivedEntityQuery) -> Result, StoreError> { self.writer.get_derived(key) } From f02e393425e766e90a12d9c59e7fd83443561451 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 14 Mar 2023 19:22:15 -0300 Subject: [PATCH 13/22] graph, store: add get_derived for queue --- graph/src/components/store/entity_cache.rs | 11 ++-- graph/src/components/store/mod.rs | 7 ++- graph/src/components/store/traits.rs | 10 +++- store/postgres/src/deployment_store.rs | 2 +- store/postgres/src/relational.rs | 13 +++-- store/postgres/src/writable.rs | 63 +++++++++++++++++++--- 6 files changed, 83 insertions(+), 23 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index fa06205983f..cd114f6a16c 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -129,13 +129,10 @@ impl EntityCache { }; let entities = self.store.get_derived(&key)?; - entities - .iter() - .filter(|e| e.contains_key("id")) - .for_each(|e| { - let key = EntityKey::from(&e.id().unwrap().into(), eref); - self.current.insert(key, Some(e.clone())); - }); + entities.iter().for_each(|(key, e)| { + self.current.insert(key.clone(), Some(e.clone())); + }); + let entities: Vec = entities.values().cloned().collect(); Ok(entities) } diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 33a6d7cdfa8..c18352e4315 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -1170,8 +1170,11 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } - fn get_derived(&self, _query: &DerivedEntityQuery) -> Result, StoreError> { - Ok(vec![]) + fn get_derived( + &self, + _query: &DerivedEntityQuery, + ) -> Result, StoreError> { + Ok(BTreeMap::new()) } fn input_schema(&self) -> Arc { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 094be6fa373..71a3dfd20d2 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -187,7 +187,10 @@ pub trait ReadStore: Send + Sync + 'static { ) -> Result, StoreError>; /// Reverse lookup - fn get_derived(&self, entity_derived: &DerivedEntityQuery) -> Result, StoreError>; + fn get_derived( + &self, + entity_derived: &DerivedEntityQuery, + ) -> Result, StoreError>; fn input_schema(&self) -> Arc; } @@ -205,7 +208,10 @@ impl ReadStore for Arc { (**self).get_many(keys) } - fn get_derived(&self, entity_derived: &DerivedEntityQuery) -> Result, StoreError> { + fn get_derived( + &self, + entity_derived: &DerivedEntityQuery, + ) -> Result, StoreError> { (**self).get_derived(entity_derived) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 1d1c82bfaa8..798a84e00fe 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1124,7 +1124,7 @@ impl DeploymentStore { site: Arc, key: &DerivedEntityQuery, block: BlockNumber, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let conn = self.get_conn()?; let layout = self.layout(&conn, site)?; layout.find_derived(&conn, key, block) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 3e3ed41ff59..2edbaaeacfc 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -563,15 +563,22 @@ impl Layout { conn: &PgConnection, key: &DerivedEntityQuery, block: BlockNumber, - ) -> Result, StoreError> { + ) -> Result, StoreError> { let table = self.table_for_entity(&key.entity_type)?; let query = FindDerivedQuery::new(table, key, block); - let mut entities = Vec::new(); + let mut entities = BTreeMap::new(); for data in query.load::(conn)? { + let entity_type = data.entity_type(); let entity_data: Entity = data.deserialize_with_layout(self, None, true)?; - entities.push(entity_data); + let key = EntityKey { + entity_type, + entity_id: entity_data.id()?.into(), + causality_region: CausalityRegion::from_entity(&entity_data), + }; + + entities.insert(key, entity_data); } Ok(entities) } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index d8b434be54c..5134a139a55 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -254,8 +254,8 @@ impl SyncStore { &self, key: &DerivedEntityQuery, block: BlockNumber, - ) -> Result, StoreError> { - retry::forever(&self.logger, "get_where", || { + ) -> Result, StoreError> { + self.retry("get_derived", || { self.writable .get_derived(self.site.cheap_clone(), key, block) }) @@ -756,10 +756,51 @@ impl Queue { Ok(map) } - fn get_derived(&self, key: &DerivedEntityQuery) -> Result, StoreError> { - let tracker = BlockTracker::new(); - // TODO implement the whole async - self.store.get_derived(key, tracker.query_block()) + fn get_derived( + &self, + key_derived: &DerivedEntityQuery, + ) -> Result, StoreError> { + let mut tracker = BlockTracker::new(); + + // Get entities from entries in the queue + let (mut entities_in_queue, entities_removed) = self.queue.fold( + (BTreeMap::new(), Vec::new()), + |(mut map, mut remove_list): (BTreeMap, Vec), req| { + tracker.update(req.as_ref()); + match req.as_ref() { + Request::Write { + block_ptr, mods, .. + } => { + if tracker.visible(block_ptr) { + for emod in mods { + let key = emod.entity_ref(); + // The key must be removed to avoid overwriting it with a stale value. + if key_derived.entity_type == key.entity_type { + match emod.entity() { + Some(entity) => { + map.insert(key.clone(), entity.clone()); + } + None => { + remove_list.push(key.clone()); + } + } + } + } + } + } + Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ } + } + (map, remove_list) + }, + ); + let mut items_from_database = self.store.get_derived(key_derived, tracker.query_block())?; + // Remove any entities that were removed in the queue + items_from_database.retain(|key, _item| !entities_removed.contains(key)); + + // Extend the store results with the entities from the queue. + entities_in_queue.extend(items_from_database); + + Ok(entities_in_queue) } /// Load dynamic data sources by looking at both the queue and the store @@ -920,7 +961,10 @@ impl Writer { } } - fn get_derived(&self, key: &DerivedEntityQuery) -> Result, StoreError> { + fn get_derived( + &self, + key: &DerivedEntityQuery, + ) -> Result, StoreError> { match self { Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX), Writer::Async(queue) => queue.get_derived(key), @@ -1016,7 +1060,10 @@ impl ReadStore for WritableStore { self.writer.get_many(keys) } - fn get_derived(&self, key: &DerivedEntityQuery) -> Result, StoreError> { + fn get_derived( + &self, + key: &DerivedEntityQuery, + ) -> Result, StoreError> { self.writer.get_derived(key) } From 790bba5ecac39a2813fe31edffeb50b8118c00b8 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 14 Mar 2023 19:36:09 -0300 Subject: [PATCH 14/22] store: fix problem where database overwrite queue --- store/postgres/src/writable.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 5134a139a55..88be64c23b5 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -793,14 +793,16 @@ impl Queue { (map, remove_list) }, ); + // We should filter this in the future to only get the entities that are needed let mut items_from_database = self.store.get_derived(key_derived, tracker.query_block())?; // Remove any entities that were removed in the queue items_from_database.retain(|key, _item| !entities_removed.contains(key)); // Extend the store results with the entities from the queue. - entities_in_queue.extend(items_from_database); + // This overwrites any entitiy from the database with the same key from queue + items_from_database.extend(entities_in_queue); - Ok(entities_in_queue) + Ok(items_from_database) } /// Load dynamic data sources by looking at both the queue and the store From e00533355a47348f99753e5fd7891817c5100384 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 14 Mar 2023 19:37:40 -0300 Subject: [PATCH 15/22] graph: fix mockstore --- graph/tests/entity_cache.rs | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index b3941f608c1..c844a3a000d 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -60,14 +60,11 @@ impl ReadStore for MockStore { Ok(self.get_many_res.clone()) } - fn get_derived(&self, _key: &DerivedEntityQuery) -> Result, StoreError> { - let values: Vec = self - .get_many_res - .clone() - .into_iter() - .map(|(_, v)| v) - .collect(); - Ok(values) + fn get_derived( + &self, + _key: &DerivedEntityQuery, + ) -> Result, StoreError> { + Ok(self.get_many_res.clone()) } fn input_schema(&self) -> Arc { From 14d8bdb69b66c3624ae6a180c02c20819c129b4c Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 14 Mar 2023 19:41:56 -0300 Subject: [PATCH 16/22] store: remove unnecessary mut --- store/postgres/src/writable.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 88be64c23b5..7ebbbd772ad 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -763,7 +763,7 @@ impl Queue { let mut tracker = BlockTracker::new(); // Get entities from entries in the queue - let (mut entities_in_queue, entities_removed) = self.queue.fold( + let (entities_in_queue, entities_removed) = self.queue.fold( (BTreeMap::new(), Vec::new()), |(mut map, mut remove_list): (BTreeMap, Vec), req| { tracker.update(req.as_ref()); From 74f547be70ae1d26613f951a6d7101f94037c02d Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 15 Mar 2023 11:55:43 -0300 Subject: [PATCH 17/22] graph,store: add database optimization rename variables to be more meaningful --- graph/src/components/store/entity_cache.rs | 4 +-- graph/src/components/store/traits.rs | 2 +- store/postgres/src/deployment_store.rs | 5 +-- store/postgres/src/relational.rs | 7 ++-- store/postgres/src/relational_queries.rs | 18 ++++++++-- store/postgres/src/writable.rs | 41 +++++++++++----------- 6 files changed, 47 insertions(+), 30 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index cd114f6a16c..a6ed5ec9f6d 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -121,14 +121,14 @@ impl EntityCache { ) -> Result, anyhow::Error> { let (base_type, field) = self.schema.get_type_for_field(eref)?; - let key = DerivedEntityQuery { + let query = DerivedEntityQuery { entity_type: EntityType::new(base_type.to_string()), entity_field: field.into(), value: eref.entity_id.clone(), causality_region: eref.causality_region, }; - let entities = self.store.get_derived(&key)?; + let entities = self.store.get_derived(&query)?; entities.iter().for_each(|(key, e)| { self.current.insert(key.clone(), Some(e.clone())); }); diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 71a3dfd20d2..dc376b7f65b 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -189,7 +189,7 @@ pub trait ReadStore: Send + Sync + 'static { /// Reverse lookup fn get_derived( &self, - entity_derived: &DerivedEntityQuery, + query_derived: &DerivedEntityQuery, ) -> Result, StoreError>; fn input_schema(&self) -> Arc; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 798a84e00fe..844dbe5dc61 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1122,12 +1122,13 @@ impl DeploymentStore { pub(crate) fn get_derived( &self, site: Arc, - key: &DerivedEntityQuery, + derived_query: &DerivedEntityQuery, block: BlockNumber, + excluded_keys: &Option>, ) -> Result, StoreError> { let conn = self.get_conn()?; let layout = self.layout(&conn, site)?; - layout.find_derived(&conn, key, block) + layout.find_derived(&conn, derived_query, block, excluded_keys) } pub(crate) fn get_changes( diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 2edbaaeacfc..6d6f9c9c66b 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -561,11 +561,12 @@ impl Layout { pub fn find_derived( &self, conn: &PgConnection, - key: &DerivedEntityQuery, + derived_query: &DerivedEntityQuery, block: BlockNumber, + excluded_keys: &Option>, ) -> Result, StoreError> { - let table = self.table_for_entity(&key.entity_type)?; - let query = FindDerivedQuery::new(table, key, block); + let table = self.table_for_entity(&derived_query.entity_type)?; + let query = FindDerivedQuery::new(table, derived_query, block, excluded_keys); let mut entities = BTreeMap::new(); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index e7f891d8fd8..00758db5c66 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -1673,8 +1673,9 @@ impl<'a, Conn> RunQueryDsl for FindManyQuery<'a> {} #[derive(Debug, Clone, Constructor)] pub struct FindDerivedQuery<'a> { table: &'a Table, - key: &'a DerivedEntityQuery, + derived_query: &'a DerivedEntityQuery, block: BlockNumber, + excluded_keys: &'a Option>, } impl<'a> QueryFragment for FindDerivedQuery<'a> { @@ -1686,7 +1687,7 @@ impl<'a> QueryFragment for FindDerivedQuery<'a> { entity_field, value: entity_id, causality_region, - } = self.key; + } = self.derived_query; // Generate // select '..' as entity, to_jsonb(e.*) as data @@ -1697,6 +1698,19 @@ impl<'a> QueryFragment for FindDerivedQuery<'a> { out.push_sql(" from "); out.push_sql(self.table.qualified_name.as_str()); out.push_sql(" e\n where "); + + if let Some(keys) = self.excluded_keys { + let primary_key = self.table.primary_key(); + out.push_identifier(primary_key.name.as_str())?; + out.push_sql(" not in ("); + for (i, value) in keys.iter().enumerate() { + if i > 0 { + out.push_sql(", "); + } + out.push_bind_param::(&value.entity_id.as_str())?; + } + out.push_sql(") and "); + } out.push_identifier(entity_field.as_str())?; out.push_sql(" = "); out.push_bind_param::(&entity_id.as_str())?; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 7ebbbd772ad..97bca3f6e09 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -254,10 +254,11 @@ impl SyncStore { &self, key: &DerivedEntityQuery, block: BlockNumber, + excluded_keys: Option>, ) -> Result, StoreError> { self.retry("get_derived", || { self.writable - .get_derived(self.site.cheap_clone(), key, block) + .get_derived(self.site.cheap_clone(), key, block, &excluded_keys) }) } @@ -763,9 +764,9 @@ impl Queue { let mut tracker = BlockTracker::new(); // Get entities from entries in the queue - let (entities_in_queue, entities_removed) = self.queue.fold( - (BTreeMap::new(), Vec::new()), - |(mut map, mut remove_list): (BTreeMap, Vec), req| { + let entities_in_queue = self.queue.fold( + BTreeMap::new(), + |mut map: BTreeMap>, req| { tracker.update(req.as_ref()); match req.as_ref() { Request::Write { @@ -774,33 +775,33 @@ impl Queue { if tracker.visible(block_ptr) { for emod in mods { let key = emod.entity_ref(); - // The key must be removed to avoid overwriting it with a stale value. + // we only select only the entities that match the query if key_derived.entity_type == key.entity_type { - match emod.entity() { - Some(entity) => { - map.insert(key.clone(), entity.clone()); - } - None => { - remove_list.push(key.clone()); - } - } + map.insert(key.clone(), emod.entity().cloned()); } } } } Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ } } - (map, remove_list) + map }, ); - // We should filter this in the future to only get the entities that are needed - let mut items_from_database = self.store.get_derived(key_derived, tracker.query_block())?; - // Remove any entities that were removed in the queue - items_from_database.retain(|key, _item| !entities_removed.contains(key)); + + let excluded_keys: Vec = entities_in_queue.keys().cloned().collect(); + + // We filter to exclude the entities ids that we already have from the queue + let mut items_from_database = + self.store + .get_derived(key_derived, tracker.query_block(), Some(excluded_keys))?; // Extend the store results with the entities from the queue. // This overwrites any entitiy from the database with the same key from queue - items_from_database.extend(entities_in_queue); + let items_from_queue: BTreeMap = entities_in_queue + .into_iter() + .filter_map(|(key, entity)| entity.map(|entity| (key, entity))) + .collect(); + items_from_database.extend(items_from_queue); Ok(items_from_database) } @@ -968,7 +969,7 @@ impl Writer { key: &DerivedEntityQuery, ) -> Result, StoreError> { match self { - Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX), + Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, None), Writer::Async(queue) => queue.get_derived(key), } } From a3be3de0d9cfd811b6331e08dd7521fca1b70244 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Wed, 15 Mar 2023 16:55:02 -0300 Subject: [PATCH 18/22] store: check for length in excluded_keys --- store/postgres/src/relational_queries.rs | 2 +- store/postgres/src/writable.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 00758db5c66..facadb6907d 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -1699,7 +1699,7 @@ impl<'a> QueryFragment for FindDerivedQuery<'a> { out.push_sql(self.table.qualified_name.as_str()); out.push_sql(" e\n where "); - if let Some(keys) = self.excluded_keys { + if let Some(keys) = self.excluded_keys.as_ref().filter(|keys| keys.len() > 0) { let primary_key = self.table.primary_key(); out.push_identifier(primary_key.name.as_str())?; out.push_sql(" not in ("); diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 97bca3f6e09..2ed8fe59973 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -759,7 +759,7 @@ impl Queue { fn get_derived( &self, - key_derived: &DerivedEntityQuery, + derived_query: &DerivedEntityQuery, ) -> Result, StoreError> { let mut tracker = BlockTracker::new(); @@ -775,8 +775,8 @@ impl Queue { if tracker.visible(block_ptr) { for emod in mods { let key = emod.entity_ref(); - // we only select only the entities that match the query - if key_derived.entity_type == key.entity_type { + // we select just the entities that match the query + if derived_query.entity_type == key.entity_type { map.insert(key.clone(), emod.entity().cloned()); } } @@ -793,7 +793,7 @@ impl Queue { // We filter to exclude the entities ids that we already have from the queue let mut items_from_database = self.store - .get_derived(key_derived, tracker.query_block(), Some(excluded_keys))?; + .get_derived(derived_query, tracker.query_block(), Some(excluded_keys))?; // Extend the store results with the entities from the queue. // This overwrites any entitiy from the database with the same key from queue From be31715b3d33e3af4670b7df940047f1de5b568b Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Thu, 16 Mar 2023 23:55:18 -0300 Subject: [PATCH 19/22] graph,runtime,store: fix requested changes --- graph/src/components/store/entity_cache.rs | 4 +- graph/src/data/schema.rs | 44 ++++++++++++++++++++-- runtime/wasm/src/module/mod.rs | 8 ++-- store/postgres/src/deployment_store.rs | 2 +- store/postgres/src/relational.rs | 2 +- store/postgres/src/relational_queries.rs | 6 +-- store/postgres/src/writable.rs | 10 +++-- 7 files changed, 56 insertions(+), 20 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index a6ed5ec9f6d..af618bb8aad 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -119,11 +119,11 @@ impl EntityCache { &mut self, eref: &LoadRelatedRequest, ) -> Result, anyhow::Error> { - let (base_type, field) = self.schema.get_type_for_field(eref)?; + let (base_type, field) = self.schema.get_field_related(eref)?; let query = DerivedEntityQuery { entity_type: EntityType::new(base_type.to_string()), - entity_field: field.into(), + entity_field: field.name.clone().into(), value: eref.entity_id.clone(), causality_region: eref.causality_region, }; diff --git a/graph/src/data/schema.rs b/graph/src/data/schema.rs index 3d3ef328d9f..311671a6d48 100644 --- a/graph/src/data/schema.rs +++ b/graph/src/data/schema.rs @@ -539,7 +539,22 @@ impl Schema { } } - pub fn get_type_for_field(&self, key: &LoadRelatedRequest) -> Result<(&str, &str), Error> { + /// Returns the field that has the relationship with the key requested + /// This works as a reverse search for the Field related to the query + /// + /// example: + /// + /// type Account @entity { + /// wallets: [Wallet!]! @derivedFrom("account") + /// } + /// type Wallet { + /// account: Account! + /// balance: Int! + /// } + /// + /// When asked to load the related entities from "Account" in the field "wallets" + /// This function will return the type "Wallet" with the field "account" + pub fn get_field_related(&self, key: &LoadRelatedRequest) -> Result<(&str, &Field), Error> { let field = self .document .get_object_type_definition(key.entity_type.as_str()) @@ -561,11 +576,32 @@ impl Schema { ) })?; if field.is_derived() { - let derived_from = field.find_directive("derivedFrom").unwrap(); + let derived_from = field.find_directive("derivedfrom").unwrap(); let base_type = field.field_type.get_base_type(); - let field = derived_from.argument("field").unwrap(); + let field_name = derived_from.argument("field").unwrap(); + + let field = self + .document + .get_object_type_definition(base_type) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown entity type `{}`", + key.entity_type, + key.entity_id, + key.entity_type, + ) + })? + .field(field_name.as_str().unwrap()) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown field `{}`", + key.entity_type, + key.entity_id, + key.entity_field, + ) + })?; - Ok((base_type, field.as_str().unwrap())) + Ok((base_type, field)) } else { Err(anyhow!( "Entity {}[{}]: field `{}` is not derived", diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index b8e6a0cbc36..94eda62d807 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -530,7 +530,7 @@ impl WasmInstance { link!( "store.loadRelated", store_load_related, - "host_export_store_get_derived", + "host_export_store_load_related", entity, id, field @@ -1086,10 +1086,8 @@ impl WasmInstanceContext { gas, )?; - let entities: Vec> = entities - .iter() - .map(|entity| entity.clone().sorted()) - .collect(); + let entities: Vec> = + entities.into_iter().map(|entity| entity.sorted()).collect(); let ret = asc_new(self, &entities, gas)?; Ok(ret) } diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 844dbe5dc61..724f4a24bdc 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1124,7 +1124,7 @@ impl DeploymentStore { site: Arc, derived_query: &DerivedEntityQuery, block: BlockNumber, - excluded_keys: &Option>, + excluded_keys: &Vec, ) -> Result, StoreError> { let conn = self.get_conn()?; let layout = self.layout(&conn, site)?; diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 6d6f9c9c66b..4e578db1ec1 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -563,7 +563,7 @@ impl Layout { conn: &PgConnection, derived_query: &DerivedEntityQuery, block: BlockNumber, - excluded_keys: &Option>, + excluded_keys: &Vec, ) -> Result, StoreError> { let table = self.table_for_entity(&derived_query.entity_type)?; let query = FindDerivedQuery::new(table, derived_query, block, excluded_keys); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index facadb6907d..35b86278d5d 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -1675,7 +1675,7 @@ pub struct FindDerivedQuery<'a> { table: &'a Table, derived_query: &'a DerivedEntityQuery, block: BlockNumber, - excluded_keys: &'a Option>, + excluded_keys: &'a Vec, } impl<'a> QueryFragment for FindDerivedQuery<'a> { @@ -1699,11 +1699,11 @@ impl<'a> QueryFragment for FindDerivedQuery<'a> { out.push_sql(self.table.qualified_name.as_str()); out.push_sql(" e\n where "); - if let Some(keys) = self.excluded_keys.as_ref().filter(|keys| keys.len() > 0) { + if self.excluded_keys.len() > 0 { let primary_key = self.table.primary_key(); out.push_identifier(primary_key.name.as_str())?; out.push_sql(" not in ("); - for (i, value) in keys.iter().enumerate() { + for (i, value) in self.excluded_keys.iter().enumerate() { if i > 0 { out.push_sql(", "); } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 2ed8fe59973..f95157ee3b7 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -254,7 +254,7 @@ impl SyncStore { &self, key: &DerivedEntityQuery, block: BlockNumber, - excluded_keys: Option>, + excluded_keys: Vec, ) -> Result, StoreError> { self.retry("get_derived", || { self.writable @@ -776,7 +776,9 @@ impl Queue { for emod in mods { let key = emod.entity_ref(); // we select just the entities that match the query - if derived_query.entity_type == key.entity_type { + if derived_query.entity_type == key.entity_type + && derived_query.value == key.entity_id + { map.insert(key.clone(), emod.entity().cloned()); } } @@ -793,7 +795,7 @@ impl Queue { // We filter to exclude the entities ids that we already have from the queue let mut items_from_database = self.store - .get_derived(derived_query, tracker.query_block(), Some(excluded_keys))?; + .get_derived(derived_query, tracker.query_block(), excluded_keys)?; // Extend the store results with the entities from the queue. // This overwrites any entitiy from the database with the same key from queue @@ -969,7 +971,7 @@ impl Writer { key: &DerivedEntityQuery, ) -> Result, StoreError> { match self { - Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, None), + Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, vec![]), Writer::Async(queue) => queue.get_derived(key), } } From a20f3d40f071dcfa2e1fdde48c8e8de670735a84 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Fri, 17 Mar 2023 19:32:01 -0300 Subject: [PATCH 20/22] graph: fix case-sensitive mistake --- graph/src/data/schema.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/graph/src/data/schema.rs b/graph/src/data/schema.rs index 311671a6d48..a34f4c4f574 100644 --- a/graph/src/data/schema.rs +++ b/graph/src/data/schema.rs @@ -545,7 +545,7 @@ impl Schema { /// example: /// /// type Account @entity { - /// wallets: [Wallet!]! @derivedFrom("account") + /// wallets: [Wallet!]! @derivedFrom(field: "account") /// } /// type Wallet { /// account: Account! @@ -576,7 +576,7 @@ impl Schema { ) })?; if field.is_derived() { - let derived_from = field.find_directive("derivedfrom").unwrap(); + let derived_from = field.find_directive("derivedFrom").unwrap(); let base_type = field.field_type.get_base_type(); let field_name = derived_from.argument("field").unwrap(); From 6b8fe6eb3b6485d894d1025eff69485c82b01f9d Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 21 Mar 2023 12:06:57 -0300 Subject: [PATCH 21/22] graph,store: add tests for load_related --- Cargo.lock | 3 + graph/Cargo.toml | 4 + graph/tests/entity_cache.rs | 417 ++++++++++++++++++++++++++++++++- store/postgres/src/writable.rs | 21 +- 4 files changed, 430 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index feb30dbac0a..1d105f00fbf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1544,8 +1544,11 @@ dependencies = [ "ethabi", "futures 0.1.31", "futures 0.3.16", + "graph-chain-ethereum", + "graph-store-postgres", "graphql-parser", "hex", + "hex-literal", "http", "isatty", "itertools", diff --git a/graph/Cargo.toml b/graph/Cargo.toml index c71c64c94f0..9686948ea27 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -66,8 +66,12 @@ web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-pat serde_plain = "1.0.1" [dev-dependencies] +test-store = { path = "../store/test-store" } +graph-store-postgres = { path = "../store/postgres" } +graph-chain-ethereum = { path = "../chain/ethereum" } clap = { version = "3.2.23", features = ["derive", "env"] } maplit = "1.0.2" +hex-literal = "0.3" [build-dependencies] tonic-build = { workspace = true } diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index c844a3a000d..38a490a2622 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -1,22 +1,28 @@ use async_trait::async_trait; use graph::blockchain::block_stream::FirehoseCursor; -use graph::blockchain::BlockPtr; -use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth}; -use graph::data_source::CausalityRegion; -use graph::prelude::{Schema, StopwatchMetrics, StoreError, UnfailOutcome}; -use lazy_static::lazy_static; -use slog::Logger; -use std::collections::{BTreeMap, BTreeSet}; -use std::sync::Arc; - use graph::components::store::{ - DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, ReadStore, - StoredDynamicDataSource, WritableStore, + DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest, + ReadStore, StoredDynamicDataSource, WritableStore, }; +use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth}; +use graph::data_source::CausalityRegion; +use graph::prelude::*; use graph::{ components::store::{DeploymentId, DeploymentLocator}, prelude::{DeploymentHash, Entity, EntityCache, EntityModification, Value}, }; +use hex_literal::hex; + +use graph::semver::Version; +use lazy_static::lazy_static; +use slog::Logger; +use std::collections::{BTreeMap, BTreeSet}; +use std::marker::PhantomData; +use std::sync::Arc; +use web3::types::H256; + +use graph_store_postgres::SubgraphStore as DieselSubgraphStore; +use test_store::*; lazy_static! { static ref SUBGRAPH_ID: DeploymentHash = DeploymentHash::new("entity_cache").unwrap(); @@ -358,3 +364,392 @@ fn consecutive_modifications() { },]) ); } + +const ACCOUNT_GQL: &str = " + type Account @entity { + id: ID! + name: String! + email: String! + age: Int! + wallets: [Wallet!]! @derivedFrom(field: \"account\") + } + + type Wallet @entity { + id: ID! + balance: Int! + account: Account! + } +"; + +const ACCOUNT: &str = "Account"; +const WALLET: &str = "Wallet"; + +lazy_static! { + static ref LOAD_RELATED_ID_STRING: String = String::from("loadrelatedsubgraph"); + static ref LOAD_RELATED_ID: DeploymentHash = + DeploymentHash::new(LOAD_RELATED_ID_STRING.as_str()).unwrap(); + static ref LOAD_RELATED_SUBGRAPH: Schema = + Schema::parse(ACCOUNT_GQL, LOAD_RELATED_ID.clone()).expect("Failed to parse user schema"); + static ref TEST_BLOCK_1_PTR: BlockPtr = ( + H256::from(hex!( + "8511fa04b64657581e3f00e14543c1d522d5d7e771b54aa3060b662ade47da13" + )), + 1u64 + ) + .into(); +} + +fn remove_test_data(store: Arc) { + store + .delete_all_entities_for_test_use_only() + .expect("deleting test entities succeeds"); +} + +fn run_store_test(test: F) +where + F: FnOnce( + EntityCache, + Arc, + DeploymentLocator, + Arc, + ) -> R + + Send + + 'static, + R: std::future::Future + Send + 'static, +{ + run_test_sequentially(|store| async move { + let subgraph_store = store.subgraph_store(); + // Reset state before starting + remove_test_data(subgraph_store.clone()); + + // Seed database with test data + let deployment = insert_test_data(subgraph_store.clone()).await; + let writable = store + .subgraph_store() + .writable(LOGGER.clone(), deployment.id) + .await + .expect("we can get a writable store"); + + // we send the information to the database + writable.flush().await.unwrap(); + + let read_store = Arc::new(writable.clone()); + + let cache = EntityCache::new(read_store); + // Run test and wait for the background writer to finish its work so + // it won't conflict with the next test + test(cache, subgraph_store.clone(), deployment, writable.clone()).await; + writable.flush().await.unwrap(); + }); +} + +async fn insert_test_data(store: Arc) -> DeploymentLocator { + let manifest = SubgraphManifest:: { + id: LOAD_RELATED_ID.clone(), + spec_version: Version::new(1, 0, 0), + features: Default::default(), + description: None, + repository: None, + schema: LOAD_RELATED_SUBGRAPH.clone(), + data_sources: vec![], + graft: None, + templates: vec![], + chain: PhantomData, + }; + + // Create SubgraphDeploymentEntity + let deployment = DeploymentCreate::new(String::new(), &manifest, None); + let name = SubgraphName::new("test/store").unwrap(); + let node_id = NodeId::new("test").unwrap(); + let deployment = store + .create_subgraph_deployment( + name, + &LOAD_RELATED_SUBGRAPH, + deployment, + node_id, + NETWORK_NAME.to_string(), + SubgraphVersionSwitchingMode::Instant, + ) + .unwrap(); + + // 1 account 3 wallets + let test_entity_1 = create_account_entity("1", "Johnton", "tonofjohn@email.com", 67_i32); + let wallet_entity_1 = create_wallet_operation("1", "1", 67_i32); + let wallet_entity_2 = create_wallet_operation("2", "1", 92_i32); + let wallet_entity_3 = create_wallet_operation("3", "1", 192_i32); + // 1 account 1 wallet + let test_entity_2 = create_account_entity("2", "Cindini", "dinici@email.com", 42_i32); + let wallet_entity_4 = create_wallet_operation("4", "2", 32_i32); + // 1 account 0 wallets + let test_entity_3 = create_account_entity("3", "Shaqueeena", "queensha@email.com", 28_i32); + transact_entity_operations( + &store, + &deployment, + GENESIS_PTR.clone(), + vec![ + test_entity_1, + test_entity_2, + test_entity_3, + wallet_entity_1, + wallet_entity_2, + wallet_entity_3, + wallet_entity_4, + ], + ) + .await + .unwrap(); + deployment +} + +fn create_account_entity(id: &str, name: &str, email: &str, age: i32) -> EntityOperation { + let mut test_entity = Entity::new(); + + test_entity.insert("id".to_owned(), Value::String(id.to_owned())); + test_entity.insert("name".to_owned(), Value::String(name.to_owned())); + test_entity.insert("email".to_owned(), Value::String(email.to_owned())); + test_entity.insert("age".to_owned(), Value::Int(age)); + + EntityOperation::Set { + key: EntityKey::data(ACCOUNT.to_owned(), id.to_owned()), + data: test_entity, + } +} + +fn create_wallet_entity(id: &str, account_id: &str, balance: i32) -> Entity { + let mut test_wallet = Entity::new(); + + test_wallet.insert("id".to_owned(), Value::String(id.to_owned())); + test_wallet.insert("account".to_owned(), Value::String(account_id.to_owned())); + test_wallet.insert("balance".to_owned(), Value::Int(balance)); + test_wallet +} +fn create_wallet_operation(id: &str, account_id: &str, balance: i32) -> EntityOperation { + let test_wallet = create_wallet_entity(id, account_id, balance); + EntityOperation::Set { + key: EntityKey::data(WALLET.to_owned(), id.to_owned()), + data: test_wallet, + } +} + +#[test] +fn check_for_account_with_multiple_wallets() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "1"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("1", account_id, 67_i32); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_account_with_single_wallet() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "2"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("4", account_id, 32_i32); + let expeted_vec = vec![wallet_1]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_account_with_no_wallet() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "3"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let expeted_vec = vec![]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_account_that_doesnt_exist() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "4"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let expeted_vec = vec![]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_non_existent_field() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "1"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "friends".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap_err(); + let expected = format!( + "Entity {}[{}]: unknown field `{}`", + request.entity_type, request.entity_id, request.entity_field, + ); + + assert_eq!(format!("{}", result), expected); + }); +} + +#[test] +fn check_for_insert_async_store() { + run_store_test(|mut cache, store, deployment, _writable| async move { + let account_id = "2"; + // insert a new wallet + let wallet_entity_5 = create_wallet_operation("5", account_id, 79_i32); + let wallet_entity_6 = create_wallet_operation("6", account_id, 200_i32); + + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![wallet_entity_5, wallet_entity_6], + ) + .await + .unwrap(); + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("4", account_id, 32_i32); + let wallet_2 = create_wallet_entity("5", account_id, 79_i32); + let wallet_3 = create_wallet_entity("6", account_id, 200_i32); + let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} +#[test] +fn check_for_insert_async_not_related() { + run_store_test(|mut cache, store, deployment, _writable| async move { + let account_id = "2"; + // insert a new wallet + let wallet_entity_5 = create_wallet_operation("5", account_id, 79_i32); + let wallet_entity_6 = create_wallet_operation("6", account_id, 200_i32); + + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![wallet_entity_5, wallet_entity_6], + ) + .await + .unwrap(); + let account_id = "1"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("1", account_id, 67_i32); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_update_async_related() { + run_store_test(|mut cache, store, deployment, writable| async move { + let account_id = "1"; + let entity_key = EntityKey::data(WALLET.to_owned(), "1".to_owned()); + let wallet_entity_update = create_wallet_operation("1", account_id, 79_i32); + + let new_data = match wallet_entity_update { + EntityOperation::Set { ref data, .. } => data.clone(), + _ => unreachable!(), + }; + assert_ne!(writable.get(&entity_key).unwrap().unwrap(), new_data); + // insert a new wallet + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![wallet_entity_update], + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![new_data, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_delete_async_related() { + run_store_test(|mut cache, store, deployment, _writable| async move { + let account_id = "1"; + let del_key = EntityKey::data(WALLET.to_owned(), "1".to_owned()); + // delete wallet + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![EntityOperation::Remove { key: del_key }], + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index f95157ee3b7..2227a6e7a9c 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -776,10 +776,23 @@ impl Queue { for emod in mods { let key = emod.entity_ref(); // we select just the entities that match the query - if derived_query.entity_type == key.entity_type - && derived_query.value == key.entity_id - { - map.insert(key.clone(), emod.entity().cloned()); + if derived_query.entity_type == key.entity_type { + if let Some(entity) = emod.entity().cloned() { + if let Some(related_id) = + entity.get(derived_query.entity_field.as_str()) + { + // we check only the field agains the value + if related_id.to_string() + == derived_query.value.to_string() + { + map.insert(key.clone(), Some(entity)); + } + } + } else { + // if the entity was deleted, we add here with no checks + // just for removing from the query + map.insert(key.clone(), emod.entity().cloned()); + } } } } From 196c36e5dbed9f917e901d5e9ede258866898ad4 Mon Sep 17 00:00:00 2001 From: Gustavo Inacio Date: Tue, 21 Mar 2023 12:34:48 -0300 Subject: [PATCH 22/22] runtime,graph: fix warnings and rebase types runtime: fix rebase problem --- Cargo.lock | 1 + graph/src/data/schema.rs | 2 +- store/postgres/src/deployment_store.rs | 4 ++-- store/postgres/src/writable.rs | 2 +- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d105f00fbf..b663dad768a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1580,6 +1580,7 @@ dependencies = [ "stable-hash 0.4.2", "strum", "strum_macros", + "test-store", "thiserror", "tiny-keccak 1.5.0", "tokio", diff --git a/graph/src/data/schema.rs b/graph/src/data/schema.rs index a34f4c4f574..dc0f026f77b 100644 --- a/graph/src/data/schema.rs +++ b/graph/src/data/schema.rs @@ -1,5 +1,5 @@ use crate::cheap_clone::CheapClone; -use crate::components::store::{EntityKey, EntityType, LoadRelatedRequest, SubgraphStore}; +use crate::components::store::{EntityKey, EntityType, LoadRelatedRequest}; use crate::data::graphql::ext::{DirectiveExt, DirectiveFinder, DocumentExt, TypeExt, ValueExt}; use crate::data::graphql::ObjectTypeExt; use crate::data::store::{self, ValueType}; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 724f4a24bdc..32e040f95e4 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -6,8 +6,8 @@ use diesel::r2d2::{ConnectionManager, PooledConnection}; use graph::anyhow::Context; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{ - EntityDerived, EntityKey, EntityType, PrunePhase, PruneReporter, PruneRequest, PruningStrategy, - StoredDynamicDataSource, VersionStats, + DerivedEntityQuery, EntityKey, EntityType, PrunePhase, PruneReporter, PruneRequest, + PruningStrategy, StoredDynamicDataSource, VersionStats, }; use graph::components::versions::VERSIONS; use graph::data::query::Trace; diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 2227a6e7a9c..a5127aa715b 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -256,7 +256,7 @@ impl SyncStore { block: BlockNumber, excluded_keys: Vec, ) -> Result, StoreError> { - self.retry("get_derived", || { + retry::forever(&self.logger, "get_derived", || { self.writable .get_derived(self.site.cheap_clone(), key, block, &excluded_keys) })