Skip to content

Commit

Permalink
chain,core: refactor endBlock implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
incrypto32 committed Aug 7, 2023
1 parent 6067942 commit d960f49
Show file tree
Hide file tree
Showing 16 changed files with 47 additions and 76 deletions.
7 changes: 5 additions & 2 deletions chain/arweave/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ impl blockchain::DataSource<Chain> for DataSource {
self.source.start_block
}

fn end_block(&self) -> Option<BlockNumber> {
self.source.end_block
fn has_expired(&self, block: BlockNumber) -> bool {
match self.source.end_block {
Some(end_block) => block > end_block,
None => false,
}
}

fn match_and_decode(
Expand Down
7 changes: 5 additions & 2 deletions chain/cosmos/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ impl blockchain::DataSource<Chain> for DataSource {
self.source.start_block
}

fn end_block(&self) -> Option<BlockNumber> {
self.source.end_block
fn has_expired(&self, block: BlockNumber) -> bool {
match self.source.end_block {
Some(end_block) => block > end_block,
None => false,
}
}

fn match_and_decode(
Expand Down
11 changes: 6 additions & 5 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,11 @@ impl blockchain::DataSource<Chain> for DataSource {
self.start_block
}

fn end_block(&self) -> Option<BlockNumber> {
self.end_block
fn has_expired(&self, block: BlockNumber) -> bool {
match self.end_block {
Some(end_block) => block > end_block,
None => false,
}
}

fn match_and_decode(
Expand Down Expand Up @@ -152,7 +155,6 @@ impl blockchain::DataSource<Chain> for DataSource {
address,
mapping,
context,
// EBTODO: Re-evaluate if endBlock need to be considered
end_block: _,

// The creation block is ignored for detection duplicate data sources.
Expand Down Expand Up @@ -225,8 +227,7 @@ impl blockchain::DataSource<Chain> for DataSource {
manifest_idx,
address,
start_block: 0,
// EBTODO: Re-evaluate if this needs to be set to done_at,
end_block: done_at,
end_block: None,
mapping: template.mapping.clone(),
context: Arc::new(context),
creation_block,
Expand Down
7 changes: 5 additions & 2 deletions chain/near/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,11 @@ impl blockchain::DataSource<Chain> for DataSource {
self.source.start_block
}

fn end_block(&self) -> Option<BlockNumber> {
self.source.end_block
fn has_expired(&self, block: BlockNumber) -> bool {
match self.source.end_block {
Some(end_block) => block > end_block,
None => false,
}
}

fn match_and_decode(
Expand Down
4 changes: 2 additions & 2 deletions chain/substreams/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl blockchain::DataSource<Chain> for DataSource {
self.initial_block.unwrap_or(0)
}

fn end_block(&self) -> Option<BlockNumber> {
None
fn has_expired(&self, _: BlockNumber) -> bool {
false
}

fn name(&self) -> &str {
Expand Down
4 changes: 0 additions & 4 deletions core/src/subgraph/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,6 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
pub fn instance(&self) -> &SubgraphInstance<C, T> {
&self.instance
}

pub fn hosts(&self) -> &[Arc<T::Host>] {
self.instance.hosts()
}
}

pub struct OffchainMonitor {
Expand Down
4 changes: 1 addition & 3 deletions core/src/subgraph/context/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ where
pub fn from_manifest(
logger: &Logger,
manifest: SubgraphManifest<C>,
static_data_sources: Vec<DataSource<C>>,
data_sources: Vec<DataSource<C>>,
host_builder: T,
host_metrics: Arc<HostMetrics>,
Expand All @@ -52,7 +51,7 @@ where
host_builder,
subgraph_id,
network,
static_data_sources: Arc::new(static_data_sources),
static_data_sources: Arc::new(manifest.data_sources),
hosts: Hosts::new(),
module_cache: HashMap::new(),
templates,
Expand Down Expand Up @@ -180,7 +179,6 @@ where
return vec![];
}

// EBTODO
self.hosts
.hosts()
.iter()
Expand Down
9 changes: 3 additions & 6 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,16 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
// Dynamic data sources are loaded by appending them to the manifest.
//
// Refactor: Preferrably we'd avoid any mutation of the manifest.
let (manifest, static_data_sources, dynamic_data_sources) = {
let (manifest, dynamic_data_sources) = {
let dynamic_data_sources =
load_dynamic_data_sources(store.clone(), logger.clone(), &manifest)
.await
.context("Failed to load dynamic data sources")?;

let static_data_sources = manifest.data_sources.clone();

(manifest, static_data_sources, dynamic_data_sources)
(manifest, dynamic_data_sources)
};

let mut data_sources = static_data_sources.clone();
let mut data_sources = manifest.data_sources.clone();
data_sources.extend(dynamic_data_sources);

info!(logger, "Data source count at start: {}", data_sources.len());
Expand Down Expand Up @@ -384,7 +382,6 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
let instance = super::context::instance::SubgraphInstance::from_manifest(
&logger,
manifest,
static_data_sources,
data_sources,
host_builder,
host_metrics.clone(),
Expand Down
27 changes: 4 additions & 23 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,8 @@ where
.iter()
.filter_map(|ds| ds.as_onchain())
// Filter out data sources that have reached their end block.
.filter(|ds| match ds.end_block() {
Some(end_block) => {
current_ptr.as_ref().map(|ptr| ptr.number).unwrap_or(0) > end_block
}
.filter(|ds| match current_ptr.as_ref() {
Some(block) => !ds.has_expired(block.number),
None => true,
}),
);
Expand All @@ -187,6 +185,7 @@ where
} else {
C::TriggerFilter::from_data_sources(
self.ctx
.instance()
.hosts()
.iter()
.filter_map(|h| h.data_source().as_onchain()),
Expand Down Expand Up @@ -506,24 +505,6 @@ where
);
}

// EBTODO: Since endBlock reached datasources are ignored in match_and_decode. This might actually not be needed.
let end_block_reached_datasources = self.ctx.hosts().iter().filter_map(|host| {
if let Some(ds) = host.data_source().as_onchain() {
if ds.end_block() == Some(block_ptr.number) {
let mut stored_dynamic_data_source = ds.as_stored_dynamic_data_source();
stored_dynamic_data_source.done_at = Some(block_ptr.number);
Some(stored_dynamic_data_source)
} else {
None
}
} else {
None
}
});

let processed_datasources = end_block_reached_datasources
.chain(processed_offchain_data_sources.into_iter())
.collect::<Vec<_>>();
// Transact entity operations into the store and update the
// subgraph's block stream pointer
let _section = self.metrics.host.stopwatch.start_section("transact_block");
Expand Down Expand Up @@ -560,7 +541,7 @@ where
&self.metrics.host.stopwatch,
persisted_data_sources,
deterministic_errors,
processed_datasources,
processed_offchain_data_sources,
is_non_fatal_errors_active,
)
.await
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<C: Blockchain> DataSource<C> for MockDataSource {
todo!()
}

fn end_block(&self) -> Option<BlockNumber> {
fn has_expired(&self, _: BlockNumber) -> bool {
todo!()
}

Expand Down
6 changes: 5 additions & 1 deletion graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,11 @@ pub trait DataSource<C: Blockchain>: 'static + Sized + Send + Sync + Clone {

fn address(&self) -> Option<&[u8]>;
fn start_block(&self) -> BlockNumber;
fn end_block(&self) -> Option<BlockNumber>;

/// If the data source has an `endBlock`, check whether the trigger block is
/// within the range of blocks that the data source is supposed to handle.
/// Otherwise, ignore the trigger.
fn has_expired(&self, block: BlockNumber) -> bool;
fn name(&self) -> &str;
fn kind(&self) -> &str;
fn network(&self) -> Option<&str>;
Expand Down
17 changes: 4 additions & 13 deletions graph/src/data_source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,19 +158,10 @@ impl<C: Blockchain> DataSource<C> {
logger: &Logger,
) -> Result<Option<TriggerWithHandler<MappingTrigger<C>>>, Error> {
match (self, trigger) {
(Self::Onchain(ds), TriggerData::Onchain(trigger)) => {
// If the data source has an `endBlock` field, check that the trigger block is
// within the range of blocks that the data source is supposed to handle.
// Otherwise, ignore the trigger.
if let Some(end_block) = ds.end_block() {
if end_block < block.number() {
return Ok(None);
}
}

ds.match_and_decode(trigger, block, logger)
.map(|t| t.map(|t| t.map(MappingTrigger::Onchain)))
}
(Self::Onchain(ds), _) if ds.has_expired(block.number()) => Ok(None),
(Self::Onchain(ds), TriggerData::Onchain(trigger)) => ds
.match_and_decode(trigger, block, logger)
.map(|t| t.map(|t| t.map(MappingTrigger::Onchain))),
(Self::Offchain(ds), TriggerData::Offchain(trigger)) => {
Ok(ds.match_and_decode(trigger))
}
Expand Down
6 changes: 1 addition & 5 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1114,11 +1114,7 @@ impl DeploymentStore {

dynds::insert(&conn, &site, &batch.data_sources, manifest_idx_and_name)?;

dynds::update_processed_datasources_status(
&conn,
&site,
&batch.offchain_to_remove,
)?;
dynds::update_offchain_status(&conn, &site, &batch.offchain_to_remove)?;

if !batch.deterministic_errors.is_empty() {
deployment::insert_subgraph_errors(
Expand Down
8 changes: 4 additions & 4 deletions store/postgres/src/dynds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub(crate) fn revert(
}
}

pub(crate) fn update_processed_datasources_status(
pub(crate) fn update_offchain_status(
conn: &PgConnection,
site: &Site,
data_sources: &write::DataSources,
Expand All @@ -57,9 +57,9 @@ pub(crate) fn update_processed_datasources_status(
}

match site.schema_version.private_data_sources() {
true => DataSourcesTable::new(site.namespace.clone())
.update_processed_datasources_status(conn, data_sources),
// EBTODO: understand this error, see if its applicable for non-offchain data sources that reached endBlock
true => {
DataSourcesTable::new(site.namespace.clone()).update_offchain_status(conn, data_sources)
}
false => Err(constraint_violation!(
"shared schema does not support data source offchain_found",
)),
Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/dynds/private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl DataSourcesTable {

// Remove offchain data sources by checking the causality region, which currently uniquely
// identifies an offchain data source.
pub(super) fn update_processed_datasources_status(
pub(super) fn update_offchain_status(
&self,
conn: &PgConnection,
data_sources: &write::DataSources,
Expand All @@ -309,7 +309,6 @@ impl DataSourcesTable {
.bind::<Integer, _>(ds.causality_region)
.execute(conn)?;

// EBTODO: understand this error, see if how it needs to be modified to account for non-offchain data sources that reached endBlock
if count > 1 {
return Err(constraint_violation!(
"expected to remove at most one offchain data source but would remove {}, causality region: {}",
Expand Down
1 change: 0 additions & 1 deletion store/test-store/tests/chain/ethereum/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ specVersion: 0.0.2
let end_block = data_source.as_onchain().unwrap().end_block;

assert_eq!(Some(9562481), end_block);
assert_eq!("Qmmanifest", manifest.id.as_str());
}

#[tokio::test]
Expand Down

0 comments on commit d960f49

Please sign in to comment.