Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement moving of subgraphs #4374

Merged
merged 4 commits into from
Feb 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/implementation/metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ shard alongside the deployment's data in `sgdNNN`.
| `use_bytea_prefix` | `boolean!` | |
| `start_block_hash` | `bytea` | Parent of the smallest start block from the manifest |
| `start_block_number` | `int4` | |
| `on_sync` | `text` | Additional behavior when deployment becomes synced |

### `subgraph_deployment_assignment`

Expand Down
13 changes: 12 additions & 1 deletion node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ pub enum CopyCommand {
/// How far behind `src` subgraph head to copy
#[clap(long, short, default_value = "200")]
offset: u32,
/// Activate this copy once it has synced
#[clap(long, short, conflicts_with = "replace")]
activate: bool,
/// Replace the source with this copy once it has synced
#[clap(long, short, conflicts_with = "activate")]
replace: bool,
/// The source deployment (see `help info`)
src: DeploymentSearch,
/// The name of the database shard into which to copy
Expand Down Expand Up @@ -1093,10 +1099,15 @@ async fn main() -> anyhow::Result<()> {
shard,
node,
offset,
activate,
replace,
} => {
let shards: Vec<_> = ctx.config.stores.keys().cloned().collect();
let (store, primary) = ctx.store_and_primary();
commands::copy::create(store, primary, src, shard, shards, node, offset).await
commands::copy::create(
store, primary, src, shard, shards, node, offset, activate, replace,
)
.await
}
Activate { deployment, shard } => {
commands::copy::activate(ctx.subgraph_store(), deployment, shard)
Expand Down
32 changes: 24 additions & 8 deletions node/src/manager/commands/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use diesel::{ExpressionMethods, JoinOnDsl, OptionalExtension, QueryDsl, RunQuery
use std::{collections::HashMap, sync::Arc, time::SystemTime};

use graph::{
components::store::BlockStore as _,
components::store::{BlockStore as _, DeploymentId},
data::query::QueryTarget,
prelude::{
anyhow::{anyhow, bail, Error},
Expand All @@ -11,7 +11,10 @@ use graph::{
},
};
use graph_store_postgres::{
command_support::catalog::{self, copy_state, copy_table_state},
command_support::{
catalog::{self, copy_state, copy_table_state},
on_sync, OnSync,
},
PRIMARY_SHARD,
};
use graph_store_postgres::{connection_pool::ConnectionPool, Shard, Store, SubgraphStore};
Expand Down Expand Up @@ -56,7 +59,7 @@ impl CopyState {
pools: &HashMap<Shard, ConnectionPool>,
shard: &Shard,
dst: i32,
) -> Result<Option<(CopyState, Vec<CopyTableState>)>, Error> {
) -> Result<Option<(CopyState, Vec<CopyTableState>, OnSync)>, Error> {
use copy_state as cs;
use copy_table_state as cts;

Expand All @@ -71,11 +74,13 @@ impl CopyState {
.order_by(cts::entity_type)
.load::<CopyTableState>(&dconn)?;

let on_sync = on_sync(&dconn, DeploymentId(dst))?;

Ok(cs::table
.filter(cs::dst.eq(dst))
.get_result::<CopyState>(&dconn)
.optional()?
.map(|state| (state, tables)))
.map(|state| (state, tables, on_sync)))
}
}

Expand All @@ -87,8 +92,17 @@ pub async fn create(
shards: Vec<String>,
node: String,
block_offset: u32,
activate: bool,
replace: bool,
) -> Result<(), Error> {
let block_offset = block_offset as i32;
let on_sync = match (activate, replace) {
(true, true) => bail!("--activate and --replace can't both be specified"),
(true, false) => OnSync::Activate,
(false, true) => OnSync::Replace,
(false, false) => OnSync::None,
};

let subgraph_store = store.subgraph_store();
let src = src.locate_unique(&primary)?;
let query_store = store
Expand Down Expand Up @@ -134,7 +148,7 @@ pub async fn create(
let shard = Shard::new(shard)?;
let node = NodeId::new(node.clone()).map_err(|()| anyhow!("invalid node id `{}`", node))?;

let dst = subgraph_store.copy_deployment(&src, shard, node, base_ptr)?;
let dst = subgraph_store.copy_deployment(&src, shard, node, base_ptr, on_sync)?;

println!("created deployment {} as copy of {}", dst, src);
Ok(())
Expand Down Expand Up @@ -193,7 +207,7 @@ pub fn list(pools: HashMap<Shard, ConnectionPool>) -> Result<(), Error> {
println!("{:20} | {}", "deployment", deployment_hash);
println!("{:20} | sgd{} -> sgd{} ({})", "action", src, dst, shard);
match CopyState::find(&pools, &shard, dst)? {
Some((state, tables)) => match cancelled_at {
Some((state, tables, _)) => match cancelled_at {
Some(cancel_requested) => match state.cancelled_at {
Some(cancelled_at) => status("cancelled", cancelled_at),
None => status("cancel requested", cancel_requested),
Expand Down Expand Up @@ -263,8 +277,8 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
.map(|(_, cancelled_at)| (true, cancelled_at))
.unwrap_or((false, None));

let (state, tables) = match CopyState::find(&pools, &shard, dst)? {
Some((state, tables)) => (state, tables),
let (state, tables, on_sync) = match CopyState::find(&pools, &shard, dst)? {
Some((state, tables, on_sync)) => (state, tables, on_sync),
None => {
if active {
println!("copying is queued but has not started");
Expand All @@ -290,6 +304,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
"src",
"dst",
"target block",
"on sync",
"duration",
"status",
];
Expand All @@ -298,6 +313,7 @@ pub fn status(pools: HashMap<Shard, ConnectionPool>, dst: &DeploymentSearch) ->
state.src.to_string(),
state.dst.to_string(),
state.target_block_number.to_string(),
on_sync.to_str().to_string(),
duration(&state.started_at, &state.finished_at),
progress,
];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Revert the corresponding up migration: remove the `on_sync` column
-- (and, implicitly, its check constraint) from `subgraph_manifest`
alter table subgraphs.subgraph_manifest
drop column on_sync;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Track what should happen when a copied deployment finishes syncing.
-- NULL means "no additional action"; see `OnSync` in
-- store/postgres/src/deployment.rs for how the values are interpreted.
alter table subgraphs.subgraph_manifest
add column on_sync text
-- use a check constraint instead of an enum because
-- enums are a pain to update
constraint subgraph_manifest_on_sync_ck check (on_sync in ('activate', 'replace'));
11 changes: 11 additions & 0 deletions store/postgres/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,17 @@ impl CopyState {
}
}

/// Look up the deployment that `dst` is being (or was) copied from.
///
/// Returns `Ok(None)` when there is no `copy_state` row for `dst`, i.e.,
/// when `dst` is not the target of a copy.
pub(crate) fn source(conn: &PgConnection, dst: &Site) -> Result<Option<DeploymentId>, StoreError> {
    use copy_state as cs;

    let src = cs::table
        .filter(cs::dst.eq(dst.id))
        .select(cs::src)
        .get_result::<DeploymentId>(conn)
        .optional()?;
    Ok(src)
}

/// Track the desired size of a batch in such a way that doing the next
/// batch gets close to TARGET_DURATION for the time it takes to copy one
/// batch, but don't step up the size by more than 2x at once
Expand Down
85 changes: 85 additions & 0 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,63 @@ impl From<SubgraphHealth> for graph::data::subgraph::schema::SubgraphHealth {
}
}

/// Additional behavior for a deployment when it becomes synced
#[derive(Clone, Copy, Debug)]
pub enum OnSync {
    /// Take no additional action (stored as SQL `NULL`)
    None,
    /// Activate this deployment
    Activate,
    /// Activate this deployment and unassign any other copies of the same
    /// deployment
    Replace,
}

impl TryFrom<Option<&str>> for OnSync {
    type Error = StoreError;

    /// Parse the value stored in `subgraph_manifest.on_sync`. A missing
    /// value (SQL `NULL`) maps to `OnSync::None`; anything other than
    /// `'activate'` or `'replace'` is a constraint violation since the
    /// table's check constraint only allows those two strings.
    fn try_from(value: Option<&str>) -> Result<Self, Self::Error> {
        match value {
            None => Ok(OnSync::None),
            Some("activate") => Ok(OnSync::Activate),
            Some("replace") => Ok(OnSync::Replace),
            // Bind the offending string and pass it as an explicit format
            // argument: `Option<&str>` does not implement `Display`, so the
            // previous `{value}` inline capture could never be interpolated
            // and the error message printed the braces literally.
            Some(other) => Err(constraint_violation!(
                "illegal value for on_sync: {}",
                other
            )),
        }
    }
}

impl OnSync {
    /// Whether this deployment should be activated once it is synced
    pub fn activate(&self) -> bool {
        matches!(self, OnSync::Activate | OnSync::Replace)
    }

    /// Whether other copies of this deployment should be replaced once it
    /// is synced (implies activation)
    pub fn replace(&self) -> bool {
        matches!(self, OnSync::Replace)
    }

    /// Human-readable name for this action
    pub fn to_str(&self) -> &str {
        match self {
            OnSync::None => "none",
            OnSync::Activate => "activate",
            OnSync::Replace => "replace",
        }
    }

    /// The value to store in `subgraph_manifest.on_sync`; `OnSync::None`
    /// is represented as SQL `NULL`
    fn to_sql(&self) -> Option<&str> {
        match self {
            OnSync::None => None,
            OnSync::Activate | OnSync::Replace => Some(self.to_str()),
        }
    }
}

table! {
subgraphs.subgraph_deployment (id) {
id -> Integer,
Expand Down Expand Up @@ -121,6 +178,7 @@ table! {
// Entity types that have a `causality_region` column.
// Names stored as present in the schema, not in snake case.
entities_with_causality_region -> Array<Text>,
on_sync -> Nullable<Text>,
}
}

Expand Down Expand Up @@ -1083,6 +1141,33 @@ pub fn set_earliest_block(
Ok(())
}

/// Read the `on_sync` setting for deployment `id` from
/// `subgraphs.subgraph_manifest`
pub fn on_sync(conn: &PgConnection, id: impl Into<DeploymentId>) -> Result<OnSync, StoreError> {
    use subgraph_manifest as m;

    let stored: Option<String> = m::table
        .filter(m::id.eq(id.into()))
        .select(m::on_sync)
        .get_result(conn)?;
    // NULL maps to `OnSync::None`; any other unexpected value is an error
    OnSync::try_from(stored.as_deref())
}

/// Record the `on_sync` behavior for `site` in
/// `subgraphs.subgraph_manifest`.
///
/// Errors with `DeploymentNotFound` when the deployment has no manifest
/// row; more than one manifest row is a constraint violation.
pub fn set_on_sync(conn: &PgConnection, site: &Site, on_sync: OnSync) -> Result<(), StoreError> {
    use subgraph_manifest as m;

    let updated = update(m::table.filter(m::id.eq(site.id)))
        .set(m::on_sync.eq(on_sync.to_sql()))
        .execute(conn)?;

    if updated == 1 {
        return Ok(());
    }
    if updated == 0 {
        return Err(StoreError::DeploymentNotFound(site.to_string()));
    }
    Err(constraint_violation!(
        "multiple manifests for deployment {}",
        site.to_string()
    ))
}

/// Lock the deployment `site` for writes while `f` is running. The lock can
/// cross transactions, and `f` can therefore execute multiple transactions
/// while other write activity for that deployment is locked out. Block the
Expand Down
20 changes: 19 additions & 1 deletion store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ use web3::types::Address;

use crate::block_range::{block_number, BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
use crate::catalog;
use crate::deployment;
use crate::deployment::{self, OnSync};
use crate::detail::ErrorDetail;
use crate::dynds::DataSourcesTable;
use crate::primary::DeploymentId;
use crate::relational::index::{CreateIndex, Method};
use crate::relational::{Layout, LayoutCache, SqlName, Table};
use crate::relational_queries::FromEntityData;
Expand Down Expand Up @@ -169,6 +170,7 @@ impl DeploymentStore {
site: Arc<Site>,
graft_base: Option<Arc<Layout>>,
replace: bool,
on_sync: OnSync,
) -> Result<(), StoreError> {
let conn = self.get_conn()?;
conn.transaction(|| -> Result<_, StoreError> {
Expand Down Expand Up @@ -211,6 +213,9 @@ impl DeploymentStore {
conn.batch_execute(&DataSourcesTable::new(site.namespace.clone()).as_ddl())?;
}
}

deployment::set_on_sync(&conn, &site, on_sync)?;

Ok(())
})
}
Expand Down Expand Up @@ -640,6 +645,19 @@ impl DeploymentStore {
conn.transaction(|| deployment::set_synced(&conn, id))
}

/// Look up the on_sync action for this deployment
/// (as stored in `subgraph_manifest.on_sync`)
pub(crate) fn on_sync(&self, site: &Site) -> Result<OnSync, StoreError> {
    let conn = self.get_conn()?;
    deployment::on_sync(&conn, site.id)
}

/// Return the source of `site`, or `None` if `site` is neither a graft
/// nor a copy (i.e., there is no `copy_state` row for it)
pub(crate) fn source_of_copy(&self, site: &Site) -> Result<Option<DeploymentId>, StoreError> {
    let conn = self.get_conn()?;
    crate::copy::source(&conn, site)
}

// Only used for tests
#[cfg(debug_assertions)]
pub(crate) fn drop_deployment_schema(
Expand Down
1 change: 1 addition & 0 deletions store/postgres/src/detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ struct StoredSubgraphManifest {
start_block_hash: Option<Bytes>,
raw_yaml: Option<String>,
entities_with_causality_region: Vec<EntityType>,
on_sync: Option<String>,
}

impl From<StoredSubgraphManifest> for SubgraphManifestEntity {
Expand Down
1 change: 1 addition & 0 deletions store/postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub mod command_support {
pub mod index {
pub use crate::relational::index::{CreateIndex, Method};
}
pub use crate::deployment::{on_sync, OnSync};
pub use crate::primary::Namespace;
pub use crate::relational::{Catalog, Column, ColumnType, Layout, SqlName};
}
8 changes: 7 additions & 1 deletion store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ pub struct Site {
/// Whether this is the site that should be used for queries. There's
/// exactly one for each `deployment`, i.e., other entries for that
/// deployment have `active = false`
pub(crate) active: bool,
pub active: bool,

pub(crate) schema_version: DeploymentSchemaVersion,
/// Only the store and tests can create Sites
Expand Down Expand Up @@ -370,6 +370,12 @@ impl TryFrom<Schema> for Site {
}
}

impl std::fmt::Display for Site {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}[sgd{}]", self.deployment, self.id)
}
}

impl From<&Site> for DeploymentLocator {
fn from(site: &Site) -> Self {
DeploymentLocator::new(site.id.into(), site.deployment.clone())
Expand Down
Loading