diff --git a/Cargo.lock b/Cargo.lock index feb30dbac0a..b663dad768a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1544,8 +1544,11 @@ dependencies = [ "ethabi", "futures 0.1.31", "futures 0.3.16", + "graph-chain-ethereum", + "graph-store-postgres", "graphql-parser", "hex", + "hex-literal", "http", "isatty", "itertools", @@ -1577,6 +1580,7 @@ dependencies = [ "stable-hash 0.4.2", "strum", "strum_macros", + "test-store", "thiserror", "tiny-keccak 1.5.0", "tokio", diff --git a/graph/Cargo.toml b/graph/Cargo.toml index c71c64c94f0..9686948ea27 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -66,8 +66,12 @@ web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-pat serde_plain = "1.0.1" [dev-dependencies] +test-store = { path = "../store/test-store" } +graph-store-postgres = { path = "../store/postgres" } +graph-chain-ethereum = { path = "../chain/ethereum" } clap = { version = "3.2.23", features = ["derive", "env"] } maplit = "1.0.2" +hex-literal = "0.3" [build-dependencies] tonic-build = { workspace = true } diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 538af72ff18..af618bb8aad 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -7,6 +7,8 @@ use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOpe use crate::prelude::{Schema, ENV_VARS}; use crate::util::lfu_cache::LfuCache; +use super::{DerivedEntityQuery, EntityType, LoadRelatedRequest}; + /// A cache for entities from the store that provides the basic functionality /// needed for the store interactions in the host exports. 
This struct tracks /// how entities are modified, and caches all entities looked up from the @@ -113,6 +115,27 @@ impl EntityCache { Ok(entity) } + pub fn load_related( + &mut self, + eref: &LoadRelatedRequest, + ) -> Result, anyhow::Error> { + let (base_type, field) = self.schema.get_field_related(eref)?; + + let query = DerivedEntityQuery { + entity_type: EntityType::new(base_type.to_string()), + entity_field: field.name.clone().into(), + value: eref.entity_id.clone(), + causality_region: eref.causality_region, + }; + + let entities = self.store.get_derived(&query)?; + entities.iter().for_each(|(key, e)| { + self.current.insert(key.clone(), Some(e.clone())); + }); + let entities: Vec = entities.values().cloned().collect(); + Ok(entities) + } + pub fn remove(&mut self, key: EntityKey) { self.entity_op(key, EntityOp::Remove); } @@ -151,6 +174,7 @@ impl EntityCache { } } + // check the validate for derived fields let is_valid = entity.validate(&self.schema, &key).is_ok(); self.entity_op(key.clone(), EntityOp::Update(entity)); diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index a1c42d5f1bf..c18352e4315 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -138,6 +138,40 @@ pub struct EntityKey { pub causality_region: CausalityRegion, } +#[derive(Debug, Clone)] +pub struct LoadRelatedRequest { + /// Name of the entity type. + pub entity_type: EntityType, + /// ID of the individual entity. + pub entity_id: Word, + /// Field the shall be loaded + pub entity_field: Word, + + /// This is the causality region of the data source that created the entity. + /// + /// In the case of an entity lookup, this is the causality region of the data source that is + /// doing the lookup. So if the entity exists but was created on a different causality region, + /// the lookup will return empty. 
+ pub causality_region: CausalityRegion, +} + +#[derive(Debug)] +pub struct DerivedEntityQuery { + /// Name of the entity to search + pub entity_type: EntityType, + /// The field to check + pub entity_field: Word, + /// The value to compare against + pub value: Word, + + /// This is the causality region of the data source that created the entity. + /// + /// In the case of an entity lookup, this is the causality region of the data source that is + /// doing the lookup. So if the entity exists but was created on a different causality region, + /// the lookup will return empty. + pub causality_region: CausalityRegion, +} + impl EntityKey { // For use in tests only #[cfg(debug_assertions)] @@ -148,6 +182,15 @@ impl EntityKey { causality_region: CausalityRegion::ONCHAIN, } } + + pub fn from(id: &String, load_related_request: &LoadRelatedRequest) -> Self { + let clone = load_related_request.clone(); + Self { + entity_id: id.clone().into(), + entity_type: clone.entity_type, + causality_region: clone.causality_region, + } + } } #[derive(Clone, Debug, PartialEq)] @@ -1127,6 +1170,13 @@ impl ReadStore for EmptyStore { Ok(BTreeMap::new()) } + fn get_derived( + &self, + _query: &DerivedEntityQuery, + ) -> Result, StoreError> { + Ok(BTreeMap::new()) + } + fn input_schema(&self) -> Arc { self.schema.cheap_clone() } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 04bc36aa1e6..dc376b7f65b 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -186,6 +186,12 @@ pub trait ReadStore: Send + Sync + 'static { keys: BTreeSet, ) -> Result, StoreError>; + /// Reverse lookup + fn get_derived( + &self, + query_derived: &DerivedEntityQuery, + ) -> Result, StoreError>; + fn input_schema(&self) -> Arc; } @@ -202,6 +208,13 @@ impl ReadStore for Arc { (**self).get_many(keys) } + fn get_derived( + &self, + entity_derived: &DerivedEntityQuery, + ) -> Result, StoreError> { + (**self).get_derived(entity_derived) 
+ } + fn input_schema(&self) -> Arc { (**self).input_schema() } diff --git a/graph/src/data/schema.rs b/graph/src/data/schema.rs index 899ef70fc93..dc0f026f77b 100644 --- a/graph/src/data/schema.rs +++ b/graph/src/data/schema.rs @@ -1,5 +1,5 @@ use crate::cheap_clone::CheapClone; -use crate::components::store::{EntityKey, EntityType}; +use crate::components::store::{EntityKey, EntityType, LoadRelatedRequest}; use crate::data::graphql::ext::{DirectiveExt, DirectiveFinder, DocumentExt, TypeExt, ValueExt}; use crate::data::graphql::ObjectTypeExt; use crate::data::store::{self, ValueType}; @@ -539,6 +539,79 @@ impl Schema { } } + /// Returns the field that has the relationship with the key requested + /// This works as a reverse search for the Field related to the query + /// + /// example: + /// + /// type Account @entity { + /// wallets: [Wallet!]! @derivedFrom(field: "account") + /// } + /// type Wallet { + /// account: Account! + /// balance: Int! + /// } + /// + /// When asked to load the related entities from "Account" in the field "wallets" + /// This function will return the type "Wallet" with the field "account" + pub fn get_field_related(&self, key: &LoadRelatedRequest) -> Result<(&str, &Field), Error> { + let field = self + .document + .get_object_type_definition(key.entity_type.as_str()) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown entity type `{}`", + key.entity_type, + key.entity_id, + key.entity_type, + ) + })? 
+ .field(&key.entity_field) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown field `{}`", + key.entity_type, + key.entity_id, + key.entity_field, + ) + })?; + if field.is_derived() { + let derived_from = field.find_directive("derivedFrom").unwrap(); + let base_type = field.field_type.get_base_type(); + let field_name = derived_from.argument("field").unwrap(); + + let field = self + .document + .get_object_type_definition(base_type) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown entity type `{}`", + key.entity_type, + key.entity_id, + key.entity_type, + ) + })? + .field(field_name.as_str().unwrap()) + .ok_or_else(|| { + anyhow!( + "Entity {}[{}]: unknown field `{}`", + key.entity_type, + key.entity_id, + key.entity_field, + ) + })?; + + Ok((base_type, field)) + } else { + Err(anyhow!( + "Entity {}[{}]: field `{}` is not derived", + key.entity_type, + key.entity_id, + key.entity_field, + )) + } + } + pub fn is_immutable(&self, entity_type: &EntityType) -> bool { self.immutable_types.contains(entity_type) } diff --git a/graph/src/runtime/gas/size_of.rs b/graph/src/runtime/gas/size_of.rs index 49bb60b1215..8f4e535a1fd 100644 --- a/graph/src/runtime/gas/size_of.rs +++ b/graph/src/runtime/gas/size_of.rs @@ -1,7 +1,7 @@ //! 
Various implementations of GasSizeOf; use crate::{ - components::store::{EntityKey, EntityType}, + components::store::{EntityKey, EntityType, LoadRelatedRequest}, data::store::{scalar::Bytes, Value}, prelude::{BigDecimal, BigInt}, }; @@ -168,6 +168,14 @@ impl GasSizeOf for EntityKey { } } +impl GasSizeOf for LoadRelatedRequest { + fn gas_size_of(&self) -> Gas { + self.entity_type.gas_size_of() + + self.entity_id.gas_size_of() + + self.entity_field.gas_size_of() + } +} + impl GasSizeOf for EntityType { fn gas_size_of(&self) -> Gas { self.as_str().gas_size_of() diff --git a/graph/src/runtime/mod.rs b/graph/src/runtime/mod.rs index 74007b96cef..917f4d85d40 100644 --- a/graph/src/runtime/mod.rs +++ b/graph/src/runtime/mod.rs @@ -260,6 +260,7 @@ pub enum IndexForAscTypeId { Log = 1001, ArrayH256 = 1002, ArrayLog = 1003, + ArrayTypedMapStringStoreValue = 1004, // Continue to add more Ethereum type IDs here. // e.g.: // NextEthereumType = 1004, diff --git a/graph/tests/entity_cache.rs b/graph/tests/entity_cache.rs index bbec082ec3b..38a490a2622 100644 --- a/graph/tests/entity_cache.rs +++ b/graph/tests/entity_cache.rs @@ -1,22 +1,28 @@ use async_trait::async_trait; use graph::blockchain::block_stream::FirehoseCursor; -use graph::blockchain::BlockPtr; -use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth}; -use graph::data_source::CausalityRegion; -use graph::prelude::{Schema, StopwatchMetrics, StoreError, UnfailOutcome}; -use lazy_static::lazy_static; -use slog::Logger; -use std::collections::{BTreeMap, BTreeSet}; -use std::sync::Arc; - use graph::components::store::{ - DeploymentCursorTracker, EntityKey, EntityType, ReadStore, StoredDynamicDataSource, - WritableStore, + DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest, + ReadStore, StoredDynamicDataSource, WritableStore, }; +use graph::data::subgraph::schema::{DeploymentCreate, SubgraphError, SubgraphHealth}; +use graph::data_source::CausalityRegion; +use 
graph::prelude::*; use graph::{ components::store::{DeploymentId, DeploymentLocator}, prelude::{DeploymentHash, Entity, EntityCache, EntityModification, Value}, }; +use hex_literal::hex; + +use graph::semver::Version; +use lazy_static::lazy_static; +use slog::Logger; +use std::collections::{BTreeMap, BTreeSet}; +use std::marker::PhantomData; +use std::sync::Arc; +use web3::types::H256; + +use graph_store_postgres::SubgraphStore as DieselSubgraphStore; +use test_store::*; lazy_static! { static ref SUBGRAPH_ID: DeploymentHash = DeploymentHash::new("entity_cache").unwrap(); @@ -60,6 +66,13 @@ impl ReadStore for MockStore { Ok(self.get_many_res.clone()) } + fn get_derived( + &self, + _key: &DerivedEntityQuery, + ) -> Result, StoreError> { + Ok(self.get_many_res.clone()) + } + fn input_schema(&self) -> Arc { SCHEMA.clone() } @@ -351,3 +364,392 @@ fn consecutive_modifications() { },]) ); } + +const ACCOUNT_GQL: &str = " + type Account @entity { + id: ID! + name: String! + email: String! + age: Int! + wallets: [Wallet!]! @derivedFrom(field: \"account\") + } + + type Wallet @entity { + id: ID! + balance: Int! + account: Account! + } +"; + +const ACCOUNT: &str = "Account"; +const WALLET: &str = "Wallet"; + +lazy_static! 
{ + static ref LOAD_RELATED_ID_STRING: String = String::from("loadrelatedsubgraph"); + static ref LOAD_RELATED_ID: DeploymentHash = + DeploymentHash::new(LOAD_RELATED_ID_STRING.as_str()).unwrap(); + static ref LOAD_RELATED_SUBGRAPH: Schema = + Schema::parse(ACCOUNT_GQL, LOAD_RELATED_ID.clone()).expect("Failed to parse user schema"); + static ref TEST_BLOCK_1_PTR: BlockPtr = ( + H256::from(hex!( + "8511fa04b64657581e3f00e14543c1d522d5d7e771b54aa3060b662ade47da13" + )), + 1u64 + ) + .into(); +} + +fn remove_test_data(store: Arc) { + store + .delete_all_entities_for_test_use_only() + .expect("deleting test entities succeeds"); +} + +fn run_store_test(test: F) +where + F: FnOnce( + EntityCache, + Arc, + DeploymentLocator, + Arc, + ) -> R + + Send + + 'static, + R: std::future::Future + Send + 'static, +{ + run_test_sequentially(|store| async move { + let subgraph_store = store.subgraph_store(); + // Reset state before starting + remove_test_data(subgraph_store.clone()); + + // Seed database with test data + let deployment = insert_test_data(subgraph_store.clone()).await; + let writable = store + .subgraph_store() + .writable(LOGGER.clone(), deployment.id) + .await + .expect("we can get a writable store"); + + // we send the information to the database + writable.flush().await.unwrap(); + + let read_store = Arc::new(writable.clone()); + + let cache = EntityCache::new(read_store); + // Run test and wait for the background writer to finish its work so + // it won't conflict with the next test + test(cache, subgraph_store.clone(), deployment, writable.clone()).await; + writable.flush().await.unwrap(); + }); +} + +async fn insert_test_data(store: Arc) -> DeploymentLocator { + let manifest = SubgraphManifest:: { + id: LOAD_RELATED_ID.clone(), + spec_version: Version::new(1, 0, 0), + features: Default::default(), + description: None, + repository: None, + schema: LOAD_RELATED_SUBGRAPH.clone(), + data_sources: vec![], + graft: None, + templates: vec![], + chain: PhantomData, + 
}; + + // Create SubgraphDeploymentEntity + let deployment = DeploymentCreate::new(String::new(), &manifest, None); + let name = SubgraphName::new("test/store").unwrap(); + let node_id = NodeId::new("test").unwrap(); + let deployment = store + .create_subgraph_deployment( + name, + &LOAD_RELATED_SUBGRAPH, + deployment, + node_id, + NETWORK_NAME.to_string(), + SubgraphVersionSwitchingMode::Instant, + ) + .unwrap(); + + // 1 account 3 wallets + let test_entity_1 = create_account_entity("1", "Johnton", "tonofjohn@email.com", 67_i32); + let wallet_entity_1 = create_wallet_operation("1", "1", 67_i32); + let wallet_entity_2 = create_wallet_operation("2", "1", 92_i32); + let wallet_entity_3 = create_wallet_operation("3", "1", 192_i32); + // 1 account 1 wallet + let test_entity_2 = create_account_entity("2", "Cindini", "dinici@email.com", 42_i32); + let wallet_entity_4 = create_wallet_operation("4", "2", 32_i32); + // 1 account 0 wallets + let test_entity_3 = create_account_entity("3", "Shaqueeena", "queensha@email.com", 28_i32); + transact_entity_operations( + &store, + &deployment, + GENESIS_PTR.clone(), + vec![ + test_entity_1, + test_entity_2, + test_entity_3, + wallet_entity_1, + wallet_entity_2, + wallet_entity_3, + wallet_entity_4, + ], + ) + .await + .unwrap(); + deployment +} + +fn create_account_entity(id: &str, name: &str, email: &str, age: i32) -> EntityOperation { + let mut test_entity = Entity::new(); + + test_entity.insert("id".to_owned(), Value::String(id.to_owned())); + test_entity.insert("name".to_owned(), Value::String(name.to_owned())); + test_entity.insert("email".to_owned(), Value::String(email.to_owned())); + test_entity.insert("age".to_owned(), Value::Int(age)); + + EntityOperation::Set { + key: EntityKey::data(ACCOUNT.to_owned(), id.to_owned()), + data: test_entity, + } +} + +fn create_wallet_entity(id: &str, account_id: &str, balance: i32) -> Entity { + let mut test_wallet = Entity::new(); + + test_wallet.insert("id".to_owned(), 
Value::String(id.to_owned())); + test_wallet.insert("account".to_owned(), Value::String(account_id.to_owned())); + test_wallet.insert("balance".to_owned(), Value::Int(balance)); + test_wallet +} +fn create_wallet_operation(id: &str, account_id: &str, balance: i32) -> EntityOperation { + let test_wallet = create_wallet_entity(id, account_id, balance); + EntityOperation::Set { + key: EntityKey::data(WALLET.to_owned(), id.to_owned()), + data: test_wallet, + } +} + +#[test] +fn check_for_account_with_multiple_wallets() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "1"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("1", account_id, 67_i32); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_account_with_single_wallet() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "2"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("4", account_id, 32_i32); + let expeted_vec = vec![wallet_1]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_account_with_no_wallet() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "3"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: 
"wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let expeted_vec = vec![]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_account_that_doesnt_exist() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "4"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let expeted_vec = vec![]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_non_existent_field() { + run_store_test(|mut cache, _store, _deployment, _writable| async move { + let account_id = "1"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "friends".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap_err(); + let expected = format!( + "Entity {}[{}]: unknown field `{}`", + request.entity_type, request.entity_id, request.entity_field, + ); + + assert_eq!(format!("{}", result), expected); + }); +} + +#[test] +fn check_for_insert_async_store() { + run_store_test(|mut cache, store, deployment, _writable| async move { + let account_id = "2"; + // insert a new wallet + let wallet_entity_5 = create_wallet_operation("5", account_id, 79_i32); + let wallet_entity_6 = create_wallet_operation("6", account_id, 200_i32); + + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![wallet_entity_5, wallet_entity_6], + ) + .await + .unwrap(); + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + 
let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("4", account_id, 32_i32); + let wallet_2 = create_wallet_entity("5", account_id, 79_i32); + let wallet_3 = create_wallet_entity("6", account_id, 200_i32); + let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} +#[test] +fn check_for_insert_async_not_related() { + run_store_test(|mut cache, store, deployment, _writable| async move { + let account_id = "2"; + // insert a new wallet + let wallet_entity_5 = create_wallet_operation("5", account_id, 79_i32); + let wallet_entity_6 = create_wallet_operation("6", account_id, 200_i32); + + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![wallet_entity_5, wallet_entity_6], + ) + .await + .unwrap(); + let account_id = "1"; + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_1 = create_wallet_entity("1", account_id, 67_i32); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![wallet_1, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_update_async_related() { + run_store_test(|mut cache, store, deployment, writable| async move { + let account_id = "1"; + let entity_key = EntityKey::data(WALLET.to_owned(), "1".to_owned()); + let wallet_entity_update = create_wallet_operation("1", account_id, 79_i32); + + let new_data = match wallet_entity_update { + EntityOperation::Set { ref data, .. 
} => data.clone(), + _ => unreachable!(), + }; + assert_ne!(writable.get(&entity_key).unwrap().unwrap(), new_data); + // insert a new wallet + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![wallet_entity_update], + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![new_data, wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} + +#[test] +fn check_for_delete_async_related() { + run_store_test(|mut cache, store, deployment, _writable| async move { + let account_id = "1"; + let del_key = EntityKey::data(WALLET.to_owned(), "1".to_owned()); + // delete wallet + transact_entity_operations( + &store, + &deployment, + TEST_BLOCK_1_PTR.clone(), + vec![EntityOperation::Remove { key: del_key }], + ) + .await + .unwrap(); + + let request = LoadRelatedRequest { + entity_type: EntityType::new(ACCOUNT.to_string()), + entity_field: "wallets".into(), + entity_id: account_id.into(), + causality_region: CausalityRegion::ONCHAIN, + }; + let result = cache.load_related(&request).unwrap(); + let wallet_2 = create_wallet_entity("2", account_id, 92_i32); + let wallet_3 = create_wallet_entity("3", account_id, 192_i32); + let expeted_vec = vec![wallet_2, wallet_3]; + + assert_eq!(result, expeted_vec); + }); +} diff --git a/runtime/wasm/src/asc_abi/class.rs b/runtime/wasm/src/asc_abi/class.rs index 0fdac204847..95c14b1bfb3 100644 --- a/runtime/wasm/src/asc_abi/class.rs +++ b/runtime/wasm/src/asc_abi/class.rs @@ -609,6 +609,10 @@ impl AscIndexId for AscTypedMap> { const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::TypedMapStringStoreValue; } 
+impl AscIndexId for Array> { + const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::ArrayTypedMapStringStoreValue; +} + impl AscIndexId for AscTypedMap> { const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::TypedMapStringJsonValue; } diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs index ded1d7193d6..e943749b332 100644 --- a/runtime/wasm/src/host_exports.rs +++ b/runtime/wasm/src/host_exports.rs @@ -9,7 +9,7 @@ use wasmtime::Trap; use web3::types::H160; use graph::blockchain::Blockchain; -use graph::components::store::EnsLookup; +use graph::components::store::{EnsLookup, LoadRelatedRequest}; use graph::components::store::{EntityKey, EntityType}; use graph::components::subgraph::{ PoICausalityRegion, ProofOfIndexingEvent, SharedProofOfIndexing, @@ -239,6 +239,28 @@ impl HostExports { Ok(result) } + pub(crate) fn store_load_related( + &self, + state: &mut BlockState, + entity_type: String, + entity_id: String, + entity_field: String, + gas: &GasCounter, + ) -> Result, anyhow::Error> { + let store_key = LoadRelatedRequest { + entity_type: EntityType::new(entity_type), + entity_id: entity_id.into(), + entity_field: entity_field.into(), + causality_region: self.data_source_causality_region, + }; + self.check_entity_type_access(&store_key.entity_type)?; + + let result = state.entity_cache.load_related(&store_key)?; + gas.consume_host_fn(gas::STORE_GET.with_args(complexity::Linear, (&store_key, &result)))?; + + Ok(result) + } + /// Prints the module of `n` in hex. /// Integers are encoded using the least amount of digits (no leading zero digits). /// Their encoding may be of uneven length. The number zero encodes as "0x0". 
diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index c7ac94175ac..94eda62d807 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -527,6 +527,14 @@ impl WasmInstance { link!("abort", abort, message_ptr, file_name_ptr, line, column); link!("store.get", store_get, "host_export_store_get", entity, id); + link!( + "store.loadRelated", + store_load_related, + "host_export_store_load_related", + entity, + id, + field + ); link!( "store.set", store_set, @@ -1059,6 +1067,31 @@ impl WasmInstanceContext { Ok(ret) } + /// function store.loadRelated(entity_type: string, id: string, field: string): Array + pub fn store_load_related( + &mut self, + gas: &GasCounter, + entity_type_ptr: AscPtr, + id_ptr: AscPtr, + field_ptr: AscPtr, + ) -> Result>>, HostExportError> { + let entity_type: String = asc_get(self, entity_type_ptr, gas)?; + let id: String = asc_get(self, id_ptr, gas)?; + let field: String = asc_get(self, field_ptr, gas)?; + let entities = self.ctx.host_exports.store_load_related( + &mut self.ctx.state, + entity_type.clone(), + id.clone(), + field.clone(), + gas, + )?; + + let entities: Vec> = + entities.into_iter().map(|entity| entity.sorted()).collect(); + let ret = asc_new(self, &entities, gas)?; + Ok(ret) + } + /// function typeConversion.bytesToString(bytes: Bytes): string pub fn bytes_to_string( &mut self, diff --git a/runtime/wasm/src/to_from/external.rs b/runtime/wasm/src/to_from/external.rs index 69532fbf237..fd8a2e2ad16 100644 --- a/runtime/wasm/src/to_from/external.rs +++ b/runtime/wasm/src/to_from/external.rs @@ -333,6 +333,18 @@ impl ToAscObj for Vec<(String, store::Value)> { } } +impl ToAscObj>> for Vec> { + fn to_asc_obj( + &self, + heap: &mut H, + gas: &GasCounter, + ) -> Result>, HostExportError> { + let content: Result, _> = self.iter().map(|x| asc_new(heap, &x, gas)).collect(); + let content = content?; + Array::new(&content, heap, gas) + } +} + impl ToAscObj> for serde_json::Value { fn 
to_asc_obj( &self, diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index d40e48c68ef..32e040f95e4 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -6,8 +6,8 @@ use diesel::r2d2::{ConnectionManager, PooledConnection}; use graph::anyhow::Context; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{ - EntityKey, EntityType, PrunePhase, PruneReporter, PruneRequest, PruningStrategy, - StoredDynamicDataSource, VersionStats, + DerivedEntityQuery, EntityKey, EntityType, PrunePhase, PruneReporter, PruneRequest, + PruningStrategy, StoredDynamicDataSource, VersionStats, }; use graph::components::versions::VERSIONS; use graph::data::query::Trace; @@ -1119,6 +1119,18 @@ impl DeploymentStore { layout.find_many(&conn, ids_for_type, block) } + pub(crate) fn get_derived( + &self, + site: Arc, + derived_query: &DerivedEntityQuery, + block: BlockNumber, + excluded_keys: &Vec, + ) -> Result, StoreError> { + let conn = self.get_conn()?; + let layout = self.layout(&conn, site)?; + layout.find_derived(&conn, derived_query, block, excluded_keys) + } + pub(crate) fn get_changes( &self, site: Arc, diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 26cc5ca304b..4e578db1ec1 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -41,7 +41,7 @@ use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use crate::relational_queries::{FindChangesQuery, FindPossibleDeletionsQuery}; +use crate::relational_queries::{FindChangesQuery, FindDerivedQuery, FindPossibleDeletionsQuery}; use crate::{ primary::{Namespace, Site}, relational_queries::{ @@ -49,7 +49,7 @@ use crate::{ FilterQuery, FindManyQuery, FindQuery, InsertQuery, RevertClampQuery, RevertRemoveQuery, }, }; -use graph::components::store::{EntityKey, EntityType}; +use graph::components::store::{DerivedEntityQuery, 
EntityKey, EntityType}; use graph::data::graphql::ext::{DirectiveFinder, DocumentExt, ObjectTypeExt}; use graph::data::schema::{FulltextConfig, FulltextDefinition, Schema, SCHEMA_TYPE_NAME}; use graph::data::store::BYTES_SCALAR; @@ -558,6 +558,32 @@ impl Layout { Ok(entities) } + pub fn find_derived( + &self, + conn: &PgConnection, + derived_query: &DerivedEntityQuery, + block: BlockNumber, + excluded_keys: &Vec, + ) -> Result, StoreError> { + let table = self.table_for_entity(&derived_query.entity_type)?; + let query = FindDerivedQuery::new(table, derived_query, block, excluded_keys); + + let mut entities = BTreeMap::new(); + + for data in query.load::(conn)? { + let entity_type = data.entity_type(); + let entity_data: Entity = data.deserialize_with_layout(self, None, true)?; + let key = EntityKey { + entity_type, + entity_id: entity_data.id()?.into(), + causality_region: CausalityRegion::from_entity(&entity_data), + }; + + entities.insert(key, entity_data); + } + Ok(entities) + } + pub fn find_changes( &self, conn: &PgConnection, @@ -713,6 +739,7 @@ impl Layout { query.query_id, &self.site, )?; + let query_clone = query.clone(); let start = Instant::now(); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index bb60880935c..35b86278d5d 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -12,7 +12,7 @@ use diesel::result::{Error as DieselError, QueryResult}; use diesel::sql_types::{Array, BigInt, Binary, Bool, Integer, Jsonb, Text}; use diesel::Connection; -use graph::components::store::EntityKey; +use graph::components::store::{DerivedEntityQuery, EntityKey}; use graph::data::value::Word; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -1668,6 +1668,76 @@ impl<'a> LoadQuery for FindManyQuery<'a> { impl<'a, Conn> RunQueryDsl for FindManyQuery<'a> {} +/// A query that finds an entity by key. Used during indexing. +/// See also `FindManyQuery`. 
+#[derive(Debug, Clone, Constructor)] +pub struct FindDerivedQuery<'a> { + table: &'a Table, + derived_query: &'a DerivedEntityQuery, + block: BlockNumber, + excluded_keys: &'a Vec, +} + +impl<'a> QueryFragment for FindDerivedQuery<'a> { + fn walk_ast(&self, mut out: AstPass) -> QueryResult<()> { + out.unsafe_to_cache_prepared(); + + let DerivedEntityQuery { + entity_type: _, + entity_field, + value: entity_id, + causality_region, + } = self.derived_query; + + // Generate + // select '..' as entity, to_jsonb(e.*) as data + // from schema.table e where field = $1 + out.push_sql("select "); + out.push_bind_param::(&self.table.object.as_str())?; + out.push_sql(" as entity, to_jsonb(e.*) as data\n"); + out.push_sql(" from "); + out.push_sql(self.table.qualified_name.as_str()); + out.push_sql(" e\n where "); + + if self.excluded_keys.len() > 0 { + let primary_key = self.table.primary_key(); + out.push_identifier(primary_key.name.as_str())?; + out.push_sql(" not in ("); + for (i, value) in self.excluded_keys.iter().enumerate() { + if i > 0 { + out.push_sql(", "); + } + out.push_bind_param::(&value.entity_id.as_str())?; + } + out.push_sql(") and "); + } + out.push_identifier(entity_field.as_str())?; + out.push_sql(" = "); + out.push_bind_param::(&entity_id.as_str())?; + out.push_sql(" and "); + if self.table.has_causality_region { + out.push_sql("causality_region = "); + out.push_bind_param::(causality_region)?; + out.push_sql(" and "); + } + BlockRangeColumn::new(self.table, "e.", self.block).contains(&mut out) + } +} + +impl<'a> QueryId for FindDerivedQuery<'a> { + type QueryId = (); + + const HAS_STATIC_QUERY_ID: bool = false; +} + +impl<'a> LoadQuery for FindDerivedQuery<'a> { + fn internal_load(self, conn: &PgConnection) -> QueryResult> { + conn.query_by_name(&self) + } +} + +impl<'a, Conn> RunQueryDsl for FindDerivedQuery<'a> {} + #[derive(Debug)] pub struct InsertQuery<'a> { table: &'a Table, diff --git a/store/postgres/src/writable.rs 
b/store/postgres/src/writable.rs index 62b47b57097..a5127aa715b 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -4,8 +4,7 @@ use std::sync::Mutex; use std::{collections::BTreeMap, sync::Arc}; use graph::blockchain::block_stream::FirehoseCursor; -use graph::components::store::ReadStore; -use graph::components::store::{DeploymentCursorTracker, EntityKey}; +use graph::components::store::{DeploymentCursorTracker, DerivedEntityQuery, EntityKey, ReadStore}; use graph::data::subgraph::schema; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -251,6 +250,18 @@ impl SyncStore { }) } + fn get_derived( + &self, + key: &DerivedEntityQuery, + block: BlockNumber, + excluded_keys: Vec, + ) -> Result, StoreError> { + retry::forever(&self.logger, "get_derived", || { + self.writable + .get_derived(self.site.cheap_clone(), key, block, &excluded_keys) + }) + } + async fn is_deployment_synced(&self) -> Result { retry::forever_async(&self.logger, "is_deployment_synced", || async { self.writable @@ -746,6 +757,70 @@ impl Queue { Ok(map) } + fn get_derived( + &self, + derived_query: &DerivedEntityQuery, + ) -> Result, StoreError> { + let mut tracker = BlockTracker::new(); + + // Get entities from entries in the queue + let entities_in_queue = self.queue.fold( + BTreeMap::new(), + |mut map: BTreeMap>, req| { + tracker.update(req.as_ref()); + match req.as_ref() { + Request::Write { + block_ptr, mods, .. 
+ } => { + if tracker.visible(block_ptr) { + for emod in mods { + let key = emod.entity_ref(); + // we select just the entities that match the query + if derived_query.entity_type == key.entity_type { + if let Some(entity) = emod.entity().cloned() { + if let Some(related_id) = + entity.get(derived_query.entity_field.as_str()) + { + // we check only the field against the value + if related_id.to_string() + == derived_query.value.to_string() + { + map.insert(key.clone(), Some(entity)); + } + } + } else { + // if the entity was deleted, we add here with no checks + // just for removing from the query + map.insert(key.clone(), emod.entity().cloned()); + } + } + } + } + } + Request::RevertTo { .. } | Request::Stop => { /* nothing to do */ } + } + map + }, + ); + + let excluded_keys: Vec = entities_in_queue.keys().cloned().collect(); + + // We filter to exclude the entity ids that we already have from the queue + let mut items_from_database = + self.store + .get_derived(derived_query, tracker.query_block(), excluded_keys)?; + + // Extend the store results with the entities from the queue.
+ // This overwrites any entity from the database with the same key from the queue + let items_from_queue: BTreeMap = entities_in_queue + .into_iter() + .filter_map(|(key, entity)| entity.map(|entity| (key, entity))) + .collect(); + items_from_database.extend(items_from_queue); + + Ok(items_from_database) + } + /// Load dynamic data sources by looking at both the queue and the store async fn load_dynamic_data_sources( &self, manifest_idx_and_name: Vec<(u32, String)>, @@ -904,6 +979,16 @@ impl Writer { } } + fn get_derived( + &self, + key: &DerivedEntityQuery, + ) -> Result, StoreError> { + match self { + Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, vec![]), + Writer::Async(queue) => queue.get_derived(key), + } + } + async fn load_dynamic_data_sources( &self, manifest_idx_and_name: Vec<(u32, String)>, @@ -993,6 +1078,13 @@ impl ReadStore for WritableStore { self.writer.get_many(keys) } + fn get_derived( + &self, + key: &DerivedEntityQuery, + ) -> Result, StoreError> { + self.writer.get_derived(key) + } + fn input_schema(&self) -> Arc { self.store.input_schema() }