Skip to content

Commit

Permalink
[#9936] Alter and Create table via PgClient
Browse files Browse the repository at this point in the history
Summary:
This diff continue migration to PgClient and introduces the following function with their usage:
1) AlterTable
2) CreateTable
3) CreateDatabase

Test Plan: Jenkins

Reviewers: dmitry

Reviewed By: dmitry

Subscribers: bogdan, ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D13133
  • Loading branch information
spolitov committed Sep 27, 2021
1 parent 1a591a1 commit 7d573ef
Show file tree
Hide file tree
Showing 28 changed files with 966 additions and 478 deletions.
1 change: 0 additions & 1 deletion src/yb/client/async_initializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class AsyncClientInitialiser {

YBClient* client() const;


YBClientBuilder& builder() {
return client_builder_;
}
Expand Down
4 changes: 2 additions & 2 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2438,7 +2438,7 @@ TEST_F(ClientTest, GetNamespaceInfo) {
kPgsqlKeyspaceID,
"" /* source_namespace_id */,
boost::none /* next_pg_oid */,
boost::none /* txn */,
nullptr /* txn */,
true /* colocated */));

// CQL non-colocated.
Expand Down Expand Up @@ -2571,7 +2571,7 @@ TEST_F(ClientTest, ColocatedTablesLookupTablet) {
common_table_name.namespace_id(),
/* source_namespace_id =*/ "",
/* next_pg_oid =*/ boost::none,
/* txn =*/ boost::none,
/* txn =*/ nullptr,
/* colocated =*/ true));

YBSchemaBuilder schemaBuilder;
Expand Down
22 changes: 10 additions & 12 deletions src/yb/client/client-unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,20 @@ using std::vector;
using namespace std::literals;
using namespace std::placeholders;

const std::string kNoPrimaryKeyMessage = "Invalid argument: No primary key specified";

TEST(ClientUnitTest, TestSchemaBuilder_EmptySchema) {
YBSchema s;
YBSchemaBuilder b;
ASSERT_EQ("Invalid argument: no primary key specified",
b.Build(&s).ToString(/* no file/line */ false));
ASSERT_EQ(kNoPrimaryKeyMessage, b.Build(&s).ToString(/* no file/line */ false));
}

TEST(ClientUnitTest, TestSchemaBuilder_KeyNotSpecified) {
YBSchema s;
YBSchemaBuilder b;
b.AddColumn("a")->Type(INT32)->NotNull();
b.AddColumn("b")->Type(INT32)->NotNull();
ASSERT_EQ("Invalid argument: no primary key specified",
b.Build(&s).ToString(/* no file/line */ false));
ASSERT_EQ(kNoPrimaryKeyMessage, b.Build(&s).ToString(/* no file/line */ false));
}

TEST(ClientUnitTest, TestSchemaBuilder_DuplicateColumn) {
Expand All @@ -79,11 +79,10 @@ TEST(ClientUnitTest, TestSchemaBuilder_WrongPrimaryKeyOrder) {
YBSchema s;
YBSchemaBuilder b;
b.AddColumn("key")->Type(INT32);
b.AddColumn("x")->Type(INT32)->NotNull()->PrimaryKey();;
b.AddColumn("x")->Type(INT32)->NotNull()->PrimaryKey();
b.AddColumn("x")->Type(INT32);
const char *expected_status =
"Invalid argument: The given columns in a schema must be ordered as hash primary key columns "
"then primary key columns and then regular columns";
"Invalid argument: Primary key column 'x' should be before regular column 'key'";
ASSERT_EQ(expected_status, b.Build(&s).ToString(/* no file/line */ false));
}

Expand All @@ -93,8 +92,7 @@ TEST(ClientUnitTest, TestSchemaBuilder_WrongHashKeyOrder) {
b.AddColumn("a")->Type(INT32)->PrimaryKey();
b.AddColumn("b")->Type(INT32)->HashPrimaryKey();
const char *expected_status =
"Invalid argument: The given columns in a schema must be ordered as hash primary key columns "
"then primary key columns and then regular columns";
"Invalid argument: Hash primary key column 'b' should be before primary key 'a'";
ASSERT_EQ(expected_status, b.Build(&s).ToString(/* no file/line */ false));
}

Expand All @@ -104,7 +102,7 @@ TEST(ClientUnitTest, TestSchemaBuilder_PrimaryKeyOnColumnAndSet) {
b.AddColumn("a")->Type(INT32)->PrimaryKey();
b.AddColumn("b")->Type(INT32);
b.SetPrimaryKey({ "a", "b" });
ASSERT_EQ("Invalid argument: primary key specified by both "
ASSERT_EQ("Invalid argument: Primary key specified by both "
"SetPrimaryKey() and on a specific column: a",
b.Build(&s).ToString(/* no file/line */ false));
}
Expand Down Expand Up @@ -138,7 +136,7 @@ TEST(ClientUnitTest, TestSchemaBuilder_CompoundKey_KeyNotFirst) {
b.AddColumn("a")->Type(INT32)->NotNull();
b.AddColumn("b")->Type(INT32)->NotNull();
b.SetPrimaryKey({ "a", "b" });
ASSERT_EQ("Invalid argument: primary key columns must be listed "
ASSERT_EQ("Invalid argument: Primary key columns must be listed "
"first in the schema: a",
b.Build(&s).ToString(/* no file/line */ false));
}
Expand All @@ -149,7 +147,7 @@ TEST(ClientUnitTest, TestSchemaBuilder_CompoundKey_BadColumnName) {
b.AddColumn("a")->Type(INT32)->NotNull();
b.AddColumn("b")->Type(INT32)->NotNull();
b.SetPrimaryKey({ "foo" });
ASSERT_EQ("Invalid argument: primary key column not defined: foo",
ASSERT_EQ("Invalid argument: Primary key column not defined: foo",
b.Build(&s).ToString(/* no file/line */ false));
}

Expand Down
15 changes: 9 additions & 6 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,9 @@ Status YBClient::CreateNamespace(const std::string& namespace_name,
const std::string& namespace_id,
const std::string& source_namespace_id,
const boost::optional<uint32_t>& next_pg_oid,
const boost::optional<TransactionMetadata>& txn,
const bool colocated) {
const TransactionMetadata* txn,
const bool colocated,
CoarseTimePoint deadline) {
CreateNamespaceRequestPB req;
CreateNamespaceResponsePB resp;
req.set_name(namespace_name);
Expand All @@ -747,7 +748,9 @@ Status YBClient::CreateNamespace(const std::string& namespace_name,
txn->ToPB(req.mutable_transaction());
}
req.set_colocated(colocated);
auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
if (deadline == CoarseTimePoint()) {
deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
}
Status s = data_->SyncLeaderMasterRpc<CreateNamespaceRequestPB, CreateNamespaceResponsePB>(
deadline, req, &resp, nullptr, "CreateNamespace", &MasterServiceProxy::CreateNamespace);
if (resp.has_error()) {
Expand All @@ -758,8 +761,8 @@ Status YBClient::CreateNamespace(const std::string& namespace_name,

// Verify that the namespace we found is running so that, once this request returns,
// the client can send operations without receiving a "namespace not found" error.
RETURN_NOT_OK(data_->WaitForCreateNamespaceToFinish(this, namespace_name, database_type, cur_id,
CoarseMonoClock::Now() + default_admin_operation_timeout()));
RETURN_NOT_OK(data_->WaitForCreateNamespaceToFinish(
this, namespace_name, database_type, cur_id, deadline));

return Status::OK();
}
Expand All @@ -782,7 +785,7 @@ Status YBClient::CreateNamespaceIfNotExists(const std::string& namespace_name,
}

Status s = CreateNamespace(namespace_name, database_type, creator_role_name, namespace_id,
source_namespace_id, next_pg_oid, boost::none /* txn */, colocated);
source_namespace_id, next_pg_oid, nullptr /* txn */, colocated);
if (s.IsAlreadyPresent() && database_type && *database_type == YQLDatabase::YQL_DATABASE_CQL) {
return Status::OK();
}
Expand Down
5 changes: 3 additions & 2 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,9 @@ class YBClient {
const std::string& namespace_id = "",
const std::string& source_namespace_id = "",
const boost::optional<uint32_t>& next_pg_oid = boost::none,
const boost::optional<TransactionMetadata>& txn = boost::none,
const bool colocated = false);
const TransactionMetadata* txn = nullptr,
const bool colocated = false,
CoarseTimePoint deadline = CoarseTimePoint());

// It calls CreateNamespace(), but before it checks that the namespace has NOT been yet
// created. So, it prevents error 'namespace already exists'.
Expand Down
49 changes: 24 additions & 25 deletions src/yb/client/schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,7 @@ YBSchemaBuilder* YBSchemaBuilder::SetTableProperties(const TableProperties& tabl
}

Status YBSchemaBuilder::Build(YBSchema* schema) {
vector<YBColumnSchema> cols;
cols.resize(data_->specs.size(), YBColumnSchema());
std::vector<YBColumnSchema> cols(data_->specs.size(), YBColumnSchema());
for (int i = 0; i < cols.size(); i++) {
RETURN_NOT_OK(data_->specs[i]->ToColumnSchema(&cols[i]));
}
Expand All @@ -218,40 +217,40 @@ Status YBSchemaBuilder::Build(YBSchema* schema) {
// Removing the following restriction from Kudu:
// If they didn't explicitly pass the column names for key,
// then they should have set it on exactly one column.
bool has_order_error = false;
bool reached_regular_column = false;
bool reached_primary_column = false;
const YBColumnSpec::Data* reached_primary_column = nullptr;
const YBColumnSpec::Data* reached_regular_column = nullptr;
for (int i = 0; i < cols.size(); i++) {
if (data_->specs[i]->data_->hash_primary_key) {
auto& column_data = *data_->specs[i]->data_;
if (column_data.hash_primary_key) {
num_key_cols++;
if (reached_primary_column || reached_regular_column) {
has_order_error = true;
break;
if (reached_primary_column) {
return STATUS_FORMAT(
InvalidArgument, "Hash primary key column '$0' should be before primary key '$1'",
column_data.name, reached_primary_column->name);
}
if (reached_regular_column) {
return STATUS_FORMAT(
InvalidArgument, "Hash primary key column '$0' should be before regular column '$1'",
column_data.name, reached_regular_column->name);
}

} else if (data_->specs[i]->data_->primary_key) {
} else if (column_data.primary_key) {
num_key_cols++;
if (reached_regular_column) {
has_order_error = true;
break;
return STATUS_FORMAT(
InvalidArgument, "Primary key column '$0' should be before regular column '$1'",
column_data.name, reached_regular_column->name);
}
reached_primary_column = true;

reached_primary_column = &column_data;
} else {
reached_regular_column = true;
reached_regular_column = &column_data;
}
}

if (num_key_cols <= 0) {
return STATUS(InvalidArgument, "no primary key specified");
return STATUS(InvalidArgument, "No primary key specified");
}

if (has_order_error) {
return STATUS(InvalidArgument,
"The given columns in a schema must be ordered as hash primary key columns "
"then primary key columns and then regular columns");
}

} else {
// Build a map from name to index of all of the columns.
unordered_map<string, int> name_to_idx_map;
Expand All @@ -260,7 +259,7 @@ Status YBSchemaBuilder::Build(YBSchema* schema) {
// If they did pass the key column names, then we should not have explicitly
// set it on any columns.
if (spec->data_->primary_key) {
return STATUS(InvalidArgument, "primary key specified by both SetPrimaryKey() and on a "
return STATUS(InvalidArgument, "Primary key specified by both SetPrimaryKey() and on a "
"specific column", spec->data_->name);
}

Expand All @@ -282,7 +281,7 @@ Status YBSchemaBuilder::Build(YBSchema* schema) {
for (const string& key_col_name : data_->key_col_names) {
int idx;
if (!FindCopy(name_to_idx_map, key_col_name, &idx)) {
return STATUS(InvalidArgument, "primary key column not defined", key_col_name);
return STATUS(InvalidArgument, "Primary key column not defined", key_col_name);
}
key_col_indexes.push_back(idx);
}
Expand All @@ -292,7 +291,7 @@ Status YBSchemaBuilder::Build(YBSchema* schema) {
// flexible user-facing API.
for (int i = 0; i < key_col_indexes.size(); i++) {
if (key_col_indexes[i] != i) {
return STATUS(InvalidArgument, "primary key columns must be listed first in the schema",
return STATUS(InvalidArgument, "Primary key columns must be listed first in the schema",
data_->key_col_names[i]);
}
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/common/common_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct ColumnId;
struct OpId;
struct QLTableColumn;
struct ReadHybridTime;
struct TransactionMetadata;

enum class PgSystemAttrNum : int;

Expand Down
4 changes: 2 additions & 2 deletions src/yb/integration-tests/create-table-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ TEST_F(CreateTableITest, TableColocationRemoteBootstrapTest) {
ASSERT_OK(
client_->CreateNamespace("colocation_test", boost::none /* db */, "" /* creator */,
"" /* ns_id */, "" /* src_ns_id */,
boost::none /* next_pg_oid */, boost::none /* txn */, true));
boost::none /* next_pg_oid */, nullptr /* txn */, true));

{
string ns_id;
Expand Down Expand Up @@ -437,7 +437,7 @@ TEST_F(CreateTableITest, TablegroupRemoteBootstrapTest) {

ASSERT_OK(client_->CreateNamespace(namespace_name, YQL_DATABASE_PGSQL, "" /* creator */,
"" /* ns_id */, "" /* src_ns_id */,
boost::none /* next_pg_oid */, boost::none /* txn */, false));
boost::none /* next_pg_oid */, nullptr /* txn */, false));

{
auto namespaces = ASSERT_RESULT(client_->ListNamespaces(boost::none));
Expand Down
3 changes: 0 additions & 3 deletions src/yb/master/catalog_entity_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -420,9 +420,6 @@ void TableInfo::GetTabletsInRange(
const std::string& partition_key_start, const std::string& partition_key_end,
TabletInfos* ret, const int32_t max_returned_locations) const {
SharedLock<decltype(lock_)> l(lock_);

LOG(INFO) << __func__ << ": " << AsString(partitions_);

decltype(partitions_)::const_iterator it, it_end;
if (partition_key_start.empty()) {
it = partitions_.begin();
Expand Down
6 changes: 3 additions & 3 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4785,8 +4785,8 @@ CHECKED_STATUS ApplyAlterSteps(server::Clock* clock,
Status CatalogManager::AlterTable(const AlterTableRequestPB* req,
AlterTableResponsePB* resp,
rpc::RpcContext* rpc) {
LOG(INFO) << "Servicing AlterTable request from " << RequestorString(rpc)
<< ": " << req->ShortDebugString();
LOG_WITH_PREFIX(INFO) << "Servicing " << __func__ << " request from " << RequestorString(rpc)
<< ": " << req->ShortDebugString();

std::vector<DdlLogEntry> ddl_log_entries;

Expand Down Expand Up @@ -7922,7 +7922,7 @@ CHECKED_STATUS CatalogManager::SendAlterTableRequest(const scoped_refptr<TableIn
auto call = std::make_shared<AsyncAlterTable>(master_, AsyncTaskPool(), tablet, table, txn_id);
tablet->table()->AddTask(call);
if (PREDICT_FALSE(FLAGS_TEST_slowdown_alter_table_rpcs_ms > 0)) {
LOG(INFO) << "Sleeping for " << tablet->id()
LOG(INFO) << "Sleeping for " << tablet->id() << " "
<< FLAGS_TEST_slowdown_alter_table_rpcs_ms
<< "ms before sending async alter table request";
SleepFor(MonoDelta::FromMilliseconds(FLAGS_TEST_slowdown_alter_table_rpcs_ms));
Expand Down
3 changes: 2 additions & 1 deletion src/yb/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ Status Master::RegisterServices() {
FLAGS_master_svc_queue_length,
std::make_unique<tserver::PgClientServiceImpl>(
client_future(), std::bind(&Master::TransactionPool, this),
metric_entity())));
metric_entity(),
&messenger()->scheduler())));

return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/tserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ set(TSERVER_SRCS
metrics_snapshotter.cc
mini_tablet_server.cc
pg_client_service.cc
pg_create_table.cc
remote_bootstrap_client.cc
remote_bootstrap_file_downloader.cc
remote_bootstrap_service.cc
Expand Down
Loading

0 comments on commit 7d573ef

Please sign in to comment.