Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coord: don't track view uses separately from catalog #3979

Closed
wants to merge 9 commits into from
112 changes: 104 additions & 8 deletions src/coord/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::SystemTime;

Expand Down Expand Up @@ -72,7 +73,7 @@ pub const MZ_CATALOG_SCHEMA: &str = "mz_catalog";
pub struct Catalog {
by_name: BTreeMap<String, Database>,
by_id: BTreeMap<GlobalId, CatalogEntry>,
indexes: HashMap<GlobalId, Vec<Vec<ScalarExpr>>>,
indexes: HashMap<GlobalId, Vec<(GlobalId, Vec<ScalarExpr>)>>,
ambient_schemas: BTreeMap<String, Schema>,
temporary_schemas: HashMap<u32, Schema>,
storage: Arc<Mutex<storage::Connection>>,
Expand Down Expand Up @@ -739,12 +740,19 @@ impl Catalog {
}
}

if let CatalogItem::Index(index) = entry.item() {
self.indexes
.entry(index.on)
.or_insert_with(Vec::new)
.push(index.keys.clone());
match entry.item() {
CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::View(_) => {
self.indexes.insert(id, vec![]);
}
CatalogItem::Index(index) => {
self.indexes
.get_mut(&index.on)
.unwrap()
.push((id, index.keys.clone()));
}
CatalogItem::Sink(_) => (),
}

let conn_id = entry.item().conn_id().unwrap_or(SYSTEM_CONN_ID);
self.get_schema_mut(&entry.name.database, &entry.name.schema, conn_id)
.expect("catalog out of sync")
Expand Down Expand Up @@ -1115,9 +1123,11 @@ impl Catalog {
.expect("catalog out of sync");
let i = indexes
.iter()
.position(|keys| keys == &index.keys)
.position(|(idx_id, _keys)| *idx_id == id)
.expect("catalog out of sync");
indexes.remove(i);
} else {
self.indexes.remove(&id);
}
OpStatus::DroppedItem(metadata)
}
Expand Down Expand Up @@ -1254,10 +1264,73 @@ impl Catalog {

/// Returns a mapping that indicates all indices that are available for
/// each item in the catalog.
pub fn indexes(&self) -> &HashMap<GlobalId, Vec<Vec<ScalarExpr>>> {
pub fn indexes(&self) -> &HashMap<GlobalId, Vec<(GlobalId, Vec<ScalarExpr>)>> {
&self.indexes
}

/// Locates, for every view or source named in `ids`, the closest indexes
/// from which it can be served.
///
/// The result is the sorted, de-duplicated list of index identifiers that
/// were found, paired with a completeness flag. The flag is `false` when at
/// least one of the given identifiers transitively depends on a source that
/// has no index (an unmaterialized source).
///
/// # Panics
///
/// Panics if a visited identifier has no entry in the catalog's index map,
/// or if an unindexed table, a sink, or an index is encountered during the
/// walk.
pub fn nearest_indexes(&self, ids: &[GlobalId]) -> (Vec<GlobalId>, bool) {
    // Depth-first walk over the dependencies of `id`, accumulating index
    // identifiers in `found` and clearing `all_found` when an unindexed
    // source is reached.
    fn visit(
        catalog: &Catalog,
        id: GlobalId,
        found: &mut Vec<GlobalId>,
        all_found: &mut bool,
    ) {
        match catalog.indexes[&id].first() {
            // An index on `id` itself exists: take the first one and do not
            // descend any further.
            Some((index_id, _)) => found.push(*index_id),
            None => match catalog.get_by_id(&id).item() {
                // A view without an index: look at what it is built from.
                view @ CatalogItem::View(_) => {
                    for dependency in view.uses() {
                        visit(catalog, dependency, found, all_found);
                    }
                }
                // A source without an index: the index set cannot be
                // complete.
                CatalogItem::Source(_) => *all_found = false,
                CatalogItem::Table(_) => {
                    unreachable!("tables always have at least one index")
                }
                CatalogItem::Sink(_) | CatalogItem::Index(_) => {
                    unreachable!("sinks and indexes cannot be depended upon")
                }
            },
        }
    }

    let mut found = Vec::new();
    let mut all_found = true;
    for &id in ids {
        visit(self, id, &mut found, &mut all_found);
    }
    // The same index may be reached through several paths; report it once.
    found.sort();
    found.dedup();
    (found, all_found)
}

/// Reports whether the item identified by `id` is a table, or is a view
/// that transitively depends on a table.
///
/// Sources never use tables. Panics if `id` refers to a sink or an index.
pub fn uses_tables(&self, id: GlobalId) -> bool {
    match self.get_by_id(&id).item() {
        CatalogItem::Table(_) => true,
        CatalogItem::Source(_) => false,
        view @ CatalogItem::View(_) => {
            // Stop at the first dependency that leads to a table.
            for dependency in view.uses() {
                if self.uses_tables(dependency) {
                    return true;
                }
            }
            false
        }
        CatalogItem::Sink(_) | CatalogItem::Index(_) => {
            unreachable!("sinks and indexes cannot be depended upon");
        }
    }
}

/// Serializes the catalog's name-keyed map (`by_name`) to a JSON string.
///
/// Panics if serialization fails.
pub fn dump(&self) -> String {
    let serialized = serde_json::to_string(&self.by_name);
    serialized.expect("serialization cannot fail")
}
Expand Down Expand Up @@ -1346,6 +1419,20 @@ impl From<PlanContext> for SerializedPlanContext {
}
}

/// Opens the catalog stored at `path` and returns its serialized state.
///
/// The format of the serialized state is unspecified. The only guarantee is
/// that two identical catalogs produce serialized states that compare equal.
///
/// # Errors
///
/// Returns an error if the catalog cannot be opened.
pub fn dump(path: &Path) -> Result<String, anyhow::Error> {
    // Open with logging enabled and no explicit experimental-mode setting.
    let config = Config {
        path: Some(path),
        enable_logging: true,
        experimental_mode: None,
    };
    let catalog = Catalog::open(config)?;
    Ok(catalog.dump())
}

impl sql::catalog::Catalog for ConnCatalog<'_> {
fn startup_time(&self) -> SystemTime {
self.catalog.startup_time
Expand Down Expand Up @@ -1436,6 +1523,15 @@ impl sql::catalog::Catalog for ConnCatalog<'_> {
/// Returns the `experimental_mode` flag stored on the underlying catalog.
fn experimental_mode(&self) -> bool {
self.catalog.experimental_mode
}

/// Reports whether `id` can be served from existing indexes, i.e. whether
/// `nearest_indexes` reports a complete index set for it.
fn is_queryable(&self, id: GlobalId) -> bool {
    // Only the completeness flag matters here; the index list is discarded.
    self.catalog.nearest_indexes(&[id]).1
}

/// Reports whether at least one index is recorded directly on `id`.
///
/// Panics if `id` has no entry in the catalog's index map.
fn is_materialized(&self, id: GlobalId) -> bool {
    let available = &self.catalog.indexes[&id];
    !available.is_empty()
}
}

impl sql::catalog::CatalogItem for CatalogEntry {
Expand Down
7 changes: 0 additions & 7 deletions src/coord/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,4 @@ impl BUILTINS {
_ => None,
})
}

/// Iterates over the builtins that are tables, skipping every other kind.
pub fn tables(&self) -> impl Iterator<Item = &'static BuiltinTable> + '_ {
    self.values().filter_map(|builtin| {
        if let Builtin::Table(table) = builtin {
            Some(*table)
        } else {
            None
        }
    })
}
}
Loading