Skip to content

Commit

Permalink
remove VID wrapping
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Jan 16, 2025
1 parent 6da9287 commit 70a0f32
Show file tree
Hide file tree
Showing 17 changed files with 161 additions and 167 deletions.
29 changes: 12 additions & 17 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::sync::Arc;
use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityV, EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::ENV_VARS;
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
Expand All @@ -33,8 +33,8 @@ pub enum GetScope {
#[derive(Debug, Clone)]
enum EntityOp {
Remove,
Update(EntityV),
Overwrite(EntityV),
Update(Entity),
Overwrite(Entity),
}

impl EntityOp {
Expand All @@ -45,7 +45,7 @@ impl EntityOp {
use EntityOp::*;
match (self, entity) {
(Remove, _) => Ok(None),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new.e)),
(Overwrite(new), _) | (Update(new), None) => Ok(Some(new)),
(Update(updates), Some(entity)) => {
let mut e = entity.borrow().clone();
e.merge_remove_null_fields(updates)?;
Expand All @@ -69,7 +69,7 @@ impl EntityOp {
match self {
// This is how `Overwrite` is constructed, by accumulating `Update` onto `Remove`.
Remove => *self = Overwrite(update),
Update(current) | Overwrite(current) => current.e.merge(update.e),
Update(current) | Overwrite(current) => current.merge(update),
}
}
}
Expand Down Expand Up @@ -288,9 +288,9 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, &entity.e) =>
if query.matches(key, &entity) =>
{
Ok(Some(entity.e.clone()))
Ok(Some(entity.clone()))
}
EntityOp::Remove => Ok(None),
_ => Ok(None),
Expand Down Expand Up @@ -371,7 +371,8 @@ impl EntityCache {
// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let entity = EntityV::new(entity, vid);
let mut entity = entity;
let _ = entity.set_vid(vid).expect("the vid should be set");

self.entity_op(key.clone(), EntityOp::Update(entity));

Expand Down Expand Up @@ -478,22 +479,19 @@ impl EntityCache {
// Entity was created
(None, EntityOp::Update(mut updates))
| (None, EntityOp::Overwrite(mut updates)) => {
let vid = updates.vid;
updates.e.remove_null_fields();
let data = Arc::new(updates.e.clone());
updates.remove_null_fields();
let data = Arc::new(updates);
self.current.insert(key.clone(), Some(data.cheap_clone()));
Some(Insert {
key,
data,
block,
end: None,
vid,
})
}
// Entity may have been changed
(Some(current), EntityOp::Update(updates)) => {
let mut data = current.as_ref().clone();
let vid = updates.vid;
data.merge_remove_null_fields(updates)
.map_err(|e| key.unknown_attribute(e))?;
let data = Arc::new(data);
Expand All @@ -504,24 +502,21 @@ impl EntityCache {
data,
block,
end: None,
vid,
})
} else {
None
}
}
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let vid = data.vid;
let data = Arc::new(data.e.clone());
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
data,
block,
end: None,
vid,
})
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions graph/src/components/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::constraint_violation;
use crate::data::store::scalar::Bytes;
use crate::data::store::{EntityV, Id, IdList, Value};
use crate::data::store::{Id, IdList, Value};
use crate::data::value::Word;
use crate::data_source::CausalityRegion;
use crate::derive::CheapClone;
Expand Down Expand Up @@ -829,7 +829,7 @@ where
pub enum EntityOperation {
/// Locates the entity specified by `key` and sets its attributes according to the contents of
/// `data`. If no entity exists with this key, creates a new entity.
Set { key: EntityKey, data: EntityV },
Set { key: EntityKey, data: Entity },

/// Removes an entity with the specified key, if one exists.
Remove { key: EntityKey },
Expand Down
29 changes: 7 additions & 22 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,13 @@ pub enum EntityModification {
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Update the entity by overwriting it
Overwrite {
key: EntityKey,
data: Arc<Entity>,
block: BlockNumber,
end: Option<BlockNumber>,
vid: i64,
},
/// Remove the entity
Remove { key: EntityKey, block: BlockNumber },
Expand All @@ -69,7 +67,6 @@ pub struct EntityWrite<'a> {
// The end of the block range for which this write is valid. The value
// of `end` itself is not included in the range
pub end: Option<BlockNumber>,
pub vid: i64,
}

impl std::fmt::Display for EntityWrite<'_> {
Expand All @@ -92,28 +89,24 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> {
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),
EntityModification::Overwrite {
key,
data,
block,
end,
vid,
} => Ok(EntityWrite {
id: &key.entity_id,
entity: &data,
causality_region: key.causality_region,
block: *block,
end: *end,
vid: *vid,
}),

EntityModification::Remove { .. } => Err(()),
Expand Down Expand Up @@ -220,13 +213,11 @@ impl EntityModification {
data,
block,
end,
vid,
} => Ok(Insert {
key,
data,
block,
end,
vid,
}),
Remove { key, .. } => {
return Err(constraint_violation!(
Expand Down Expand Up @@ -280,23 +271,21 @@ impl EntityModification {
}

impl EntityModification {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
EntityModification::Insert {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self {
pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self {
EntityModification::Overwrite {
key,
data: Arc::new(data),
block,
end: None,
vid,
}
}

Expand Down Expand Up @@ -1028,36 +1017,32 @@ mod test {

let value = value.clone();
let key = THING_TYPE.parse_key("one").unwrap();
let vid = 0;
let vid = 0i64;
match value {
Ins(block) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }),
block,
end: None,
vid,
},
Ovw(block) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }),
block,
end: None,
vid,
},
Rem(block) => EntityModification::Remove { key, block },
InsC(block, end) => EntityModification::Insert {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }),
block,
end: Some(end),
vid,
},
OvwC(block, end) => EntityModification::Overwrite {
key,
data: Arc::new(entity! { SCHEMA => id: "one", count: block }),
data: Arc::new(entity! { SCHEMA => id: "one", count: block, vid: vid }),
block,
end: Some(end),
vid,
},
}
}
Expand Down
29 changes: 14 additions & 15 deletions graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,10 @@ impl Entity {
.expect("the vid is set to a valid value")
}

pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> {
self.0.insert("vid", value.into())
}

/// This version of the function returns 0 if the VID is not set. It should be
/// used only in the testing code for more lenient definition of entities.
#[cfg(debug_assertions)]
Expand All @@ -932,6 +936,14 @@ impl Entity {
.expect("the vid is set to a valid value")
}

#[cfg(debug_assertions)]
pub fn set_vid_if_empty(&mut self) {
let vid = self.get("vid");
if vid.is_none() {
let _ = self.set_vid(100).expect("the vid should be set");
}
}

/// Merges an entity update `update` into this entity.
///
/// If a key exists in both entities, the value from `update` is chosen.
Expand All @@ -946,8 +958,8 @@ impl Entity {
/// If a key exists in both entities, the value from `update` is chosen.
/// If a key only exists on one entity, the value from that entity is chosen.
/// If a key is set to `Value::Null` in `update`, the key/value pair is removed.
pub fn merge_remove_null_fields(&mut self, update: EntityV) -> Result<(), InternError> {
for (key, value) in update.e.0.into_iter() {
pub fn merge_remove_null_fields(&mut self, update: Entity) -> Result<(), InternError> {
for (key, value) in update.into_iter() {
match value {
Value::Null => self.0.remove(&key),
_ => self.0.insert(&key, value)?,
Expand Down Expand Up @@ -1098,19 +1110,6 @@ impl std::fmt::Debug for Entity {
}
}

/// An entity wrapper that has VID too.
#[derive(Debug, Clone, CacheWeight, PartialEq, Eq, Serialize)]
pub struct EntityV {
pub e: Entity,
pub vid: i64,
}

impl EntityV {
pub fn new(e: Entity, vid: i64) -> Self {
Self { e, vid }
}
}

/// An object that is returned from a query. It's a an `r::Value` which
/// carries the attributes of the object (`__typename`, `id` etc.) and
/// possibly a pointer to its parent if the query that constructed it is one
Expand Down
4 changes: 2 additions & 2 deletions runtime/test/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,11 @@ fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
}
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA};
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid };
let key = THING_TYPE.parse_key(id).unwrap();
(
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
EntityModification::insert(key, data, 0, vid),
EntityModification::insert(key, data, 0),
)
}

Expand Down
2 changes: 1 addition & 1 deletion server/index-node/src/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ fn entity_changes_to_graphql(entity_changes: Vec<EntityOperation>) -> r::Value {
.push(key.entity_id);
}
EntityOperation::Set { key, data } => {
updates.entry(key.entity_type).or_default().push(data.e);
updates.entry(key.entity_type).or_default().push(data);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::{
},
};
use graph::components::store::DerivedEntityQuery;
use graph::data::store::{EntityV, Id, IdList, IdType, BYTES_SCALAR};
use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR};
use graph::data::subgraph::schema::POI_TABLE;
use graph::prelude::{
anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityOperation, Logger,
Expand Down Expand Up @@ -697,10 +697,9 @@ impl Layout {
let entity_id = data.id();
processed_entities.insert((entity_type.clone(), entity_id.clone()));

let vid = data.vid();
changes.push(EntityOperation::Set {
key: entity_type.key_in(entity_id, CausalityRegion::from_entity(&data)),
data: EntityV::new(data, vid),
data,
});
}

Expand Down
12 changes: 10 additions & 2 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,14 @@ impl EntityData {
// table column; those will be things like the
// block_range that `select *` pulls in but that we
// don't care about here
if let Some(column) = table.column(&SqlName::verbatim(key)) {
if key == "vid" {
// TODO: optimize
match T::Value::from_column_value(&ColumnType::Int8, json) {
Ok(value) if value.is_null() => None,
Ok(value) => Some(Ok((Word::from("vid".to_string()), value))),
Err(e) => Some(Err(e)),
}
} else if let Some(column) = table.column(&SqlName::verbatim(key)) {
match T::Value::from_column_value(&column.column_type, json) {
Ok(value) if value.is_null() => None,
Ok(value) => Some(Ok((Word::from(column.field.to_string()), value))),
Expand Down Expand Up @@ -2450,7 +2457,8 @@ impl<'a> InsertRow<'a> {
}
let br_value = BlockRangeValue::new(table, row.block, row.end);
let causality_region = row.causality_region;
let vid = row.vid;
// println!("ROW: {:?}", row.entity);
let vid = row.entity.vid();
Ok(Self {
values,
br_value,
Expand Down
Loading

0 comments on commit 70a0f32

Please sign in to comment.