Skip to content

Commit

Permalink
[#23905] DocDB: Persistence for Master side Table/Object locks
Browse files Browse the repository at this point in the history
Summary:
Implements persistence for master side DDL lock acquistion and release.

Handles restoring/rebuilding the Lock state at the master after master-failover/restart.

Handle bootstrapping the new TServer with the locks already taken.

----

Does not handle release of locks for a "dead" TServer.
Jira: DB-12809

Test Plan: yb_build.sh --cxx-test object_lock-test

Reviewers: bkolagani, zdrudi

Reviewed By: zdrudi

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D36794
  • Loading branch information
amitanandaiyer committed Sep 18, 2024
1 parent 10a629e commit 5f95ff9
Show file tree
Hide file tree
Showing 29 changed files with 870 additions and 361 deletions.
121 changes: 114 additions & 7 deletions src/yb/integration-tests/object_lock-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ constexpr uint64_t kSessionId = 1;
constexpr uint64_t kSessionId2 = 2;
constexpr uint64_t kDatabaseID = 1;
constexpr uint64_t kObjectId = 1;
constexpr uint64_t kObjectId2 = 2;
constexpr size_t kTimeoutMs = 5000;

tserver::AcquireObjectLockRequestPB AcquireRequestFor(
Expand Down Expand Up @@ -198,6 +199,16 @@ TEST_F(ObjectLockTest, AcquireObjectLocksWaitsOnTServer) {
ASSERT_EQ(tserver0->server()->ts_local_lock_manager()->TEST_WaitingLocksSize(), 0);
}

TEST_F(ObjectLockTest, AcquireAndReleaseDDLLock) {
auto master_proxy = ASSERT_RESULT(MasterLeaderProxy());
ASSERT_OK(AcquireLockAt(
&master_proxy, kSessionId2, kDatabaseID, kObjectId, TableLockType::ACCESS_EXCLUSIVE));
ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionId2, kDatabaseID, kObjectId));

// Release non-existent lock.
ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionId2, kDatabaseID, kObjectId2));
}

TEST_F(ObjectLockTest, AcquireObjectLocksRetriesUponMultipleTServerAddition) {
auto* tserver0 = cluster_->mini_tablet_server(0);
auto tserver0_proxy = TServerProxyFor(tserver0);
Expand All @@ -219,12 +230,11 @@ TEST_F(ObjectLockTest, AcquireObjectLocksRetriesUponMultipleTServerAddition) {
},
MonoDelta::FromMilliseconds(kTimeoutMs), "wait for blocking on TServer0"));

// Expect to see that the lock acquisition happens even at the new tserver
auto num_ts = cluster_->num_tablet_servers();
ASSERT_OK(cluster_->AddTabletServer());
ASSERT_OK(cluster_->WaitForTabletServerCount(num_ts + 1));

// Add TS-4
// Add TS-4.
auto* added_tserver1 = cluster_->mini_tablet_server(num_ts);
ASSERT_EQ(added_tserver1->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), 0);
auto added_tserver1_proxy = TServerProxyFor(added_tserver1);
Expand Down Expand Up @@ -255,13 +265,110 @@ TEST_F(ObjectLockTest, AcquireObjectLocksRetriesUponMultipleTServerAddition) {
// Add TS-5
ASSERT_OK(cluster_->AddTabletServer());
ASSERT_OK(cluster_->WaitForTabletServerCount(num_ts + 2));
auto* added_tserver2 = cluster_->mini_tablet_server(num_ts + 1);
ASSERT_EQ(added_tserver2->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), 0);

// TS-5 was added after the lock acquisition was complete. Unless we add master persistence
// and bootstrapping the lock manager during TSRegistration, we expect to see no locks on ts-5
ASSERT_EQ(added_tserver2->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), 0);
auto* added_tserver2 = cluster_->mini_tablet_server(num_ts + 1);
ASSERT_OK(WaitFor(
[added_tserver2]() {
return added_tserver2->server()->ts_local_lock_manager()->TEST_GrantedLocksSize() > 0;
},
1s, "Wait for the added TS to bootstrap"));
// DDL lock acquisition should have bootstrapped during registration and taken the lock on TS-5
// also
ASSERT_GE(added_tserver2->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), 1);
ASSERT_EQ(added_tserver2->server()->ts_local_lock_manager()->TEST_WaitingLocksSize(), 0);
}

TEST_F(ObjectLockTest, BootstrapTServersUponAddition) {
auto master_proxy = ASSERT_RESULT(MasterLeaderProxy());
ASSERT_OK(AcquireLockAt(
&master_proxy, kSessionId2, kDatabaseID, kObjectId, TableLockType::ACCESS_EXCLUSIVE));

auto num_ts = cluster_->num_tablet_servers();
ASSERT_OK(cluster_->AddTabletServer());
ASSERT_OK(cluster_->WaitForTabletServerCount(num_ts + 1));

auto* added_tserver = cluster_->mini_tablet_server(num_ts);
ASSERT_OK(WaitFor(
[added_tserver]() {
return added_tserver->server()->ts_local_lock_manager()->TEST_GrantedLocksSize() > 0;
},
1s, "Wait for the added TS to bootstrap"));

auto expected_locks =
cluster_->mini_tablet_server(0)->server()->ts_local_lock_manager()->TEST_GrantedLocksSize();
ASSERT_GE(expected_locks, 1);
// Expect to see that the lock acquisition happens even at the new tserver
LOG(INFO) << "Counts after acquiring the DDL lock and adding TServers";
for (auto ts : cluster_->mini_tablet_servers()) {
LOG(INFO) << ts->ToString() << " TestWaitingLocksSize: "
<< ts->server()->ts_local_lock_manager()->TEST_WaitingLocksSize()
<< " TestGrantedLocksSize: "
<< ts->server()->ts_local_lock_manager()->TEST_GrantedLocksSize();
ASSERT_EQ(ts->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), expected_locks);
}

ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionId2, kDatabaseID, kObjectId));

LOG(INFO) << "Counts after releasing the DDL lock";
expected_locks = 0;
for (auto ts : cluster_->mini_tablet_servers()) {
LOG(INFO) << ts->ToString() << " TestWaitingLocksSize: "
<< ts->server()->ts_local_lock_manager()->TEST_WaitingLocksSize()
<< " TestGrantedLocksSize: "
<< ts->server()->ts_local_lock_manager()->TEST_GrantedLocksSize();
ASSERT_EQ(ts->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), expected_locks);
}
}

class MultiMasterObjectLockTest : public ObjectLockTest {
protected:
int num_masters() override {
return 3;
}
};

TEST_F_EX(ObjectLockTest, AcquireAndReleaseDDLLockAcrossMasterFailover, MultiMasterObjectLockTest) {
const auto num_ts = cluster_->num_tablet_servers();
auto* leader_master1 = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
{
LOG(INFO) << "Acquiring lock on object " << kObjectId << " from master "
<< leader_master1->ToString();
auto master_proxy = MasterProxy(leader_master1);
ASSERT_OK(AcquireLockAt(
&master_proxy, kSessionId2, kDatabaseID, kObjectId, TableLockType::ACCESS_EXCLUSIVE));
}

for (const auto& tserver : cluster_->mini_tablet_servers()) {
LOG(INFO) << tserver->ToString() << " GrantedLocks "
<< tserver->server()->ts_local_lock_manager()->TEST_GrantedLocksSize();
ASSERT_GE(tserver->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), 1);
}

LOG(INFO) << "Stepping down from " << leader_master1->ToString();
ASSERT_OK(cluster_->StepDownMasterLeader());
ASSERT_OK(cluster_->WaitForTabletServerCount(num_ts));

ASSERT_OK(cluster_->AddTabletServer());
ASSERT_OK(cluster_->WaitForTabletServerCount(num_ts + 1));

auto* added_tserver = cluster_->mini_tablet_server(num_ts);
ASSERT_OK(WaitFor(
[added_tserver]() {
return added_tserver->server()->ts_local_lock_manager()->TEST_GrantedLocksSize() > 0;
},
1s, "Wait for the added TS to bootstrap"));
LOG(INFO) << added_tserver->ToString() << " GrantedLocks "
<< added_tserver->server()->ts_local_lock_manager()->TEST_GrantedLocksSize();
ASSERT_GE(added_tserver->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), 1);

// Release lock
auto* leader_master2 = ASSERT_RESULT(cluster_->GetLeaderMiniMaster());
{
LOG(INFO) << "Releasing lock on object " << kObjectId << " at master "
<< leader_master2->ToString();
auto master_proxy = MasterProxy(leader_master2);
ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionId2, kDatabaseID, kObjectId));
}
}

} // namespace yb
2 changes: 1 addition & 1 deletion src/yb/master/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ set(MASTER_SRCS
mini_master.cc
master_snapshot_coordinator.cc
multi_step_monitored_task.cc
object_lock.cc
object_lock_info_manager.cc
post_tablet_create_task_base.cc
restoration_state.cc
restore_sys_catalog_state.cc
Expand Down
19 changes: 19 additions & 0 deletions src/yb/master/catalog_entity_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,25 @@ class UDTypeInfo : public RefCountedThreadSafe<UDTypeInfo>,
DISALLOW_COPY_AND_ASSIGN(UDTypeInfo);
};

// This wraps around the proto containing information about what locks have been taken.
// It will be used for LockObject persistence.
struct PersistentObjectLockInfo : public Persistent<SysObjectLockEntryPB> {};

class ObjectLockInfo : public MetadataCowWrapper<PersistentObjectLockInfo> {
public:
explicit ObjectLockInfo(const std::string& ts_uuid) : ts_uuid_(ts_uuid) {}
~ObjectLockInfo() = default;

// Return the user defined type's ID. Does not require synchronization.
virtual const std::string& id() const override { return ts_uuid_; }

private:
// The ID field is used in the sys_catalog table.
const std::string ts_uuid_;

DISALLOW_COPY_AND_ASSIGN(ObjectLockInfo);
};

// This wraps around the proto containing cluster level config information. It will be used for
// CowObject managed access.
struct PersistentClusterConfigInfo : public Persistent<SysClusterConfigEntryPB> {};
Expand Down
23 changes: 23 additions & 0 deletions src/yb/master/catalog_entity_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import "yb/cdc/xcluster_producer.proto";
import "yb/common/common.proto";
import "yb/common/common_net.proto";
import "yb/common/common_types.proto";
import "yb/common/transaction.proto";
import "yb/common/wire_protocol.proto";
import "yb/consensus/metadata.proto";
import "yb/master/master_types.proto";
Expand Down Expand Up @@ -392,6 +393,28 @@ message SysClusterConfigEntryPB {
optional string universe_uuid = 8;
}

message SysObjectLockEntryPB {
message LockTypesPB { repeated TableLockType lock_type = 1; }
message ObjectLocksMapPB {
// object_id -> Locks taken
map<uint64, LockTypesPB> objects = 1;
}
message DBObjectsMapPB {
// db_id -> map of objects/locks-taken
map<uint64, ObjectLocksMapPB> dbs = 1;
}
message SessionDBMapPB {
// session_id -> DB ...
map<uint64, DBObjectsMapPB> sessions = 1;
}

// host_uid is part of the key. Thus not stored
// explicitly in this proto again.

// incarnation id -> Session ...
map<uint64, SessionDBMapPB> incarnations = 1;
}

message SysXClusterConfigEntryPB {
optional uint32 version = 1;
optional xcluster.ProducerRegistryPB xcluster_producer_registry = 2;
Expand Down
3 changes: 2 additions & 1 deletion src/yb/master/catalog_entity_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class SysRowEntry;
((UNIVERSE_REPLICATION_BOOTSTRAP, SysUniverseReplicationBootstrapEntryPB)) \
((XCLUSTER_OUTBOUND_REPLICATION_GROUP, SysXClusterOutboundReplicationGroupEntryPB)) \
((CLONE_STATE, SysCloneStatePB)) \
((TSERVER_REGISTRATION, SysTServerEntryPB))
((TSERVER_REGISTRATION, SysTServerEntryPB)) \
((OBJECT_LOCK_ENTRY, SysObjectLockEntryPB))

// We should have an entry for each SysRowEntryType in the map except for UNKNOWN.
static_assert(
Expand Down
14 changes: 14 additions & 0 deletions src/yb/master/catalog_loaders.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,20 @@ Status UDTypeLoader::Visit(const UDTypeId& udtype_id, const SysUDTypeEntryPB& me
return Status::OK();
}

// key corresponds to the host_uuid.
Status ObjectLockLoader::Visit(const std::string& host_uuid, const SysObjectLockEntryPB& pb) {
std::shared_ptr<ObjectLockInfo> info = std::make_shared<ObjectLockInfo>(host_uuid);
{
auto l = info->LockForWrite();
l.mutable_data()->pb.CopyFrom(pb);
l.Commit();
catalog_manager_->object_lock_info_manager_->InsertOrAssign(host_uuid, info);
}

LOG(INFO) << "Loaded metadata for type " << info->ToString();
VLOG(1) << "Metadata for type " << info->ToString() << ": " << pb.ShortDebugString();
return Status::OK();
}
////////////////////////////////////////////////////////////
// Config Loader
////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions src/yb/master/catalog_loaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ DECLARE_LOADER_CLASS(Namespace, NamespaceId, SysNamespaceEntryPB, catalo
DECLARE_LOADER_CLASS(UDType, UDTypeId, SysUDTypeEntryPB, catalog_manager_->mutex_);
DECLARE_LOADER_CLASS(ClusterConfig, std::string, SysClusterConfigEntryPB, catalog_manager_->mutex_);
DECLARE_LOADER_CLASS(RedisConfig, std::string, SysRedisConfigEntryPB, catalog_manager_->mutex_);
DECLARE_LOADER_CLASS(ObjectLock, std::string, SysObjectLockEntryPB, catalog_manager_->mutex_);
DECLARE_LOADER_CLASS(Role, RoleName, SysRoleEntryPB,
catalog_manager_->permissions_manager()->mutex());
DECLARE_LOADER_CLASS(SysConfig, std::string, SysConfigEntryPB,
Expand Down
25 changes: 18 additions & 7 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
#include "yb/master/async_rpc_tasks.h"
#include "yb/master/backfill_index.h"
#include "yb/master/catalog_entity_info.h"
#include "yb/master/catalog_entity_info.pb.h"
#include "yb/master/catalog_entity_parser.h"
#include "yb/master/catalog_loaders.h"
#include "yb/master/catalog_manager-internal.h"
Expand All @@ -140,7 +141,7 @@
#include "yb/master/master_replication.pb.h"
#include "yb/master/master_snapshot_coordinator.h"
#include "yb/master/master_util.h"
#include "yb/master/object_lock.h"
#include "yb/master/object_lock_info_manager.h"
#include "yb/master/permissions_manager.h"
#include "yb/master/post_tablet_create_task_base.h"
#include "yb/master/scoped_leader_shared_lock-internal.h"
Expand Down Expand Up @@ -932,6 +933,7 @@ CatalogManager::CatalogManager(Master* master)
leader_lock_(RWMutex::Priority::PREFER_WRITING),
load_balance_policy_(std::make_unique<ClusterLoadBalancer>(this)),
tablegroup_manager_(std::make_unique<YsqlTablegroupManager>()),
object_lock_info_manager_(std::make_unique<ObjectLockInfoManager>(master_, this)),
permissions_manager_(std::make_unique<PermissionsManager>(this)),
tasks_tracker_(new TasksTracker(IsUserInitiated::kFalse)),
jobs_tracker_(new TasksTracker(IsUserInitiated::kTrue)),
Expand Down Expand Up @@ -1412,6 +1414,9 @@ Status CatalogManager::RunLoaders(SysCatalogLoadingState* state) {
// Clear redis config mapping.
redis_config_map_.clear();

// Clear Object lock mapping.
object_lock_info_manager_->Clear();

// Clear ysql catalog config.
ysql_catalog_config_.reset();

Expand Down Expand Up @@ -1458,6 +1463,7 @@ Status CatalogManager::RunLoaders(SysCatalogLoadingState* state) {
RETURN_NOT_OK(Load<UDTypeLoader>("user-defined types", state));
RETURN_NOT_OK(Load<ClusterConfigLoader>("cluster configuration", state));
RETURN_NOT_OK(Load<RedisConfigLoader>("Redis config", state));
RETURN_NOT_OK(Load<ObjectLockLoader>("Object locks", state));

if (!transaction_tables_config_) {
RETURN_NOT_OK(InitializeTransactionTablesConfig(state->epoch.leader_term));
Expand Down Expand Up @@ -5346,6 +5352,7 @@ std::string CatalogManager::GenerateIdUnlocked(
case SysRowEntryType::UNIVERSE_REPLICATION_BOOTSTRAP: FALLTHROUGH_INTENDED;
case SysRowEntryType::XCLUSTER_OUTBOUND_REPLICATION_GROUP: FALLTHROUGH_INTENDED;
case SysRowEntryType::TSERVER_REGISTRATION: FALLTHROUGH_INTENDED;
case SysRowEntryType::OBJECT_LOCK_ENTRY: FALLTHROUGH_INTENDED;
case SysRowEntryType::UNKNOWN:
LOG(DFATAL) << "Invalid id type: " << *entity_type;
return id;
Expand Down Expand Up @@ -6161,29 +6168,33 @@ Status CatalogManager::DeleteIndexInfoFromTable(
}

void CatalogManager::AcquireObjectLocks(
const tserver::AcquireObjectLockRequestPB* req, tserver::AcquireObjectLockResponsePB* resp,
rpc::RpcContext rpc) {
LeaderEpoch epoch, const tserver::AcquireObjectLockRequestPB* req,
tserver::AcquireObjectLockResponsePB* resp, rpc::RpcContext rpc) {
VLOG(0) << __PRETTY_FUNCTION__;
if (!FLAGS_TEST_enable_object_locking_for_table_locks) {
rpc.RespondRpcFailure(
rpc::ErrorStatusPB::ERROR_APPLICATION,
STATUS(NotSupported, "Flag enable_object_locking_for_table_locks disabled"));
return;
}
LockObject(master_, this, req, resp, std::move(rpc));
object_lock_info_manager_->LockObject(epoch, req, resp, std::move(rpc));
}

void CatalogManager::ReleaseObjectLocks(
const tserver::ReleaseObjectLockRequestPB* req, tserver::ReleaseObjectLockResponsePB* resp,
rpc::RpcContext rpc) {
LeaderEpoch epoch, const tserver::ReleaseObjectLockRequestPB* req,
tserver::ReleaseObjectLockResponsePB* resp, rpc::RpcContext rpc) {
VLOG(0) << __PRETTY_FUNCTION__;
if (!FLAGS_TEST_enable_object_locking_for_table_locks) {
rpc.RespondRpcFailure(
rpc::ErrorStatusPB::ERROR_APPLICATION,
STATUS(NotSupported, "Flag enable_object_locking_for_table_locks disabled"));
return;
}
UnlockObject(master_, this, req, resp, std::move(rpc));
object_lock_info_manager_->UnlockObject(epoch, req, resp, std::move(rpc));
}

void CatalogManager::ExportObjectLockInfo(tserver::DdlLockEntriesPB* resp) {
object_lock_info_manager_->ExportObjectLockInfo(resp);
}

Status CatalogManager::GetIndexBackfillProgress(const GetIndexBackfillProgressRequestPB* req,
Expand Down
13 changes: 9 additions & 4 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
#include "yb/master/master_encryption.fwd.h"
#include "yb/master/master_heartbeat.pb.h"
#include "yb/master/master_types.h"
#include "yb/master/object_lock_info_manager.h"
#include "yb/master/scoped_leader_shared_lock.h"
#include "yb/master/snapshot_coordinator_context.h"
#include "yb/master/sys_catalog.h"
Expand Down Expand Up @@ -406,11 +407,12 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const LeaderEpoch& epoch);

void AcquireObjectLocks(
const tserver::AcquireObjectLockRequestPB* req, tserver::AcquireObjectLockResponsePB* resp,
rpc::RpcContext rpc);
LeaderEpoch epoch, const tserver::AcquireObjectLockRequestPB* req,
tserver::AcquireObjectLockResponsePB* resp, rpc::RpcContext rpc);
void ReleaseObjectLocks(
const tserver::ReleaseObjectLockRequestPB* req, tserver::ReleaseObjectLockResponsePB* resp,
rpc::RpcContext rpc);
LeaderEpoch epoch, const tserver::ReleaseObjectLockRequestPB* req,
tserver::ReleaseObjectLockResponsePB* resp, rpc::RpcContext rpc);
void ExportObjectLockInfo(tserver::DdlLockEntriesPB* resp);

// Gets the progress of ongoing index backfills.
Status GetIndexBackfillProgress(const GetIndexBackfillProgressRequestPB* req,
Expand Down Expand Up @@ -1684,6 +1686,7 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
friend class BackendsCatalogVersionJob;
friend class AddTableToXClusterTargetTask;
friend class VerifyDdlTransactionTask;
friend class ObjectLockLoader;

FRIEND_TEST(yb::MasterPartitionedTest, VerifyOldLeaderStepsDown);

Expand Down Expand Up @@ -2393,6 +2396,8 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
std::unique_ptr<YsqlTablegroupManager> tablegroup_manager_
GUARDED_BY(mutex_);

std::unique_ptr<ObjectLockInfoManager> object_lock_info_manager_;

boost::optional<std::future<Status>> initdb_future_;
boost::optional<InitialSysCatalogSnapshotWriter> initial_snapshot_writer_;

Expand Down
Loading

0 comments on commit 5f95ff9

Please sign in to comment.