Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subgraph composition spec version #5782

Open
wants to merge 1 commit into
base: zoran/subgraph-composition-rework-vid-wrap2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,25 @@ impl EntityCache {
};

// Always test the cache consistency in debug mode. The test only
// makes sense when we were actually asked to read from the store
// makes sense when we were actually asked to read from the store.
// We need to remove the VID as the one from the DB might come from
// a legacy subgraph that has VID autoincremented while this trait
// always creates it in a new style.
debug_assert!(match scope {
GetScope::Store => entity == self.store.get(key).unwrap().map(Arc::new),
GetScope::Store => {
// Release build will never call this function and hence it's OK
// when that implementation is not correct.
fn remove_vid(entity: Option<Arc<Entity>>) -> Option<Entity> {
entity.map(|e| {
#[allow(unused_mut)]
let mut entity = (*e).clone();
#[cfg(debug_assertions)]
entity.remove("vid");
entity
})
}
remove_vid(entity.clone()) == remove_vid(self.store.get(key).unwrap().map(Arc::new))
}
GetScope::InBlock => true,
});

Expand Down
8 changes: 8 additions & 0 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ impl EntityType {
pub fn is_object_type(&self) -> bool {
self.schema.is_object_type(self.atom)
}

// Changes the way the VID field is generated. It used to be autoincrement. Now its
// based on block number and the order of the entities in a block. The latter
// represents the write order across all entity types in the subgraph.
pub fn strict_vid_order(&self) -> bool {
// Currently the agregations entities don't have VIDs in insertion order
self.schema.strict_vid_order() && self.is_object_type()
}
}

impl fmt::Display for EntityType {
Expand Down
7 changes: 7 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::data::graphql::{DirectiveExt, DocumentExt, ObjectTypeExt, TypeExt, Va
use crate::data::store::{
self, EntityValidationError, IdType, IntoEntityIterator, TryIntoEntityIterator, ValueType, ID,
};
use crate::data::subgraph::SPEC_VERSION_1_3_0;
use crate::data::value::Word;
use crate::derive::CheapClone;
use crate::prelude::q::Value;
Expand Down Expand Up @@ -955,6 +956,7 @@ pub struct Inner {
pool: Arc<AtomPool>,
/// A list of all timeseries types by interval
agg_mappings: Box<[AggregationMapping]>,
spec_version: Version,
}

impl InputSchema {
Expand Down Expand Up @@ -1042,6 +1044,7 @@ impl InputSchema {
enum_map,
pool,
agg_mappings,
spec_version: spec_version.clone(),
}),
})
}
Expand Down Expand Up @@ -1585,6 +1588,10 @@ impl InputSchema {
}?;
Some(EntityType::new(self.cheap_clone(), obj_type.name))
}

pub fn strict_vid_order(&self) -> bool {
self.inner.spec_version >= SPEC_VERSION_1_3_0
}
}

/// Create a new pool that contains the names of all the types defined
Expand Down
11 changes: 7 additions & 4 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use diesel::{
sql_query,
sql_types::{Nullable, Text},
};
use graph::semver::Version;
use graph::{
blockchain::block_stream::FirehoseCursor,
data::subgraph::schema::SubgraphError,
Expand Down Expand Up @@ -305,11 +306,13 @@ pub fn debug_fork(

pub fn schema(conn: &mut PgConnection, site: &Site) -> Result<(InputSchema, bool), StoreError> {
use subgraph_manifest as sm;
let (s, use_bytea_prefix) = sm::table
.select((sm::schema, sm::use_bytea_prefix))
let (s, spec_ver, use_bytea_prefix) = sm::table
.select((sm::schema, sm::spec_version, sm::use_bytea_prefix))
.filter(sm::id.eq(site.id))
.first::<(String, bool)>(conn)?;
InputSchema::parse_latest(s.as_str(), site.deployment.clone())
.first::<(String, String, bool)>(conn)?;
let spec_version =
Version::parse(spec_ver.as_str()).map_err(|err| StoreError::Unknown(err.into()))?;
InputSchema::parse(&spec_version, s.as_str(), site.deployment.clone())
.map_err(StoreError::Unknown)
.map(|schema| (schema, use_bytea_prefix))
}
Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ impl Table {
Ok(cols)
}

// Currently the agregations entities don't have VIDs in insertion order
let vid_type = if self.object.is_object_type() {
let vid_type = if self.object.strict_vid_order() {
"bigint"
} else {
"bigserial"
Expand Down
7 changes: 3 additions & 4 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ impl TablePair {

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.is_object_type();
let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
Expand All @@ -253,9 +252,9 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
// Make sure the vid sequence continues from where it was in case
// that we use autoincrementing order of the DB
if !self.src.object.strict_vid_order() {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
Expand Down
12 changes: 6 additions & 6 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2377,7 +2377,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let new_vid_form = self.table.object.is_object_type();
let strict_vid_order = self.table.object.strict_vid_order();

// Construct a query
// insert into schema.table(column, ...)
Expand All @@ -2404,7 +2404,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}
out.push_sql(") values\n");
Expand All @@ -2424,7 +2424,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
if new_vid_form {
if strict_vid_order {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
Expand Down Expand Up @@ -4827,7 +4827,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.is_object_type();
let strict_vid_order = self.src.object.strict_vid_order();

// Construct a query
// insert into {dst}({columns})
Expand All @@ -4849,7 +4849,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}

Expand Down Expand Up @@ -4917,7 +4917,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}

Expand Down
4 changes: 2 additions & 2 deletions store/test-store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub async fn create_subgraph(

let manifest = SubgraphManifest::<graph::blockchain::mock::MockBlockchain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: BTreeSet::new(),
description: Some(format!("manifest for {}", subgraph_id)),
repository: Some(format!("repo for {}", subgraph_id)),
Expand Down Expand Up @@ -227,7 +227,7 @@ pub async fn create_test_subgraph_with_features(

let manifest = SubgraphManifest::<graph::blockchain::mock::MockBlockchain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features,
description: Some(format!("manifest for {}", subgraph_id)),
repository: Some(format!("repo for {}", subgraph_id)),
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: LOAD_RELATED_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/graft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down Expand Up @@ -1270,7 +1270,7 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
.expect("Failed to parse user schema");
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ fn create_subgraph() {

let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id,
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down Expand Up @@ -547,7 +547,7 @@ fn subgraph_features() {
} = get_subgraph_features(id.to_string()).unwrap();

assert_eq!(NAME, subgraph_id.as_str());
assert_eq!("1.0.0", spec_version);
assert_eq!("1.3.0", spec_version);
assert_eq!("1.0.0", api_version.unwrap());
assert_eq!(NETWORK_NAME, network);
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/writable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ lazy_static! {
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
Expand Down