Skip to content
This repository has been archived by the owner on Sep 27, 2019. It is now read-only.

Commit

Permalink
Namespace(Schema) support + Catalog refactoring (#1302)
Browse files Browse the repository at this point in the history
* codegen insert catalog tuple

* fix segfault caused by bad insert plan

* consider cached catalog queries

* add update method using old engine

* add update method using old engine

* add version id column into pg_table

* column id and offset use uint32

* system catalogs

* using system catalog instead of singleton

* remove default constructer

* fix catalog map

* system catalog return value change to shared pointer

* refactoring system catalog, seems to compile now

* refactor fix

* refactor, peloton can start and finish bootstrap. TODO: refactor global unique oid

* fixed pg_database index, still cannot find system catalog in other databases

* fix two misuse of CATALOG_DATABASE_OID

* seems to work now

* fix two tests

* fix drop index bug, need information about database

* change test case, mainly gettablecount

* fix query cache test

* half way fixing trigger test

* trigger catalog

* move trigger_catalog into per database

* can pass trigger test now, also drop trigger when drop table

* fix udf test, bootstrap only once

* fix db names

* initialize nullptr

* TODO: fix GetInstance, fix AddBuiltInFunctions

* roll back pg_proc for now... require refactoring later

* query metrics catalog refactor

* change metrics tables access method

* stats_test still has problem

* fix query metrics bugs, pass test cases in stats_test

* merge local changes

* fix all the test cases

* fix double initialize

* add full functionality of create/drop schema(namespace)

* rebase to latest master branch

* add more comments

* fix test case errors, able to compile, one failing test

* resolve conflicts

* fix iso compile bug

* make changes according to pooja's reviews

* added namespace sql test

* addressing pooja's comments

* fix plan util test
  • Loading branch information
mengranwo authored and pervazea committed Apr 30, 2018
1 parent ff5b583 commit d68ab71
Show file tree
Hide file tree
Showing 124 changed files with 3,616 additions and 2,508 deletions.
14 changes: 7 additions & 7 deletions src/binder/bind_node_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@
//
//===----------------------------------------------------------------------===//

#include "expression/expression_util.h"
#include "binder/bind_node_visitor.h"
#include "expression/star_expression.h"
#include "catalog/catalog.h"
#include "expression/expression_util.h"
#include "expression/star_expression.h"
#include "type/type_id.h"

#include "expression/aggregate_expression.h"
#include "expression/case_expression.h"
#include "expression/function_expression.h"
#include "expression/operator_expression.h"
#include "expression/star_expression.h"
#include "expression/tuple_value_expression.h"
#include "expression/subquery_expression.h"
#include "expression/tuple_value_expression.h"

namespace peloton {
namespace binder {
Expand Down Expand Up @@ -155,8 +155,8 @@ void BindNodeVisitor::Visit(parser::UpdateStatement *node) {
void BindNodeVisitor::Visit(parser::DeleteStatement *node) {
context_ = std::make_shared<BinderContext>(nullptr);
node->TryBindDatabaseName(default_database_name_);
context_->AddRegularTable(node->GetDatabaseName(), node->GetTableName(),
node->GetTableName(), txn_);
context_->AddRegularTable(node->GetDatabaseName(), node->GetSchemaName(),
node->GetTableName(), node->GetTableName(), txn_);

if (node->expr != nullptr) {
node->expr->Accept(this);
Expand All @@ -174,8 +174,8 @@ void BindNodeVisitor::Visit(parser::CreateStatement *node) {
void BindNodeVisitor::Visit(parser::InsertStatement *node) {
node->TryBindDatabaseName(default_database_name_);
context_ = std::make_shared<BinderContext>(nullptr);
context_->AddRegularTable(node->GetDatabaseName(), node->GetTableName(),
node->GetTableName(), txn_);
context_->AddRegularTable(node->GetDatabaseName(), node->GetSchemaName(),
node->GetTableName(), node->GetTableName(), txn_);
if (node->select != nullptr) {
node->select->Accept(this);
}
Expand Down
11 changes: 6 additions & 5 deletions src/binder/binder_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
#include "catalog/column_catalog.h"
#include "catalog/database_catalog.h"
#include "catalog/table_catalog.h"
#include "parser/table_ref.h"
#include "expression/tuple_value_expression.h"
#include "parser/table_ref.h"
#include "storage/storage_manager.h"

namespace peloton {
Expand All @@ -28,17 +28,18 @@ void BinderContext::AddRegularTable(parser::TableRef *table_ref,
concurrency::TransactionContext *txn) {
table_ref->TryBindDatabaseName(default_database_name);
auto table_alias = table_ref->GetTableAlias();
AddRegularTable(table_ref->GetDatabaseName(), table_ref->GetTableName(),
table_alias, txn);
AddRegularTable(table_ref->GetDatabaseName(), table_ref->GetSchemaName(),
table_ref->GetTableName(), table_alias, txn);
}

void BinderContext::AddRegularTable(const std::string db_name,
const std::string schema_name,
const std::string table_name,
const std::string table_alias,
concurrency::TransactionContext *txn) {
// using catalog object to retrieve meta-data
auto table_object =
catalog::Catalog::GetInstance()->GetTableObject(db_name, table_name, txn);
auto table_object = catalog::Catalog::GetInstance()->GetTableObject(
db_name, schema_name, table_name, txn);

if (regular_table_alias_map_.find(table_alias) !=
regular_table_alias_map_.end() ||
Expand Down
215 changes: 148 additions & 67 deletions src/catalog/abstract_catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@

#include "catalog/abstract_catalog.h"

#include "binder/bind_node_visitor.h"

#include "common/statement.h"

#include "catalog/catalog.h"
#include "catalog/database_catalog.h"
#include "catalog/table_catalog.h"

Expand All @@ -32,7 +31,9 @@
#include "executor/delete_executor.h"
#include "executor/index_scan_executor.h"
#include "executor/insert_executor.h"
#include "executor/plan_executor.h"
#include "executor/seq_scan_executor.h"
#include "executor/update_executor.h"

#include "storage/database.h"
#include "storage/storage_manager.h"
Expand All @@ -45,84 +46,95 @@ AbstractCatalog::AbstractCatalog(oid_t catalog_table_oid,
std::string catalog_table_name,
catalog::Schema *catalog_table_schema,
storage::Database *pg_catalog) {
// set database_oid
database_oid = pg_catalog->GetOid();
// Create catalog_table_
catalog_table_ = storage::TableFactory::GetDataTable(
CATALOG_DATABASE_OID, catalog_table_oid, catalog_table_schema,
catalog_table_name, DEFAULT_TUPLES_PER_TILEGROUP, true, false, true);

database_oid, catalog_table_oid, catalog_table_schema, catalog_table_name,
DEFAULT_TUPLES_PER_TILEGROUP, true, false, true);
// Add catalog_table_ into pg_catalog database
pg_catalog->AddTable(catalog_table_, true);
}

AbstractCatalog::AbstractCatalog(const std::string &catalog_table_ddl,
concurrency::TransactionContext *txn) {
// Get catalog table schema
// get catalog table schema
auto &peloton_parser = parser::PostgresParser::GetInstance();

// Build the parse tree
const auto parse_tree_list = peloton_parser.BuildParseTree(catalog_table_ddl);
if (parse_tree_list->GetStatements().empty()) {
throw CatalogException(
"Parse tree list has no parse trees. Cannot build plan");
}
// TODO: support multi-statement queries
auto parse_tree = parse_tree_list->GetStatement(0);

// Run binder
auto bind_node_visitor = binder::BindNodeVisitor(txn, DATABASE_CATALOG_NAME);
bind_node_visitor.BindNameToNode(parse_tree);

// Create the plan tree
auto create_plan = std::dynamic_pointer_cast<planner::CreatePlan>(
optimizer::Optimizer().BuildPelotonPlanTree(parse_tree_list, txn));
optimizer::Optimizer().BuildPelotonPlanTree(
peloton_parser.BuildParseTree(catalog_table_ddl), txn));
auto catalog_table_schema = create_plan->GetSchema();
auto catalog_table_name = create_plan->GetTableName();

// Create catalog table
auto catalog_schema_name = create_plan->GetSchemaName();
auto catalog_database_name = create_plan->GetDatabaseName();
PELOTON_ASSERT(catalog_schema_name == std::string(CATALOG_SCHEMA_NAME));
// create catalog table
Catalog::GetInstance()->CreateTable(
CATALOG_DATABASE_NAME, catalog_table_name,
catalog_database_name, catalog_schema_name, catalog_table_name,
std::unique_ptr<catalog::Schema>(catalog_table_schema), txn, true);

// Get catalog table oid
// get catalog table oid
auto catalog_table_object = Catalog::GetInstance()->GetTableObject(
CATALOG_DATABASE_NAME, catalog_table_name, txn);
catalog_database_name, catalog_schema_name, catalog_table_name, txn);

// Set catalog_table_
// set catalog_table_
try {
catalog_table_ = storage::StorageManager::GetInstance()->GetTableWithOid(
CATALOG_DATABASE_OID, catalog_table_object->GetTableOid());
catalog_table_object->GetDatabaseOid(),
catalog_table_object->GetTableOid());
// set database_oid
database_oid = catalog_table_object->GetDatabaseOid();
} catch (CatalogException &e) {
LOG_TRACE("Can't find table %d! Return false",
catalog_table_object->GetTableOid());
}
}

/*@brief insert tuple(reord) helper function
* @param tuple tuple to be inserted
* @param txn TransactionContext
* @return Whether insertion is Successful
*/
* @param tuple tuple to be inserted
* @param txn TransactionContext
* @return Whether insertion is Successful
*/
bool AbstractCatalog::InsertTuple(std::unique_ptr<storage::Tuple> tuple,
concurrency::TransactionContext *txn) {
if (txn == nullptr)
throw CatalogException("Insert tuple requires transaction");

std::unique_ptr<executor::ExecutorContext> context(
new executor::ExecutorContext(txn));
planner::InsertPlan node(catalog_table_, std::move(tuple));
executor::InsertExecutor executor(&node, context.get());
executor.Init();
bool status = executor.Execute();
std::vector<type::Value> params;
std::vector<std::string> columns;
std::vector<std::vector<std::unique_ptr<expression::AbstractExpression>>>
values;
values.push_back(
std::vector<std::unique_ptr<expression::AbstractExpression>>());
std::vector<int> result_format(tuple->GetSchema()->GetColumnCount(), 0);
for (size_t i = 0; i < tuple->GetSchema()->GetColumnCount(); i++) {
params.push_back(tuple->GetValue(i));
columns.push_back(tuple->GetSchema()->GetColumn(i).GetName());
values[0].emplace_back(
new expression::ConstantValueExpression(tuple->GetValue(i)));
}
auto node =
std::make_shared<planner::InsertPlan>(catalog_table_, &columns, &values);

return status;
executor::ExecutionResult this_p_status;
auto on_complete = [&this_p_status](
executor::ExecutionResult p_status,
std::vector<ResultValue> &&values UNUSED_ATTRIBUTE) {
this_p_status = p_status;
};

executor::PlanExecutor::ExecutePlan(node, txn, params, result_format,
on_complete);

return this_p_status.m_result == peloton::ResultType::SUCCESS;
}

/*@brief Delete a tuple using index scan
* @param index_offset Offset of index for scan
* @param values Values for search
* @param txn TransactionContext
* @return Whether deletion is Successful
*/
* @param index_offset Offset of index for scan
* @param values Values for search
* @param txn TransactionContext
* @return Whether deletion is Successful
*/
bool AbstractCatalog::DeleteWithIndexScan(
oid_t index_offset, std::vector<type::Value> values,
concurrency::TransactionContext *txn) {
Expand Down Expand Up @@ -167,12 +179,12 @@ bool AbstractCatalog::DeleteWithIndexScan(
}

/*@brief Index scan helper function
* @param column_offsets Column ids for search (projection)
* @param index_offset Offset of index for scan
* @param values Values for search
* @param txn TransactionContext
* @return Unique pointer of vector of logical tiles
*/
* @param column_offsets Column ids for search (projection)
* @param index_offset Offset of index for scan
* @param values Values for search
* @param txn TransactionContext
* @return Unique pointer of vector of logical tiles
*/
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
AbstractCatalog::GetResultWithIndexScan(
std::vector<oid_t> column_offsets, oid_t index_offset,
Expand Down Expand Up @@ -215,14 +227,14 @@ AbstractCatalog::GetResultWithIndexScan(
}

/*@brief Sequential scan helper function
* NOTE: try to use efficient index scan instead of sequential scan, but you
* shouldn't build too many indexes on one catalog table
* @param column_offsets Column ids for search (projection)
* @param predicate predicate for this sequential scan query
* @param txn TransactionContext
*
* @return Unique pointer of vector of logical tiles
*/
* NOTE: try to use efficient index scan instead of sequential scan, but you
* shouldn't build too many indexes on one catalog table
* @param column_offsets Column ids for search (projection)
* @param predicate predicate for this sequential scan query
* @param txn TransactionContext
*
* @return Unique pointer of vector of logical tiles
*/
std::unique_ptr<std::vector<std::unique_ptr<executor::LogicalTile>>>
AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets,
expression::AbstractExpression *predicate,
Expand Down Expand Up @@ -250,14 +262,14 @@ AbstractCatalog::GetResultWithSeqScan(std::vector<oid_t> column_offsets,
}

/*@brief Add index on catalog table
* @param key_attrs indexed column offset(position)
* @param index_oid index id(global unique)
* @param index_name index name(global unique)
* @param index_constraint index constraints
* @return Unique pointer of vector of logical tiles
* Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
* IndexCatalog should need this
*/
* @param key_attrs indexed column offset(position)
* @param index_oid index id(global unique)
* @param index_name index name(global unique)
* @param index_constraint index constraints
* @return Unique pointer of vector of logical tiles
* Note: Use catalog::Catalog::CreateIndex() if you can, only ColumnCatalog and
* IndexCatalog should need this
*/
void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs,
oid_t index_oid, const std::string &index_name,
IndexConstraintType index_constraint) {
Expand Down Expand Up @@ -286,5 +298,74 @@ void AbstractCatalog::AddIndex(const std::vector<oid_t> &key_attrs,
index_name.c_str(), (int)catalog_table_->GetOid());
}

/*@brief Update specific columns using index scan
* @param update_columns Columns to be updated
* @param update_values Values to be updated
* @param scan_values Value to be scaned (used in index scan)
* @param index_offset Offset of index for scan
* @return true if successfully executes
*/
bool AbstractCatalog::UpdateWithIndexScan(
std::vector<oid_t> update_columns, std::vector<type::Value> update_values,
std::vector<type::Value> scan_values, oid_t index_offset,
concurrency::TransactionContext *txn) {
if (txn == nullptr) throw CatalogException("Scan table requires transaction");

std::unique_ptr<executor::ExecutorContext> context(
new executor::ExecutorContext(txn));
// Construct index scan executor
auto index = catalog_table_->GetIndex(index_offset);
std::vector<oid_t> key_column_offsets =
index->GetMetadata()->GetKeySchema()->GetIndexedColumns();

// NOTE: For indexed scan on catalog tables, we expect it not to be "partial
// indexed scan"(efficiency purpose).That being said, indexed column number
// must be equal to passed in "scan_values" size
PELOTON_ASSERT(scan_values.size() == key_column_offsets.size());
std::vector<ExpressionType> expr_types(scan_values.size(),
ExpressionType::COMPARE_EQUAL);
std::vector<expression::AbstractExpression *> runtime_keys;

planner::IndexScanPlan::IndexScanDesc index_scan_desc(
index->GetOid(), key_column_offsets, expr_types, scan_values,
runtime_keys);

planner::IndexScanPlan index_scan_node(catalog_table_, nullptr,
update_columns, index_scan_desc);

executor::IndexScanExecutor index_scan_executor(&index_scan_node,
context.get());
// Construct update executor
TargetList target_list;
DirectMapList direct_map_list;

size_t column_count = catalog_table_->GetSchema()->GetColumnCount();
for (size_t col_itr = 0; col_itr < column_count; col_itr++) {
// Skip any column for update
if (std::find(std::begin(update_columns), std::end(update_columns),
col_itr) == std::end(update_columns)) {
direct_map_list.emplace_back(col_itr, std::make_pair(0, col_itr));
}
}

PELOTON_ASSERT(update_columns.size() == update_values.size());
for (size_t i = 0; i < update_values.size(); i++) {
planner::DerivedAttribute update_attribute{
new expression::ConstantValueExpression(update_values[i])};
target_list.emplace_back(update_columns[i], update_attribute);
}

std::unique_ptr<const planner::ProjectInfo> project_info(
new planner::ProjectInfo(std::move(target_list),
std::move(direct_map_list)));
planner::UpdatePlan update_node(catalog_table_, std::move(project_info));

executor::UpdateExecutor update_executor(&update_node, context.get());
update_executor.AddChild(&index_scan_executor);
// Execute
update_executor.Init();
return update_executor.Execute();
}

} // namespace catalog
} // namespace peloton
Loading

0 comments on commit d68ab71

Please sign in to comment.