diff --git a/chain/arweave/src/data_source.rs b/chain/arweave/src/data_source.rs
index 1a8a16f4e28..a4189c8df5e 100644
--- a/chain/arweave/src/data_source.rs
+++ b/chain/arweave/src/data_source.rs
@@ -47,8 +47,11 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.source.start_block
     }
 
-    fn end_block(&self) -> Option<BlockNumber> {
-        self.source.end_block
+    fn has_expired(&self, block: BlockNumber) -> bool {
+        match self.source.end_block {
+            Some(end_block) => block > end_block,
+            None => false,
+        }
     }
 
     fn match_and_decode(
diff --git a/chain/cosmos/src/data_source.rs b/chain/cosmos/src/data_source.rs
index d5d8963fd3a..3ef74735eaf 100644
--- a/chain/cosmos/src/data_source.rs
+++ b/chain/cosmos/src/data_source.rs
@@ -49,8 +49,11 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.source.start_block
     }
 
-    fn end_block(&self) -> Option<BlockNumber> {
-        self.source.end_block
+    fn has_expired(&self, block: BlockNumber) -> bool {
+        match self.source.end_block {
+            Some(end_block) => block > end_block,
+            None => false,
+        }
     }
 
     fn match_and_decode(
diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs
index 04d1801b425..7e934d5fc08 100644
--- a/chain/ethereum/src/data_source.rs
+++ b/chain/ethereum/src/data_source.rs
@@ -109,8 +109,11 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.start_block
     }
 
-    fn end_block(&self) -> Option<BlockNumber> {
-        self.end_block
+    fn has_expired(&self, block: BlockNumber) -> bool {
+        match self.end_block {
+            Some(end_block) => block > end_block,
+            None => false,
+        }
     }
 
     fn match_and_decode(
@@ -152,7 +155,6 @@ impl blockchain::DataSource<Chain> for DataSource {
             address,
             mapping,
             context,
-            // EBTODO: Re-evaluate if endBlock need to be considered
             end_block: _,
 
             // The creation block is ignored for detection duplicate data sources.
@@ -225,8 +227,7 @@ impl blockchain::DataSource<Chain> for DataSource {
             manifest_idx,
             address,
             start_block: 0,
-            // EBTODO: Re-evaluate if this needs to be set to done_at,
-            end_block: done_at,
+            end_block: None,
             mapping: template.mapping.clone(),
             context: Arc::new(context),
             creation_block,
diff --git a/chain/near/src/data_source.rs b/chain/near/src/data_source.rs
index 60acf011118..d5be019d40d 100644
--- a/chain/near/src/data_source.rs
+++ b/chain/near/src/data_source.rs
@@ -75,8 +75,11 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.source.start_block
     }
 
-    fn end_block(&self) -> Option<BlockNumber> {
-        self.source.end_block
+    fn has_expired(&self, block: BlockNumber) -> bool {
+        match self.source.end_block {
+            Some(end_block) => block > end_block,
+            None => false,
+        }
     }
 
     fn match_and_decode(
diff --git a/chain/substreams/src/data_source.rs b/chain/substreams/src/data_source.rs
index f8d731f712d..1b0464214c8 100644
--- a/chain/substreams/src/data_source.rs
+++ b/chain/substreams/src/data_source.rs
@@ -51,8 +51,8 @@ impl blockchain::DataSource<Chain> for DataSource {
         self.initial_block.unwrap_or(0)
     }
 
-    fn end_block(&self) -> Option<BlockNumber> {
-        None
+    fn has_expired(&self, _: BlockNumber) -> bool {
+        false
     }
 
     fn name(&self) -> &str {
diff --git a/core/src/subgraph/context.rs b/core/src/subgraph/context.rs
index 09fa464291a..92ee18d11ae 100644
--- a/core/src/subgraph/context.rs
+++ b/core/src/subgraph/context.rs
@@ -177,10 +177,6 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
     pub fn instance(&self) -> &SubgraphInstance<C, T> {
         &self.instance
     }
-
-    pub fn hosts(&self) -> &[Arc<T::Host>] {
-        self.instance.hosts()
-    }
 }
 
 pub struct OffchainMonitor {
diff --git a/core/src/subgraph/context/instance.rs b/core/src/subgraph/context/instance.rs
index b2226eb5618..5a1cfe30826 100644
--- a/core/src/subgraph/context/instance.rs
+++ b/core/src/subgraph/context/instance.rs
@@ -37,7 +37,6 @@ where
     pub fn from_manifest(
         logger: &Logger,
        manifest: SubgraphManifest<C>,
-        static_data_sources: Vec<DataSource<C>>,
         data_sources: Vec<DataSource<C>>,
         host_builder: T,
         host_metrics: Arc<HostMetrics>,
@@ -52,7 +51,7 @@ where
             host_builder,
             subgraph_id,
             network,
-            static_data_sources: Arc::new(static_data_sources),
+            static_data_sources: Arc::new(manifest.data_sources),
             hosts: Hosts::new(),
             module_cache: HashMap::new(),
             templates,
@@ -180,7 +179,6 @@ where
             return vec![];
         }
 
-        // EBTODO
         self.hosts
             .hosts()
             .iter()
diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index fde066663ce..bcb67cf0dbf 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -273,18 +273,16 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
         // Dynamic data sources are loaded by appending them to the manifest.
         //
         // Refactor: Preferrably we'd avoid any mutation of the manifest.
-        let (manifest, static_data_sources, dynamic_data_sources) = {
+        let (manifest, dynamic_data_sources) = {
             let dynamic_data_sources =
                 load_dynamic_data_sources(store.clone(), logger.clone(), &manifest)
                     .await
                     .context("Failed to load dynamic data sources")?;
 
-            let static_data_sources = manifest.data_sources.clone();
-
-            (manifest, static_data_sources, dynamic_data_sources)
+            (manifest, dynamic_data_sources)
         };
 
-        let mut data_sources = static_data_sources.clone();
+        let mut data_sources = manifest.data_sources.clone();
         data_sources.extend(dynamic_data_sources);
 
         info!(logger, "Data source count at start: {}", data_sources.len());
@@ -384,7 +382,6 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
         let instance = super::context::instance::SubgraphInstance::from_manifest(
             &logger,
             manifest,
-            static_data_sources,
             data_sources,
             host_builder,
             host_metrics.clone(),
diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index ceac466cd0f..d578f69cc3b 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -166,10 +164,8 @@
                 .iter()
                 .filter_map(|ds| ds.as_onchain())
                 // Filter out data sources that have reached their end block.
-                .filter(|ds| match ds.end_block() {
-                    Some(end_block) => {
-                        current_ptr.as_ref().map(|ptr| ptr.number).unwrap_or(0) > end_block
-                    }
+                .filter(|ds| match current_ptr.as_ref() {
+                    Some(block) => !ds.has_expired(block.number),
                     None => true,
                 }),
         );
@@ -187,6 +185,7 @@
         } else {
             C::TriggerFilter::from_data_sources(
                 self.ctx
+                    .instance()
                     .hosts()
                     .iter()
                     .filter_map(|h| h.data_source().as_onchain()),
@@ -506,24 +505,6 @@
             );
         }
 
-        // EBTODO: Since endBlock reached datasources are ignored in match_and_decode. This might actually not be needed.
-        let end_block_reached_datasources = self.ctx.hosts().iter().filter_map(|host| {
-            if let Some(ds) = host.data_source().as_onchain() {
-                if ds.end_block() == Some(block_ptr.number) {
-                    let mut stored_dynamic_data_source = ds.as_stored_dynamic_data_source();
-                    stored_dynamic_data_source.done_at = Some(block_ptr.number);
-                    Some(stored_dynamic_data_source)
-                } else {
-                    None
-                }
-            } else {
-                None
-            }
-        });
-
-        let processed_datasources = end_block_reached_datasources
-            .chain(processed_offchain_data_sources.into_iter())
-            .collect::<Vec<_>>();
         // Transact entity operations into the store and update the
         // subgraph's block stream pointer
         let _section = self.metrics.host.stopwatch.start_section("transact_block");
@@ -560,7 +541,7 @@
                 &self.metrics.host.stopwatch,
                 persisted_data_sources,
                 deterministic_errors,
-                processed_datasources,
+                processed_offchain_data_sources,
                 is_non_fatal_errors_active,
             )
             .await
diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs
index 527067381d9..a2287f39e29 100644
--- a/graph/src/blockchain/mock.rs
+++ b/graph/src/blockchain/mock.rs
@@ -70,7 +70,7 @@ impl<C: Blockchain> DataSource<C> for MockDataSource {
         todo!()
     }
 
-    fn end_block(&self) -> Option<BlockNumber> {
+    fn has_expired(&self, _: BlockNumber) -> bool {
         todo!()
     }
 
diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs
index b92724228a6..8d307b8ba98 100644
--- a/graph/src/blockchain/mod.rs
+++ b/graph/src/blockchain/mod.rs
@@ -264,7 +264,11 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {
 
     fn address(&self) -> Option<&[u8]>;
     fn start_block(&self) -> BlockNumber;
-    fn end_block(&self) -> Option<BlockNumber>;
+
+    /// If the data source has an `endBlock`, check whether the trigger block is
+    /// within the range of blocks that the data source is supposed to handle.
+    /// Otherwise, ignore the trigger.
+    fn has_expired(&self, block: BlockNumber) -> bool;
     fn name(&self) -> &str;
     fn kind(&self) -> &str;
     fn network(&self) -> Option<&str>;
diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs
index b6277753e0a..7de14ffb0a4 100644
--- a/graph/src/data_source/mod.rs
+++ b/graph/src/data_source/mod.rs
@@ -158,19 +158,10 @@ impl<C: Blockchain> DataSource<C> {
         logger: &Logger,
     ) -> Result<Option<TriggerWithHandler<MappingTrigger<C>>>, Error> {
         match (self, trigger) {
-            (Self::Onchain(ds), TriggerData::Onchain(trigger)) => {
-                // If the data source has an `endBlock` field, check that the trigger block is
-                // within the range of blocks that the data source is supposed to handle.
-                // Otherwise, ignore the trigger.
-                if let Some(end_block) = ds.end_block() {
-                    if end_block < block.number() {
-                        return Ok(None);
-                    }
-                }
-
-                ds.match_and_decode(trigger, block, logger)
-                    .map(|t| t.map(|t| t.map(MappingTrigger::Onchain)))
-            }
+            (Self::Onchain(ds), _) if ds.has_expired(block.number()) => Ok(None),
+            (Self::Onchain(ds), TriggerData::Onchain(trigger)) => ds
+                .match_and_decode(trigger, block, logger)
+                .map(|t| t.map(|t| t.map(MappingTrigger::Onchain))),
             (Self::Offchain(ds), TriggerData::Offchain(trigger)) => {
                 Ok(ds.match_and_decode(trigger))
             }
diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs
index 058b3aa0c7c..9f7c2a1c84a 100644
--- a/store/postgres/src/deployment_store.rs
+++ b/store/postgres/src/deployment_store.rs
@@ -1114,11 +1114,7 @@ impl DeploymentStore {
 
             dynds::insert(&conn, &site, &batch.data_sources, manifest_idx_and_name)?;
 
-            dynds::update_processed_datasources_status(
-                &conn,
-                &site,
-                &batch.offchain_to_remove,
-            )?;
+            dynds::update_offchain_status(&conn, &site, &batch.offchain_to_remove)?;
 
             if !batch.deterministic_errors.is_empty() {
                 deployment::insert_subgraph_errors(
diff --git a/store/postgres/src/dynds/mod.rs b/store/postgres/src/dynds/mod.rs
index ab2df54bd45..b62957d1e91 100644
--- a/store/postgres/src/dynds/mod.rs
+++ b/store/postgres/src/dynds/mod.rs
@@ -47,7 +47,7 @@
     }
 }
 
-pub(crate) fn update_processed_datasources_status(
+pub(crate) fn update_offchain_status(
     conn: &PgConnection,
     site: &Site,
     data_sources: &write::DataSources,
@@ -57,9 +57,9 @@
     }
 
     match site.schema_version.private_data_sources() {
-        true => DataSourcesTable::new(site.namespace.clone())
-            .update_processed_datasources_status(conn, data_sources),
-        // EBTODO: understand this error, see if its applicable for non-offchain data sources that reached endBlock
+        true => {
+            DataSourcesTable::new(site.namespace.clone()).update_offchain_status(conn, data_sources)
+        }
         false => Err(constraint_violation!(
             "shared schema does not support data source offchain_found",
         )),
diff --git a/store/postgres/src/dynds/private.rs b/store/postgres/src/dynds/private.rs
index 0607edce586..08fc9f87272 100644
--- a/store/postgres/src/dynds/private.rs
+++ b/store/postgres/src/dynds/private.rs
@@ -292,7 +292,7 @@ impl DataSourcesTable {
 
     // Remove offchain data sources by checking the causality region, which currently uniquely
     // identifies an offchain data source.
-    pub(super) fn update_processed_datasources_status(
+    pub(super) fn update_offchain_status(
         &self,
         conn: &PgConnection,
         data_sources: &write::DataSources,
@@ -309,7 +309,6 @@ impl DataSourcesTable {
                 .bind::<Integer, _>(ds.causality_region)
                 .execute(conn)?;
 
-            // EBTODO: understand this error, see if how it needs to be modified to account for non-offchain data sources that reached endBlock
             if count > 1 {
                 return Err(constraint_violation!(
                     "expected to remove at most one offchain data source but would remove {}, causality region: {}",
diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs
index 846f10100a9..a7e268b3645 100644
--- a/store/test-store/tests/chain/ethereum/manifest.rs
+++ b/store/test-store/tests/chain/ethereum/manifest.rs
@@ -404,7 +404,6 @@ specVersion: 0.0.2
 
     let end_block = data_source.as_onchain().unwrap().end_block;
     assert_eq!(Some(9562481), end_block);
-    assert_eq!("Qmmanifest", manifest.id.as_str());
 }
 
 #[tokio::test]