Skip to content

Commit

Permalink
node, store: Add graphman stats set-target
Browse files Browse the repository at this point in the history
  • Loading branch information
lutter committed Oct 22, 2022
1 parent 92d69eb commit 0637ce0
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 15 deletions.
51 changes: 51 additions & 0 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,36 @@ pub enum StatsCommand {
/// The deployment (see `help info`).
deployment: DeploymentSearch,
},
/// Set the statistics targets for the statistics collector
///
/// Set (or reset) the target for a deployment. The statistics target
/// determines how much of a table Postgres will sample when it analyzes
/// a table. This can be particularly beneficial when Postgres chooses
/// suboptimal query plans for some queries. Increasing the target will
/// make analyzing tables take longer and will require more space in
/// Postgres' internal statistics storage.
///
/// If no `columns` are provided, change the statistics target for the
/// `id` and `block_range` columns which will usually be enough to
/// improve query performance, but it might be necessary to increase the
/// target for other columns, too.
SetTarget {
/// The value of the statistics target
#[clap(short, long, default_value = "200", conflicts_with = "reset")]
target: u32,
/// Reset the target so the default is used
#[clap(long, conflicts_with = "target")]
reset: bool,
/// Do not analyze changed tables
#[clap(long)]
no_analyze: bool,
/// The deployment (see `help info`).
deployment: DeploymentSearch,
/// The table for which to set the target, all if omitted
entity: Option<String>,
/// The columns to which to apply the target. Defaults to `id, block_range`
columns: Vec<String>,
},
}

#[derive(Clone, Debug, Subcommand)]
Expand Down Expand Up @@ -1134,6 +1164,27 @@ async fn main() -> anyhow::Result<()> {
let subgraph_store = store.subgraph_store();
commands::stats::target(subgraph_store, primary_pool, &deployment)
}
SetTarget {
    target,
    reset,
    no_analyze,
    deployment,
    entity,
    columns,
} => {
    let (store, primary) = ctx.store_and_primary();
    let store = store.subgraph_store();
    // `-1` is the value used to reset the target so that the default
    // is used again. A plain `target as i32` would wrap values above
    // `i32::MAX` into negative numbers; in particular `u32::MAX`
    // would turn into `-1` and silently reset the target instead of
    // raising it. Saturate instead so that an oversized request stays
    // a large positive number and the database gets to decide how to
    // handle it.
    let target = if reset {
        -1
    } else {
        // `i32::MAX as u32` is lossless, and after `min` the value is
        // guaranteed to fit into an `i32`
        target.min(i32::MAX as u32) as i32
    };
    commands::stats::set_target(
        store,
        primary,
        &deployment,
        entity.as_deref(),
        columns,
        target,
        no_analyze,
    )
}
}
}
Index(cmd) => {
Expand Down
34 changes: 34 additions & 0 deletions node/src/manager/commands/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::manager::deployment::DeploymentSearch;
use diesel::r2d2::ConnectionManager;
use diesel::r2d2::PooledConnection;
use diesel::PgConnection;
use graph::components::store::DeploymentLocator;
use graph::components::store::VersionStats;
use graph::prelude::anyhow;
use graph_store_postgres::command_support::catalog as store_catalog;
Expand Down Expand Up @@ -121,6 +122,14 @@ pub fn analyze(
entity_name: Option<&str>,
) -> Result<(), anyhow::Error> {
let locator = search.locate_unique(&pool)?;
analyze_loc(store, &locator, entity_name)
}

fn analyze_loc(
store: Arc<SubgraphStore>,
locator: &DeploymentLocator,
entity_name: Option<&str>,
) -> Result<(), anyhow::Error> {
match entity_name {
Some(entity_name) => println!("Analyzing table sgd{}.{entity_name}", locator.id),
None => println!("Analyzing all tables for sgd{}", locator.id),
Expand Down Expand Up @@ -165,3 +174,28 @@ pub fn target(
}
Ok(())
}

/// Set the statistics target for `columns` of the deployment identified by
/// `search` and, unless `no_analyze` is set, analyze the affected tables
/// afterwards.
///
/// If `entity` is `Some`, only the table for that entity is changed and
/// analyzed; otherwise the request is passed on for the whole deployment.
/// An empty `columns` list is replaced by `id` and `block_range`.
///
/// Returns an error if the deployment can not be located uniquely, or if
/// setting the target or analyzing fails.
pub fn set_target(
    store: Arc<SubgraphStore>,
    primary: ConnectionPool,
    search: &DeploymentSearch,
    entity: Option<&str>,
    columns: Vec<String>,
    target: i32,
    no_analyze: bool,
) -> Result<(), anyhow::Error> {
    // Fall back to the default columns when the caller did not name any
    let mut columns = columns;
    if columns.is_empty() {
        columns.push("id".to_string());
        columns.push("block_range".to_string());
    }

    let locator = search.locate_unique(&primary)?;
    store.set_stats_target(&locator, entity, columns, target)?;

    if no_analyze {
        return Ok(());
    }
    analyze_loc(store, &locator, entity)
}
10 changes: 8 additions & 2 deletions store/postgres/src/block_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use diesel::sql_types::{Integer, Range};
use std::io::Write;
use std::ops::{Bound, RangeBounds, RangeFrom};

use graph::prelude::{BlockNumber, BlockPtr, BLOCK_NUMBER_MAX};
use graph::prelude::{lazy_static, BlockNumber, BlockPtr, BLOCK_NUMBER_MAX};

use crate::relational::Table;
use crate::relational::{SqlName, Table};

/// The name of the column in which we store the block range for mutable
/// entities
Expand Down Expand Up @@ -39,6 +39,12 @@ pub(crate) const UNVERSIONED_RANGE: (Bound<i32>, Bound<i32>) =
/// immutable entity is visible
pub(crate) const BLOCK_COLUMN: &str = "block$";

lazy_static! {
    /// `BLOCK_RANGE_COLUMN` as a `SqlName`, so that code which needs to
    /// return a `&SqlName` for the block range column has a value with
    /// static lifetime to point to
    pub(crate) static ref BLOCK_RANGE_COLUMN_SQL: SqlName =
        SqlName::verbatim(BLOCK_RANGE_COLUMN.to_owned());
    /// `BLOCK_COLUMN` as a `SqlName`; see `BLOCK_RANGE_COLUMN_SQL`
    pub(crate) static ref BLOCK_COLUMN_SQL: SqlName =
        SqlName::verbatim(BLOCK_COLUMN.to_owned());
}

/// The range of blocks for which an entity is valid. We need this struct
/// to bind ranges into Diesel queries.
#[derive(Clone, Debug)]
Expand Down
17 changes: 17 additions & 0 deletions store/postgres/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use diesel::{
ExpressionMethods, QueryDsl,
};
use graph::components::store::VersionStats;
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Write;
use std::iter::FromIterator;
Expand Down Expand Up @@ -747,3 +748,19 @@ pub(crate) fn stats_targets(
);
Ok(map)
}

/// Set the statistics target for `columns` of the table `table` in
/// `namespace` to `target`. Callers pass `-1` as the `target` to reset
/// the columns so that the default is used.
///
/// All columns are changed with a single `alter table` statement. If
/// `columns` is empty, there is nothing to change and the database is not
/// contacted at all.
///
/// Returns an error if executing the statement fails.
pub(crate) fn set_stats_target(
    conn: &PgConnection,
    namespace: &Namespace,
    table: &SqlName,
    columns: &[&SqlName],
    target: i32,
) -> Result<(), StoreError> {
    // Without any columns we would generate `alter table ns.tbl ` with no
    // action at all, which is not valid SQL
    if columns.is_empty() {
        return Ok(());
    }

    let actions = columns
        .iter()
        .map(|column| format!("alter column {} set statistics {}", column.quoted(), target))
        .join(", ");
    let query = format!("alter table {}.{} {}", namespace, table.quoted(), actions);
    conn.batch_execute(&query)?;
    Ok(())
}
60 changes: 47 additions & 13 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::catalog;
use crate::deployment;
use crate::detail::ErrorDetail;
use crate::dynds::DataSourcesTable;
use crate::layout_for_tests::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
use crate::relational::{Layout, LayoutCache, SqlName, Table};
use crate::relational_queries::FromEntityData;
use crate::{connection_pool::ConnectionPool, detail};
Expand Down Expand Up @@ -709,6 +710,31 @@ impl DeploymentStore {
Ok((default, targets))
}

pub(crate) fn set_stats_target(
&self,
site: Arc<Site>,
entity: Option<&str>,
columns: Vec<String>,
target: i32,
) -> Result<(), StoreError> {
let conn = self.get_conn()?;
let layout = self.layout(&conn, site.clone())?;

let tables = entity
.map(|entity| resolve_table_name(&layout, &entity))
.transpose()?
.map(|table| vec![table])
.unwrap_or_else(|| layout.tables.values().map(Arc::as_ref).collect());

conn.transaction(|| {
for table in tables {
let columns = resolve_column_names(table, &columns)?;
catalog::set_stats_target(&conn, &site.namespace, &table.name, &columns, target)?;
}
Ok(())
})
}

/// Runs the SQL `ANALYZE` command in a table, with a shared connection.
pub(crate) fn analyze_with_conn(
&self,
Expand Down Expand Up @@ -1646,26 +1672,34 @@ fn resolve_table_name<'a>(layout: &'a Layout, name: &'_ str) -> Result<&'a Table
})
}

/// Resolves column names against the `table`. The `field_names` can be
/// either GraphQL attributes or the SQL names of columns. We also accept
/// the names `block_range` and `block$` and map that to the correct name
/// for the block range column for that table.
///
/// Returns `StoreError::UnknownField` for a name that is neither a field
/// nor a column of `table`.
fn resolve_column_names<'a, T: AsRef<str>>(
    table: &'a Table,
    field_names: &[T],
) -> Result<Vec<&'a SqlName>, StoreError> {
    /// Look `field` up as a field name first, and if that fails, try again
    /// treating it as the SQL name of a column
    fn lookup<'a>(table: &'a Table, field: &str) -> Result<&'a SqlName, StoreError> {
        table
            .column_for_field(field)
            .or_else(|_error| {
                let sql_name = SqlName::from(field);
                table
                    .column(&sql_name)
                    .ok_or_else(|| StoreError::UnknownField(field.to_string()))
            })
            .map(|column| &column.name)
    }

    field_names
        .iter()
        .map(|f| {
            let field = f.as_ref();
            // Either spelling of the block column refers to whichever
            // block column this particular table has
            if field == BLOCK_RANGE_COLUMN || field == BLOCK_COLUMN {
                Ok(table.block_column())
            } else {
                lookup(table, field)
            }
        })
        .collect()
}
8 changes: 8 additions & 0 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,6 +1328,14 @@ impl Table {
conn.execute(&sql)?;
Ok(())
}

pub(crate) fn block_column(&self) -> &SqlName {
if self.immutable {
&*crate::block_range::BLOCK_COLUMN_SQL
} else {
&*crate::block_range::BLOCK_RANGE_COLUMN_SQL
}
}
}

/// Return the enclosed named type for a field type, i.e., the type after
Expand Down
14 changes: 14 additions & 0 deletions store/postgres/src/subgraph_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,20 @@ impl SubgraphStoreInner {
store.stats_targets(site)
}

/// Change the statistics target of `columns` to `target` for the
/// deployment `deployment`. With `entity` set to `Some`, only the table
/// for that entity is affected; with `None`, every table in the
/// deployment is.
///
/// The work is handed to the store that is looked up for the
/// deployment's hash; an error is returned if that lookup or the change
/// itself fails.
pub fn set_stats_target(
    &self,
    deployment: &DeploymentLocator,
    entity: Option<&str>,
    columns: Vec<String>,
    target: i32,
) -> Result<(), StoreError> {
    let (deployment_store, site) = self.store(&deployment.hash)?;
    deployment_store.set_stats_target(site, entity, columns, target)
}

pub async fn create_manual_index(
&self,
deployment: &DeploymentLocator,
Expand Down

0 comments on commit 0637ce0

Please sign in to comment.