Skip to content

Commit

Permalink
[#25127] docdb: create pg_advisory_locks and add a helper class to lo…
Browse files Browse the repository at this point in the history
…ad this table

Summary:
Create a system table pg_advisory_locks to store advisory locks with CatalogManagerBgTask.
Add a helper class YsqlAdvisoryLocksTableManager to acess this table.

GFlags:
`num_advisory_locks_tablets` - Number of tablets of pg_advisory_locks table. 1 by default.
`yb_enable_advisory_lock` - Whether to enable advisory lock at docdb layer. For this diff, it's used to control if pg_advisory_locks should be created.
Jira: DB-14277

Test Plan: advisory_lock-test

Reviewers: hsunder

Reviewed By: hsunder

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D40359
  • Loading branch information
Huqicheng committed Dec 4, 2024
1 parent 9fad4bd commit 0d5728b
Show file tree
Hide file tree
Showing 9 changed files with 243 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,4 @@ ADD_YB_TEST(snapshot-txn-test)
ADD_YB_TEST(snapshot_schedule-test)
ADD_YB_TEST(serializable-txn-test)
ADD_YB_TEST(tablet_rpc-test)
ADD_YB_TEST(advisory_lock-test)
102 changes: 102 additions & 0 deletions src/yb/client/advisory_lock-test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright (c) YugaByte, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//

#include "yb/client/yb_table_name.h"
#include "yb/master/master_defaults.h"
#include "yb/tserver/ysql_advisory_lock_table.h"

#include "yb/client/meta_cache.h"

#include "yb/integration-tests/mini_cluster.h"
#include "yb/integration-tests/yb_mini_cluster_test_base.h"

DECLARE_int32(catalog_manager_bg_task_wait_ms);
DECLARE_uint32(num_advisory_locks_tablets);
DECLARE_bool(yb_enable_advisory_lock);

namespace yb {
namespace client {

const int kNumAdvisoryLocksTablets = 1;

class AdvisoryLockTest: public MiniClusterTestWithClient<MiniCluster> {
public:
void SetUp() override {
MiniClusterTestWithClient::SetUp();

SetFlags();
auto opts = MiniClusterOptions();
opts.num_tablet_servers = 3;
opts.num_masters = 1;
cluster_.reset(new MiniCluster(opts));
ASSERT_OK(cluster_->Start());

ASSERT_OK(CreateClient());
if (ANNOTATE_UNPROTECTED_READ(FLAGS_yb_enable_advisory_lock)) {
ASSERT_OK(WaitForCreateTableToFinish());
}
}

Status WaitForCreateTableToFinish() {
YBTableName table_name(
YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName);
return client_->WaitForCreateTableToFinish(
table_name, CoarseMonoClock::Now() + 10s * kTimeMultiplier);
}

Status CheckNumTablets(const YBTablePtr& table) {
auto future = client_->LookupAllTabletsFuture(table, CoarseMonoClock::Now() + 10s);
SCHECK_EQ(VERIFY_RESULT(future.get()).size(),
ANNOTATE_UNPROTECTED_READ(FLAGS_num_advisory_locks_tablets),
IllegalState, "tablet number mismatch");
return Status::OK();
}

std::unique_ptr<YsqlAdvisoryLocksTable> GetYsqlAdvisoryLocksTable() {
return std::make_unique<YsqlAdvisoryLocksTable>(*client_.get());
}

protected:
virtual void SetFlags() {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_num_advisory_locks_tablets) = kNumAdvisoryLocksTablets;
}
};

TEST_F(AdvisoryLockTest, TestAdvisoryLockTableCreated) {
auto table = GetYsqlAdvisoryLocksTable();
ASSERT_OK(CheckNumTablets(ASSERT_RESULT(table->GetTable())));
}

class AdvisoryLocksDisabledTest : public AdvisoryLockTest {
protected:
void SetFlags() override {
AdvisoryLockTest::SetFlags();
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = false;
}
};

TEST_F(AdvisoryLocksDisabledTest, ToggleAdvisoryLockFlag) {
auto table = GetYsqlAdvisoryLocksTable();
// Wait for the background task to run a few times.
SleepFor(FLAGS_catalog_manager_bg_task_wait_ms * kTimeMultiplier * 3ms);
auto res = table->GetTable();
ASSERT_NOK(res);
ASSERT_TRUE(res.status().IsNotSupported());
ANNOTATE_UNPROTECTED_WRITE(FLAGS_yb_enable_advisory_lock) = true;
ASSERT_OK(WaitForCreateTableToFinish());
ASSERT_OK(CheckNumTablets(ASSERT_RESULT(table->GetTable())));
}

} // namespace client
} // namespace yb
4 changes: 4 additions & 0 deletions src/yb/master/catalog_manager_bg_tasks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "yb/master/ts_descriptor.h"
#include "yb/master/tablet_split_manager.h"
#include "yb/master/xcluster/xcluster_manager_if.h"
#include "yb/master/ysql/ysql_manager.h"
#include "yb/master/ysql_backends_manager.h"

#include "yb/util/callsite_profiling.h"
Expand Down Expand Up @@ -201,6 +202,9 @@ void CatalogManagerBgTasks::Run() {
"Failed to create Test Echo service");
}

WARN_NOT_OK(catalog_manager_->ysql_manager_->CreateYbAdvisoryLocksTableIfNeeded(l.epoch()),
"Failed to create YB advisory locks table");

// TODO(auto-analyze, #19464): we allow enabling this service at runtime. We should also allow
// disabling this service at runtime i.e., the service should stop on the tserver hosting it
// when the flag is set to false.
Expand Down
47 changes: 47 additions & 0 deletions src/yb/master/ysql/ysql_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,27 @@

#include "yb/master/ysql/ysql_manager.h"

#include "yb/client/schema.h"
#include "yb/client/yb_table_name.h"
#include "yb/tserver/ysql_advisory_lock_table.h"

#include "yb/master/catalog_manager.h"
#include "yb/master/ysql/ysql_initdb_major_upgrade_handler.h"
#include "yb/util/flag_validators.h"
#include "yb/util/is_operation_done_result.h"

// TODO (mbautin, 2019-12): switch the default to true after updating all external callers
// (yb-ctl, YugaWare) and unit tests.
DEFINE_RUNTIME_bool(master_auto_run_initdb, false,
"Automatically run initdb on master leader initialization");

DEFINE_NON_RUNTIME_uint32(num_advisory_locks_tablets, 1,
"Number of advisory lock tablets. Must be set "
"before yb_enable_advisory_lock is set to true");
DEFINE_validator(num_advisory_locks_tablets, FLAG_GT_VALUE_VALIDATOR(0));

DECLARE_bool(yb_enable_advisory_lock);

namespace yb::master {

YsqlManager::YsqlManager(
Expand Down Expand Up @@ -160,4 +172,39 @@ Status YsqlManager::RollbackYsqlMajorCatalogVersion(
return Status::OK();
}

Status YsqlManager::CreateYbAdvisoryLocksTableIfNeeded(const LeaderEpoch& epoch) {
if (advisory_locks_table_created_ || !FLAGS_enable_ysql || !FLAGS_yb_enable_advisory_lock) {
return Status::OK();
}

TableProperties table_properties;
table_properties.SetTransactional(true);
client::YBSchemaBuilder schema_builder;
schema_builder.AddColumn("dbid")->Type(DataType::UINT32)->HashPrimaryKey();
schema_builder.AddColumn("classid")->Type(DataType::UINT32)->PrimaryKey();
schema_builder.AddColumn("objid")->Type(DataType::UINT32)->PrimaryKey();
schema_builder.AddColumn("objsubid")->Type(DataType::UINT32)->PrimaryKey();
schema_builder.SetTableProperties(table_properties);
client::YBSchema yb_schema;
CHECK_OK(schema_builder.Build(&yb_schema));

CreateTableRequestPB req;
CreateTableResponsePB resp;
req.set_name(kPgAdvisoryLocksTableName);
req.mutable_namespace_()->set_name(kSystemNamespaceName);
req.set_table_type(TableType::YQL_TABLE_TYPE);
req.set_num_tablets(FLAGS_num_advisory_locks_tablets);

auto schema = yb::client::internal::GetSchema(yb_schema);

SchemaToPB(schema, req.mutable_schema());

Status s = catalog_manager_.CreateTable(&req, &resp, /* RpcContext */ nullptr, epoch);
if (!s.ok() && !s.IsAlreadyPresent()) {
return s;
}
advisory_locks_table_created_ = true;
return Status::OK();
}

} // namespace yb::master
4 changes: 4 additions & 0 deletions src/yb/master/ysql/ysql_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class YsqlManager : public YsqlManagerIf {
RollbackYsqlMajorCatalogVersionResponsePB* resp, rpc::RpcContext* rpc,
const LeaderEpoch& epoch);

Status CreateYbAdvisoryLocksTableIfNeeded(const LeaderEpoch& epoch);

private:
Result<bool> StartRunningInitDbIfNeededInternal(const LeaderEpoch& epoch);

Expand All @@ -107,6 +109,8 @@ class YsqlManager : public YsqlManagerIf {
// This is used for tracking that initdb has started running previously.
std::atomic<bool> pg_proc_exists_{false};

bool advisory_locks_table_created_ = false;

DISALLOW_COPY_AND_ASSIGN(YsqlManager);
};

Expand Down
3 changes: 3 additions & 0 deletions src/yb/server/server_common_flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ DEFINE_RUNTIME_AUTO_PG_FLAG(
// ycql_ignore_group_by_error in introduced for the same functionality.
DEFINE_RUNTIME_AUTO_bool(ycql_suppress_group_by_error, kLocalVolatile, true, false,
"This flag is deprecated, please use ycql_ignore_group_by_error");

DEFINE_RUNTIME_bool(yb_enable_advisory_lock, false,
"Whether to enable advisory locks.");
3 changes: 2 additions & 1 deletion src/yb/tserver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ set(TSERVER_SRCS
xcluster_poller_id.cc
xcluster_poller_stats.cc
xcluster_output_client.cc
xcluster_write_implementations.cc)
xcluster_write_implementations.cc
ysql_advisory_lock_table.cc)

set(TSERVER_DEPS
protobuf
Expand Down
42 changes: 42 additions & 0 deletions src/yb/tserver/ysql_advisory_lock_table.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
//

#include <yb/tserver/ysql_advisory_lock_table.h>
#include "yb/client/yb_table_name.h"
#include "yb/client/client.h"
#include "yb/master/master_defaults.h"

DECLARE_bool(yb_enable_advisory_lock);

namespace yb {

YsqlAdvisoryLocksTable::YsqlAdvisoryLocksTable(client::YBClient& client)
: client_(client) {}

YsqlAdvisoryLocksTable::~YsqlAdvisoryLocksTable() {
}

Result<client::YBTablePtr> YsqlAdvisoryLocksTable::GetTable() {
SCHECK(FLAGS_yb_enable_advisory_lock, NotSupported, "Advisory locks are not enabled");
std::lock_guard<std::mutex> l(mutex_);
if (!table_) {
static const client::YBTableName table_name(
YQL_DATABASE_CQL, master::kSystemNamespaceName, kPgAdvisoryLocksTableName);
table_ = VERIFY_RESULT(client_.OpenTable(table_name));
}
return table_;
}

} // namespace yb
38 changes: 38 additions & 0 deletions src/yb/tserver/ysql_advisory_lock_table.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
//

#pragma once

#include "yb/client/client_fwd.h"

namespace yb {

constexpr char kPgAdvisoryLocksTableName[] = "pg_advisory_locks";

// Helper class for the advisory locks table.
class YsqlAdvisoryLocksTable {
public:
explicit YsqlAdvisoryLocksTable(client::YBClient& client);
~YsqlAdvisoryLocksTable();

Result<client::YBTablePtr> GetTable() EXCLUDES(mutex_);

private:
std::mutex mutex_;
client::YBTablePtr table_ GUARDED_BY(mutex_);;
client::YBClient& client_;
};

} // namespace yb

0 comments on commit 0d5728b

Please sign in to comment.