Skip to content

Commit

Permalink
feature: Add support for offchain data sources & templates (#3791)
Browse files Browse the repository at this point in the history
* Refactor manifest data sources

* Refactor manifest data source templates

* Start offchain monitors for static sources

* Run offchain handlers

* offchain: dont expect `manifest_idx` in the manifest

* offchain: add `match_and_decode`, persist normally, require source

* trigger processor: take block ptr from the trigger

* offchain: Return cid to dataSource.address host fn

* runner: transact modifications of offchain events

* ethereum: fix test build

* ipfs: Set a default maximum file size

* ipfs: Add env var for max concurrent requests

* ipfs: Share ipfs service across subgraphs

* offchain: move `ready_offchain_events` to `OffchainMonitor`

* runner: Clarify comments

* core: Remove unecessary params from `add_dynamic_data_source`

* core: Move poi_version out of the instance

* core: Move `mod instance` under `mod context`

This should clarify the SubgraphInstance responsiblity of keeping
track of the hosts.

* core: Refactor OffchainMonitor::add_data_source

* offchain: Better handling of duplicates

* offchain: Bump max ipfs concurrent requests to 100

* refactor: Expose RuntimeHost data source

* offchain: Remove dses that have been processed

* refactor: Extract ReadStore out of WritableStore

* test: Add graphql queries to end-to-end tests

* feat(file ds): Bump max spec version to 0.0.7

* test: Add basic file data sources e2e test

* runner: Isolate offchain data sources

* offchain: Forbid static file data sources

* store: Assign separate causality region for offchain dses

* graph: Fix release build

* tests: yarn upgrade, add file ds to the workspace

* fix: Update comments

Co-authored-by: Leonardo Yvens <[email protected]>
  • Loading branch information
Theodus and leoyvens authored Aug 25, 2022
1 parent 9211241 commit fc4cd68
Show file tree
Hide file tree
Showing 78 changed files with 2,516 additions and 1,019 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ jobs:
TESTS_GANACHE_HARD_WAIT_SECONDS: "30"
with:
command: test
args: --verbose --package graph-tests -- --nocapture --skip parallel_integration_tests
args: --verbose --package graph-tests -- --skip parallel_integration_tests

integration-tests:
name: Run integration tests
Expand Down
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ impl blockchain::DataSource<Chain> for DataSource {
},
};

Ok(Some(TriggerWithHandler::new(
Ok(Some(TriggerWithHandler::<Chain>::new(
trigger.cheap_clone(),
handler.to_owned(),
block.ptr(),
)))
}

Expand Down
4 changes: 2 additions & 2 deletions chain/arweave/src/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use graph::blockchain;
use graph::blockchain::Block;
use graph::blockchain::TriggerData;
use graph::cheap_clone::CheapClone;
Expand All @@ -9,6 +8,7 @@ use graph::runtime::gas::GasCounter;
use graph::runtime::AscHeap;
use graph::runtime::AscPtr;
use graph::runtime::DeterministicHostError;
use graph_runtime_wasm::module::ToAscPtr;
use std::{cmp::Ordering, sync::Arc};

use crate::codec;
Expand All @@ -33,7 +33,7 @@ impl std::fmt::Debug for ArweaveTrigger {
}
}

impl blockchain::MappingTrigger for ArweaveTrigger {
impl ToAscPtr for ArweaveTrigger {
fn to_asc_ptr<H: AscHeap>(
self,
heap: &mut H,
Expand Down
3 changes: 2 additions & 1 deletion chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ impl blockchain::DataSource<Chain> for DataSource {
},
};

Ok(Some(TriggerWithHandler::new(
Ok(Some(TriggerWithHandler::<Chain>::new(
trigger.cheap_clone(),
handler,
block.ptr(),
)))
}

Expand Down
5 changes: 3 additions & 2 deletions chain/cosmos/src/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{cmp::Ordering, sync::Arc};

use graph::blockchain::{Block, BlockHash, MappingTrigger, TriggerData};
use graph::blockchain::{Block, BlockHash, TriggerData};
use graph::cheap_clone::CheapClone;
use graph::prelude::{BlockNumber, Error};
use graph::runtime::{asc_new, gas::GasCounter, AscHeap, AscPtr, DeterministicHostError};
use graph_runtime_wasm::module::ToAscPtr;

use crate::codec;
use crate::data_source::EventOrigin;
Expand Down Expand Up @@ -34,7 +35,7 @@ impl std::fmt::Debug for CosmosTrigger {
}
}

impl MappingTrigger for CosmosTrigger {
impl ToAscPtr for CosmosTrigger {
fn to_asc_ptr<H: AscHeap>(
self,
heap: &mut H,
Expand Down
1 change: 0 additions & 1 deletion chain/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ graph-runtime-wasm = { path = "../../runtime/wasm" }
graph-runtime-derive = { path = "../../runtime/derive" }

[dev-dependencies]
graph-core = { path = "../../core" }
test-store = { path = "../../store/test-store" }
base64 = "0.13.0"

Expand Down
7 changes: 7 additions & 0 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ impl Chain {
#[async_trait]
impl Blockchain for Chain {
const KIND: BlockchainKind = BlockchainKind::Ethereum;
const ALIASES: &'static [&'static str] = &["ethereum/contract"];

type Block = BlockFinality;

Expand Down Expand Up @@ -389,6 +390,12 @@ pub enum BlockFinality {
NonFinal(EthereumBlockWithCalls),
}

impl Default for BlockFinality {
fn default() -> Self {
Self::Final(Arc::default())
}
}

impl BlockFinality {
pub(crate) fn light_block(&self) -> &Arc<LightEthereumBlock> {
match self {
Expand Down
19 changes: 16 additions & 3 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ impl blockchain::DataSource<Chain> for DataSource {
.as_ref()
.map(|ctx| serde_json::to_value(&ctx).unwrap()),
creation_block: self.creation_block,
is_offchain: false,
}
}

Expand All @@ -143,8 +144,14 @@ impl blockchain::DataSource<Chain> for DataSource {
param,
context,
creation_block,
is_offchain,
} = stored;

ensure!(
!is_offchain,
"attempted to convert offchain data source to ethereum data source"
);

let context = context.map(serde_json::from_value).transpose()?;

let contract_abi = template.mapping.find_abi(&template.source.abi)?;
Expand Down Expand Up @@ -485,11 +492,12 @@ impl DataSource {
Some(handler) => handler,
None => return Ok(None),
};
Ok(Some(TriggerWithHandler::new(
Ok(Some(TriggerWithHandler::<Chain>::new(
MappingTrigger::Block {
block: block.cheap_clone(),
},
handler.handler,
block.block_ptr(),
)))
}
EthereumTrigger::Log(log, receipt) => {
Expand Down Expand Up @@ -587,7 +595,7 @@ impl DataSource {
"address" => format!("{}", &log.address),
"transaction" => format!("{}", &transaction.hash),
});
Ok(Some(TriggerWithHandler::new_with_logging_extras(
Ok(Some(TriggerWithHandler::<Chain>::new_with_logging_extras(
MappingTrigger::Log {
block: block.cheap_clone(),
transaction: Arc::new(transaction),
Expand All @@ -596,6 +604,7 @@ impl DataSource {
receipt: receipt.clone(),
},
event_handler.handler,
block.block_ptr(),
logging_extras,
)))
}
Expand Down Expand Up @@ -696,7 +705,7 @@ impl DataSource {
"to" => format!("{}", &call.to),
"transaction" => format!("{}", &transaction.hash),
});
Ok(Some(TriggerWithHandler::new_with_logging_extras(
Ok(Some(TriggerWithHandler::<Chain>::new_with_logging_extras(
MappingTrigger::Call {
block: block.cheap_clone(),
transaction,
Expand All @@ -705,6 +714,7 @@ impl DataSource {
outputs,
},
handler.handler,
block.block_ptr(),
logging_extras,
)))
}
Expand Down Expand Up @@ -757,6 +767,9 @@ impl TryFrom<DataSourceTemplateInfo<Chain>> for DataSource {
context,
creation_block,
} = info;
let template = template.into_onchain().ok_or(anyhow!(
"Cannot create onchain data source from offchain template"
))?;

// Obtain the address from the parameters
let string = params
Expand Down
4 changes: 2 additions & 2 deletions chain/ethereum/src/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use graph::blockchain;
use graph::blockchain::TriggerData;
use graph::data::subgraph::API_VERSION_0_0_2;
use graph::data::subgraph::API_VERSION_0_0_6;
Expand All @@ -24,6 +23,7 @@ use graph::runtime::AscHeap;
use graph::runtime::AscPtr;
use graph::runtime::DeterministicHostError;
use graph::semver::Version;
use graph_runtime_wasm::module::ToAscPtr;
use std::convert::TryFrom;
use std::ops::Deref;
use std::{cmp::Ordering, sync::Arc};
Expand Down Expand Up @@ -111,7 +111,7 @@ impl std::fmt::Debug for MappingTrigger {
}
}

impl blockchain::MappingTrigger for MappingTrigger {
impl ToAscPtr for MappingTrigger {
fn to_asc_ptr<H: AscHeap>(
self,
heap: &mut H,
Expand Down
27 changes: 18 additions & 9 deletions chain/ethereum/tests/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::time::Duration;

use graph::data::subgraph::schema::SubgraphError;
use graph::data::subgraph::{SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7};
use graph::data_source::DataSourceTemplate;
use graph::prelude::{
anyhow, async_trait, serde_yaml, tokio, DeploymentHash, Entity, Link, Logger, SubgraphManifest,
SubgraphManifestValidationError, UnvalidatedSubgraphManifest,
Expand All @@ -26,6 +27,7 @@ const GQL_SCHEMA_FULLTEXT: &str = include_str!("full-text.graphql");
const MAPPING_WITH_IPFS_FUNC_WASM: &[u8] = include_bytes!("ipfs-on-ethereum-contracts.wasm");
const ABI: &str = "[{\"type\":\"function\", \"inputs\": [{\"name\": \"i\",\"type\": \"uint256\"}],\"name\":\"get\",\"outputs\": [{\"type\": \"address\",\"name\": \"o\"}]}]";
const FILE: &str = "{}";
const FILE_CID: &str = "bafkreigkhuldxkyfkoaye4rgcqcwr45667vkygd45plwq6hawy7j4rbdky";

#[derive(Default, Debug, Clone)]
struct TextResolver {
Expand Down Expand Up @@ -82,7 +84,7 @@ async fn resolve_manifest(
resolver.add("/ipfs/Qmschema", &GQL_SCHEMA);
resolver.add("/ipfs/Qmabi", &ABI);
resolver.add("/ipfs/Qmmapping", &MAPPING_WITH_IPFS_FUNC_WASM);
resolver.add("/ipfs/Qmfile", &FILE);
resolver.add(FILE_CID, &FILE);

let resolver: Arc<dyn LinkResolverTrait> = Arc::new(resolver);

Expand Down Expand Up @@ -128,16 +130,14 @@ specVersion: 0.0.2

#[tokio::test]
async fn ipfs_manifest() {
const YAML: &str = "
let yaml = "
schema:
file:
/: /ipfs/Qmschema
dataSources:
dataSources: []
templates:
- name: IpfsSource
kind: file/ipfs
source:
file:
/: /ipfs/Qmfile
mapping:
apiVersion: 0.0.6
language: wasm/assemblyscript
Expand All @@ -149,11 +149,15 @@ dataSources:
specVersion: 0.0.7
";

let manifest = resolve_manifest(YAML, SPEC_VERSION_0_0_7).await;
let manifest = resolve_manifest(&yaml, SPEC_VERSION_0_0_7).await;

assert_eq!("Qmmanifest", manifest.id.as_str());
assert_eq!(manifest.offchain_data_sources.len(), 1);
assert_eq!(manifest.data_sources.len(), 0);
let data_source = match &manifest.templates[0] {
DataSourceTemplate::Offchain(ds) => ds,
DataSourceTemplate::Onchain(_) => unreachable!(),
};
assert_eq!(data_source.kind, "file/ipfs");
}

#[tokio::test]
Expand Down Expand Up @@ -392,7 +396,12 @@ specVersion: 0.0.2
";

let manifest = resolve_manifest(YAML, SPEC_VERSION_0_0_4).await;
let required_capabilities = NodeCapabilities::from_data_sources(&manifest.data_sources);
let onchain_data_sources = manifest
.data_sources
.iter()
.filter_map(|ds| ds.as_onchain().cloned())
.collect::<Vec<_>>();
let required_capabilities = NodeCapabilities::from_data_sources(&onchain_data_sources);

assert_eq!("Qmmanifest", manifest.id.as_str());
assert_eq!(true, required_capabilities.traces);
Expand Down
1 change: 1 addition & 0 deletions chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ mod test {
codec::Block {
header: Some(BlockHeader {
height,
hash: Some(codec::CryptoHash { bytes: vec![0; 32] }),
..Default::default()
}),
shards: vec![IndexerShard {
Expand Down
3 changes: 2 additions & 1 deletion chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ impl blockchain::DataSource<Chain> for DataSource {
}
};

Ok(Some(TriggerWithHandler::new(
Ok(Some(TriggerWithHandler::<Chain>::new(
trigger.cheap_clone(),
handler.to_owned(),
block.ptr(),
)))
}

Expand Down
10 changes: 4 additions & 6 deletions chain/near/src/trigger.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use graph::blockchain;
use graph::blockchain::Block;
use graph::blockchain::TriggerData;
use graph::cheap_clone::CheapClone;
use graph::prelude::hex;
use graph::prelude::web3::types::H256;
use graph::prelude::BlockNumber;
use graph::runtime::{asc_new, gas::GasCounter, AscHeap, AscPtr, DeterministicHostError};
use graph_runtime_wasm::module::ToAscPtr;
use std::{cmp::Ordering, sync::Arc};

use crate::codec;
Expand Down Expand Up @@ -35,7 +35,7 @@ impl std::fmt::Debug for NearTrigger {
}
}

impl blockchain::MappingTrigger for NearTrigger {
impl ToAscPtr for NearTrigger {
fn to_asc_ptr<H: AscHeap>(
self,
heap: &mut H,
Expand Down Expand Up @@ -159,8 +159,7 @@ mod tests {
let mut heap = BytesHeap::new(API_VERSION_0_0_5);
let trigger = NearTrigger::Block(Arc::new(block()));

let result =
blockchain::MappingTrigger::to_asc_ptr(trigger, &mut heap, &GasCounter::default());
let result = trigger.to_asc_ptr(&mut heap, &GasCounter::default());
assert!(result.is_ok());
}

Expand All @@ -173,8 +172,7 @@ mod tests {
receipt: receipt().unwrap(),
}));

let result =
blockchain::MappingTrigger::to_asc_ptr(trigger, &mut heap, &GasCounter::default());
let result = trigger.to_asc_ptr(&mut heap, &GasCounter::default());
assert!(result.is_ok());
}

Expand Down
Loading

0 comments on commit fc4cd68

Please sign in to comment.