Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add causality region column and implement isolation rules #4162

Merged
merged 13 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use anyhow::Error;
use graph::{
blockchain::{self, block_stream::BlockWithTriggers, BlockPtr, EmptyNodeCapabilities},
components::{
store::{DeploymentLocator, EntityKey, SubgraphFork},
store::{DeploymentLocator, EntityKey, EntityType, SubgraphFork},
subgraph::{MappingError, ProofOfIndexingEvent, SharedProofOfIndexing},
},
data::store::scalar::Bytes,
data_source,
data_source::{self, CausalityRegion},
prelude::{
anyhow, async_trait, BigDecimal, BigInt, BlockHash, BlockNumber, BlockState, Entity,
RuntimeHostBuilder, Value,
Expand Down Expand Up @@ -183,7 +183,11 @@ where
Operation::Create | Operation::Update => {
let entity_type: &str = &entity_change.entity;
let entity_id: String = entity_change.id.clone();
let key = EntityKey::data(entity_type.to_string(), entity_id.clone());
let key = EntityKey {
entity_type: EntityType::new(entity_type.to_string()),
entity_id: entity_id.clone().into(),
causality_region: CausalityRegion::ONCHAIN, // Substreams don't currently support offchain data
};
let mut data: HashMap<String, Value> = HashMap::from_iter(vec![]);

for field in entity_change.fields.iter() {
Expand Down Expand Up @@ -214,7 +218,11 @@ where
Operation::Delete => {
let entity_type: &str = &entity_change.entity;
let entity_id: String = entity_change.id.clone();
let key = EntityKey::data(entity_type.to_string(), entity_id.clone());
let key = EntityKey {
entity_type: EntityType::new(entity_type.to_string()),
entity_id: entity_id.clone().into(),
causality_region: CausalityRegion::ONCHAIN, // Substreams don't currently support offchain data
};

state.entity_cache.remove(key);

Expand Down
4 changes: 1 addition & 3 deletions core/src/subgraph/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ impl<I: SubgraphInstanceManager> SubgraphAssignmentProviderTrait for SubgraphAss
{
// Shut down subgraph processing
self.instance_manager.stop_subgraph(deployment).await;
Ok(())
} else {
Err(SubgraphAssignmentProviderError::NotRunning(deployment))
}
Ok(())
}
}
25 changes: 21 additions & 4 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ where
// - The event stream sees a Remove event for subgraph B, but the table query finds that
// subgraph B has already been removed.
// The `handle_assignment_events` function handles these cases by ignoring AlreadyRunning
// (on subgraph start) or NotRunning (on subgraph stop) error types, which makes the
// operations idempotent.
// (on subgraph start) which makes the operations idempotent. Subgraph stop is already idempotent.

// Start event stream
let assignment_event_stream = self.assignment_events();
Expand Down Expand Up @@ -455,7 +454,6 @@ async fn handle_assignment_event(
node_id: _,
} => match provider.stop(deployment).await {
Ok(()) => Ok(()),
Err(SubgraphAssignmentProviderError::NotRunning(_)) => Ok(()),
Err(e) => Err(CancelableError::Error(e)),
},
}
Expand Down Expand Up @@ -620,11 +618,30 @@ async fn create_subgraph_version<C: Blockchain, S: SubgraphStore>(
"block" => format!("{:?}", base_block.as_ref().map(|(_,ptr)| ptr.number))
);

// Entity types that may be touched by offchain data sources need a causality region column.
let needs_causality_region = manifest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could benefit from some test coverage, just making sure that given some manifests, the DeploymentCreate ends up with the right entities

.data_sources
.iter()
.filter_map(|ds| ds.as_offchain())
.map(|ds| ds.mapping.entities.iter())
.chain(
manifest
.templates
.iter()
.filter_map(|ds| ds.as_offchain())
.map(|ds| ds.mapping.entities.iter()),
)
.flatten()
.cloned()
.collect();

// Apply the subgraph versioning and deployment operations,
// creating a new subgraph deployment if one doesn't exist.
let deployment = DeploymentCreate::new(raw_string, &manifest, start_block)
.graft(base_block)
.debug(debug_fork);
.debug(debug_fork)
.entities_with_causality_region(needs_causality_region);

deployment_store
.create_subgraph_deployment(
name,
Expand Down
10 changes: 9 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use graph::data::subgraph::{
SubgraphFeature,
};
use graph::data_source::{
offchain, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
offchain, CausalityRegion, DataSource, DataSourceCreationError, DataSourceTemplate, TriggerData,
};
use graph::env::EnvVars;
use graph::prelude::*;
Expand Down Expand Up @@ -653,6 +653,7 @@ where
let mut block_state = BlockState::<C>::new(EmptyStore::new(schema), LfuCache::new());

// PoI ignores offchain events.
// See also: poi-ignores-offchain
let proof_of_indexing = None;
let causality_region = "";

Expand Down Expand Up @@ -998,7 +999,14 @@ async fn update_proof_of_indexing(
// Create the special POI entity key specific to this causality_region
let entity_key = EntityKey {
entity_type: POI_OBJECT.to_owned(),

// There are two things called causality regions here, one is the causality region for
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment is really helpful 👍

// the poi which is a string and the PoI entity id. The other is the data source
// causality region to which the PoI belongs as an entity. Currently offchain events do
// not affect PoI so it is assumed to be `ONCHAIN`.
// See also: poi-ignores-offchain
entity_id: causality_region.into(),
causality_region: CausalityRegion::ONCHAIN,
};

// Grab the current digest attribute on this entity
Expand Down
28 changes: 8 additions & 20 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use anyhow::anyhow;
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::fmt::{self, Debug};
use std::sync::Arc;

use crate::components::store::{
self as s, Entity, EntityKey, EntityOp, EntityOperation, EntityType,
};
use crate::components::store::{self as s, Entity, EntityKey, EntityOp, EntityOperation};
use crate::prelude::{Schema, ENV_VARS};
use crate::util::lfu_cache::LfuCache;

Expand Down Expand Up @@ -102,6 +100,10 @@ impl EntityCache {
// Get the current entity, apply any updates from `updates`, then
// from `handler_updates`.
let mut entity = self.current.get_entity(&*self.store, eref)?;

// Always test the cache consistency in debug mode.
debug_assert!(entity == self.store.get(&eref).unwrap());

if let Some(op) = self.updates.get(eref).cloned() {
entity = op.apply_to(entity)
}
Expand Down Expand Up @@ -233,22 +235,8 @@ impl EntityCache {
// violation in the database, ensuring correctness
let missing = missing.filter(|key| !self.schema.is_immutable(&key.entity_type));

let mut missing_by_type: BTreeMap<&EntityType, Vec<&str>> = BTreeMap::new();
for key in missing {
missing_by_type
.entry(&key.entity_type)
.or_default()
.push(&key.entity_id);
}

for (entity_type, entities) in self.store.get_many(missing_by_type)? {
for entity in entities {
let key = EntityKey {
entity_type: entity_type.clone(),
entity_id: entity.id().unwrap().into(),
};
self.current.insert(key, Some(entity));
}
for (entity_key, entity) in self.store.get_many(missing.cloned().collect())? {
self.current.insert(entity_key, Some(entity));
}

let mut mods = Vec::new();
Expand Down
47 changes: 39 additions & 8 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod traits;

pub use entity_cache::{EntityCache, ModificationsAndCache};

use diesel::types::{FromSql, ToSql};
pub use err::StoreError;
use itertools::Itertools;
pub use traits::*;
Expand All @@ -12,13 +13,14 @@ use futures::stream::poll_fn;
use futures::{Async, Poll, Stream};
use graphql_parser::schema as s;
use serde::{Deserialize, Serialize};
use std::borrow::Borrow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::fmt;
use std::fmt::Display;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{fmt, io};

use crate::blockchain::Block;
use crate::data::store::scalar::Bytes;
Expand Down Expand Up @@ -71,6 +73,12 @@ impl<'a> From<&s::InterfaceType<'a, String>> for EntityType {
}
}

/// Borrow an `EntityType` as a plain `&str`.
///
/// This lets collections keyed by `EntityType` (e.g. `HashMap`/`BTreeMap`)
/// be queried with a `&str` directly, as done elsewhere in this change with
/// `types_for_interface().get(interface.name.as_str())`.
impl Borrow<str> for EntityType {
    fn borrow(&self) -> &str {
        &self.0
    }
}

// This conversion should only be used in tests since it makes it too
// easy to convert random strings into entity types
#[cfg(debug_assertions)]
Expand All @@ -82,6 +90,22 @@ impl From<&str> for EntityType {

impl CheapClone for EntityType {}

/// Deserialize an `EntityType` from a Postgres `text` column by reading it
/// as a `String` and wrapping it.
impl FromSql<diesel::sql_types::Text, diesel::pg::Pg> for EntityType {
    fn from_sql(bytes: Option<&[u8]>) -> diesel::deserialize::Result<Self> {
        // Delegate to the String impl, then wrap; errors propagate unchanged.
        <String as FromSql<_, diesel::pg::Pg>>::from_sql(bytes).map(EntityType::new)
    }
}

/// Serialize an `EntityType` to a Postgres `text` column by delegating to
/// the `str` implementation.
impl ToSql<diesel::sql_types::Text, diesel::pg::Pg> for EntityType {
    fn to_sql<W: io::Write>(
        &self,
        out: &mut diesel::serialize::Output<W, diesel::pg::Pg>,
    ) -> diesel::serialize::Result {
        // Write the inner string; no extra conversion or allocation needed.
        let name: &str = self.0.as_str();
        <str as ToSql<diesel::sql_types::Text, diesel::pg::Pg>>::to_sql(name, out)
    }
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityFilterDerivative(bool);

Expand All @@ -104,13 +128,23 @@ pub struct EntityKey {

/// ID of the individual entity.
pub entity_id: Word,

/// This is the causality region of the data source that created the entity.
///
/// In the case of an entity lookup, this is the causality region of the data source that is
/// doing the lookup. So if the entity exists but was created on a different causality region,
/// the lookup will return empty.
pub causality_region: CausalityRegion,
mangas marked this conversation as resolved.
Show resolved Hide resolved
}

impl EntityKey {
pub fn data(entity_type: String, entity_id: String) -> Self {
// For use in tests only
#[cfg(debug_assertions)]
pub fn data(entity_type: impl Into<String>, entity_id: impl Into<String>) -> Self {
Self {
entity_type: EntityType::new(entity_type),
entity_id: entity_id.into(),
entity_type: EntityType::new(entity_type.into()),
entity_id: entity_id.into().into(),
causality_region: CausalityRegion::ONCHAIN,
}
}
}
Expand Down Expand Up @@ -1071,10 +1105,7 @@ impl ReadStore for EmptyStore {
Ok(None)
}

fn get_many(
&self,
_ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
fn get_many(&self, _: BTreeSet<EntityKey>) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
Ok(BTreeMap::new())
}

Expand Down
13 changes: 6 additions & 7 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,11 @@ pub trait ReadStore: Send + Sync + 'static {
/// Looks up an entity using the given store key at the latest block.
fn get(&self, key: &EntityKey) -> Result<Option<Entity>, StoreError>;

/// Look up multiple entities as of the latest block. Returns a map of
/// entities by type.
/// Look up multiple entities as of the latest block.
fn get_many(
&self,
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError>;
keys: BTreeSet<EntityKey>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError>;

fn input_schema(&self) -> Arc<Schema>;
}
Expand All @@ -189,9 +188,9 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {

fn get_many(
&self,
ids_for_type: BTreeMap<&EntityType, Vec<&str>>,
) -> Result<BTreeMap<EntityType, Vec<Entity>>, StoreError> {
(**self).get_many(ids_for_type)
keys: BTreeSet<EntityKey>,
) -> Result<BTreeMap<EntityKey, Entity>, StoreError> {
(**self).get_many(keys)
}

fn input_schema(&self) -> Arc<Schema> {
Expand Down
4 changes: 2 additions & 2 deletions graph/src/data/graphql/object_or_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl<'a> ObjectOrInterface<'a> {
ObjectOrInterface::Object(object) => Some(vec![object]),
ObjectOrInterface::Interface(interface) => schema
.types_for_interface()
.get(&interface.into())
.get(interface.name.as_str())
.map(|object_types| object_types.iter().collect()),
}
}
Expand All @@ -131,7 +131,7 @@ impl<'a> ObjectOrInterface<'a> {
) -> bool {
match self {
ObjectOrInterface::Object(o) => o.name == typename,
ObjectOrInterface::Interface(i) => types_for_interface[&i.into()]
ObjectOrInterface::Interface(i) => types_for_interface[i.name.as_str()]
.iter()
.any(|o| o.name == typename),
}
Expand Down
4 changes: 1 addition & 3 deletions graph/src/data/subgraph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
blockchain::{BlockPtr, Blockchain, DataSource as _},
components::{
link_resolver::LinkResolver,
store::{DeploymentLocator, StoreError, SubgraphStore},
store::{StoreError, SubgraphStore},
},
data::{
graphql::TryFromValue,
Expand Down Expand Up @@ -304,8 +304,6 @@ pub enum SubgraphAssignmentProviderError {
/// Occurs when attempting to remove a subgraph that's not hosted.
#[error("Subgraph with ID {0} already running")]
AlreadyRunning(DeploymentHash),
#[error("Subgraph with ID {0} is not running")]
NotRunning(DeploymentLocator),
#[error("Subgraph provider error: {0}")]
Unknown(#[from] anyhow::Error),
}
Expand Down
Loading