Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[catalog_manager] KUDU-3344 clean up deleted tables and tablets #3

Merged
merged 1 commit into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/kudu/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,7 @@ Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers)
}

Status KuduClient::ListTables(vector<string>* tables, const string& filter) {
tables->clear();
vector<Data::TableInfo> tables_info;
RETURN_NOT_OK(data_->ListTablesWithInfo(this, &tables_info, filter));
for (auto& info : tables_info) {
Expand Down
24 changes: 17 additions & 7 deletions src/kudu/integration-tests/master-stress-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <ostream>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -116,7 +117,7 @@ static const MonoDelta kTransientStateBackoff = MonoDelta::FromMilliseconds(50);

// Parameterized based on HmsMode and on whether cleanup of deleted tables and tablets is enabled.
class MasterStressTest : public ExternalMiniClusterITestBase,
public ::testing::WithParamInterface<HmsMode> {
public ::testing::WithParamInterface<std::tuple<HmsMode, bool>> {
public:
MasterStressTest()
: done_(1),
Expand Down Expand Up @@ -146,10 +147,17 @@ class MasterStressTest : public ExternalMiniClusterITestBase,
opts.start_process_timeout = MonoDelta::FromSeconds(60);
opts.rpc_negotiation_timeout = MonoDelta::FromSeconds(30);

opts.hms_mode = GetParam();
opts.hms_mode = std::get<0>(GetParam());
// Tune down the notification log poll period in order to speed up catalog convergence.
opts.extra_master_flags.emplace_back("--hive_metastore_notification_log_poll_period_seconds=1");

if (std::get<1>(GetParam())) {
// Set shorter intervals to trigger frequent cleanup tasks.
opts.extra_master_flags.emplace_back("--enable_deleted_tables_and_tablets_cleanup=true");
opts.extra_master_flags.emplace_back("--catalog_manager_bg_task_wait_ms=10");
opts.extra_master_flags.emplace_back("--deleted_table_and_tablet_reserved_secs=1");
}

// Set max missed heartbeats periods to 1.0 (down from 3.0).
opts.extra_master_flags.emplace_back("--leader_failure_max_missed_heartbeat_periods=1.0");

Expand Down Expand Up @@ -205,7 +213,7 @@ class MasterStressTest : public ExternalMiniClusterITestBase,
new MasterServiceProxy(cluster_->messenger(), addr, addr.host()));
ASSERT_OK(CreateTabletServerMap(m_proxy, cluster_->messenger(), &ts_map_));

if (GetParam() == HmsMode::ENABLE_METASTORE_INTEGRATION) {
if (std::get<0>(GetParam()) == HmsMode::ENABLE_METASTORE_INTEGRATION) {
thrift::ClientOptions hms_opts;
hms_opts.service_principal = "hive";
hms_client_.reset(new HmsClient(cluster_->hms()->address(), hms_opts));
Expand Down Expand Up @@ -525,10 +533,12 @@ class MasterStressTest : public ExternalMiniClusterITestBase,
std::unordered_map<string, itest::TServerDetails*> ts_map_;
};

// Run the test with the HMS integration enabled and disabled.
INSTANTIATE_TEST_SUITE_P(HmsConfigurations, MasterStressTest, ::testing::ValuesIn(
vector<HmsMode> { HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION }
));
INSTANTIATE_TEST_SUITE_P(
CatalogManagerConfigurations,
MasterStressTest,
::testing::Combine(/*hms_mode*/ ::testing::ValuesIn(vector<HmsMode>{
HmsMode::NONE, HmsMode::ENABLE_METASTORE_INTEGRATION}),
/*enable_deleted_tables_and_tablets_cleanup*/ ::testing::Bool()));

TEST_P(MasterStressTest, Test) {
OverrideFlagForSlowTests("num_create_table_threads", "10");
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/integration-tests/ts_tablet_manager-itest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,7 @@ TEST_F(TsTabletManagerITest, TestTableStats) {
ASSERT_OK(l.first_failed_status());
// Get the TableInfo.
vector<scoped_refptr<TableInfo>> table_infos;
ASSERT_OK(catalog->GetAllTables(&table_infos));
NO_FATALS(catalog->GetAllTables(&table_infos));
ASSERT_EQ(1, table_infos.size());
// Run the check function.
NO_FATALS(check_function(table_infos[0].get(), live_row_count));
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/master/auto_rebalancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ Status AutoRebalancerTask::BuildClusterRawInfo(
{
CatalogManager::ScopedLeaderSharedLock leader_lock(catalog_manager_);
RETURN_NOT_OK(leader_lock.first_failed_status());
RETURN_NOT_OK(catalog_manager_->GetAllTables(&table_infos));
catalog_manager_->GetAllTables(&table_infos);
}

table_summaries.reserve(table_infos.size());
Expand Down
180 changes: 169 additions & 11 deletions src/kudu/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,25 @@ DEFINE_double(table_write_limit_ratio, 0.95,
"Set the ratio of how much write limit can be reached");
TAG_FLAG(table_write_limit_ratio, experimental);

// Gates the block in CatalogManagerBgTasks::Run() that extracts deleted
// tables/tablets and processes them. Disabled by default.
DEFINE_bool(enable_deleted_tables_and_tablets_cleanup, false,
            "Whether to clean up deleted tables and tablets from master's in-memory map and the "
            "'sys.catalog' table.");
TAG_FLAG(enable_deleted_tables_and_tablets_cleanup, experimental);
TAG_FLAG(enable_deleted_tables_and_tablets_cleanup, runtime);

// Reservation period, in seconds, compared against (now - delete_timestamp)
// of a deleted table/tablet. Defaults to one hour (60 * 60).
DEFINE_int32(deleted_table_and_tablet_reserved_secs, 60 * 60,
             "After this amount of time, a deleted table/tablet could be actually removed by "
             "catalog manager if --enable_deleted_tables_and_tablets_cleanup=true.");
TAG_FLAG(deleted_table_and_tablet_reserved_secs, experimental);
TAG_FLAG(deleted_table_and_tablet_reserved_secs, runtime);

// Selects SysCatalogTable::WriteMode::CHUNKED (true) or ATOMIC (false) when
// persisting tablet cleanup actions.
// NOTE(review): the flag name spells "chucked"; presumably "chunked" was
// intended. The name is left untouched here because renaming it would change
// the flag's public name and every reference to FLAGS_enable_chucked_tablet_writes.
DEFINE_bool(enable_chucked_tablet_writes, true,
            "Whether to split tablet actions into chunks when persisting them in sys.catalog "
            "table. If disabled, any update of the sys.catalog table would be rejected if it "
            "exceeds --rpc_max_message_size.");
TAG_FLAG(enable_chucked_tablet_writes, experimental);
TAG_FLAG(enable_chucked_tablet_writes, runtime);

DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int64(tsk_rotation_seconds);
DECLARE_string(ranger_config_path);
Expand Down Expand Up @@ -778,6 +797,25 @@ void CatalogManagerBgTasks::Run() {
}
}

if (FLAGS_enable_deleted_tables_and_tablets_cleanup) {
vector<scoped_refptr<TableInfo>> tables_to_delete;
vector<scoped_refptr<TabletInfo>> tablets_to_delete;
catalog_manager_->ExtractDeletedTablesAndTablets(&tables_to_delete, &tablets_to_delete);
Status s = Status::OK();
// Cleaning up deleted tablets and tables in the reverse order of loading them,
// in order to avoid the case where we failed to remove a tablet but its table has
// been removed.
if (!tablets_to_delete.empty()) {
s = catalog_manager_->ProcessDeletedTablets(tablets_to_delete);
}
if (s.ok() && !tables_to_delete.empty()) {
s = catalog_manager_->ProcessDeletedTables(tables_to_delete);
}
if (!s.ok()) {
LOG(ERROR) << "Error processing tables/tablets deletions: " << s.ToString();
}
}

// If this is the leader master, check if it's time to generate
// and store a new TSK (Token Signing Key).
Status s = catalog_manager_->TryGenerateNewTskUnlocked();
Expand Down Expand Up @@ -2393,8 +2431,10 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
}

TRACE("Modifying in-memory table state");
string deletion_msg = "Table deleted at " + LocalTimeAsString();
const time_t timestamp = time(nullptr);
string deletion_msg = "Table deleted at " + TimestampAsString(timestamp);
l.mutable_data()->set_state(SysTablesEntryPB::REMOVED, deletion_msg);
l.mutable_data()->pb.set_delete_timestamp(timestamp);

// 2. Look up the tablets, lock them, and mark them as deleted.
{
Expand All @@ -2408,6 +2448,7 @@ Status CatalogManager::DeleteTable(const DeleteTableRequestPB& req,
for (const auto& t : tablets) {
t->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
t->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
}

// 3. Update sys-catalog with the removed table and tablet state.
Expand Down Expand Up @@ -3207,7 +3248,8 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
LocalTimeAsString()));
}

const string deletion_msg = "Partition dropped at " + LocalTimeAsString();
const time_t timestamp = time(nullptr);
const string deletion_msg = "Partition dropped at " + TimestampAsString(timestamp);
TabletMetadataGroupLock tablets_to_add_lock(LockMode::WRITE);
TabletMetadataGroupLock tablets_to_drop_lock(LockMode::RELEASED);

Expand All @@ -3229,6 +3271,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB& req,
for (auto& tablet : tablets_to_drop) {
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::DELETED, deletion_msg);
tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);
}
actions.tablets_to_update = tablets_to_drop;

Expand Down Expand Up @@ -3472,7 +3515,9 @@ Status CatalogManager::ListTables(const ListTablesRequestPB* req,
}
}
InsertOrUpdate(&table_info_by_name, table_name, table_info);
EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
if (user) {
EmplaceIfNotPresent(&table_owner_map, table_name, owner == *user);
}
}

MAYBE_INJECT_FIXED_LATENCY(FLAGS_catalog_manager_inject_latency_list_authz_ms);
Expand Down Expand Up @@ -3626,14 +3671,20 @@ Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableI
return Status::OK();
}

Status CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
void CatalogManager::GetAllTables(vector<scoped_refptr<TableInfo>>* tables) {
leader_lock_.AssertAcquiredForReading();

tables->clear();
shared_lock<LockType> l(lock_);
AppendValuesFromMap(table_ids_map_, tables);
}

return Status::OK();
// Replaces the contents of 'tablets' with every value currently stored in
// 'tablet_map_'.
//
// Asserts that the leader lock is held for reading. The output vector is
// cleared first, then 'lock_' is taken in shared mode while the map values
// are appended.
void CatalogManager::GetAllTablets(vector<scoped_refptr<TabletInfo>>* tablets) {
leader_lock_.AssertAcquiredForReading();

tablets->clear();
// Shared (reader) lock on 'lock_' for the duration of the copy.
shared_lock<LockType> l(lock_);
AppendValuesFromMap(tablet_map_, tablets);
}

Status CatalogManager::TableNameExists(const string& table_name, bool* exists) {
Expand Down Expand Up @@ -4646,9 +4697,9 @@ Status CatalogManager::ProcessTabletReport(
// It'd be unsafe to ask the tserver to delete this tablet without first
// replicating something to our followers (i.e. to guarantee that we're
// the leader). For example, if we were a rogue master, we might be
// deleting a tablet created by a new master accidentally. But masters
// retain metadata for deleted tablets forever, so a tablet can only be
// truly unknown in the event of a serious misconfiguration, such as a
// deleting a tablet created by a new master accidentally. Though masters
// don't always retain metadata for deleted tablets forever, a tablet
// may be unknown in the event of a serious misconfiguration, such as a
// tserver heartbeating to the wrong cluster. Therefore, it should be
// reasonable to ignore it and wait for an operator to fix the situation.
LOG(WARNING) << "Ignoring report from unknown tablet " << tablet_id;
Expand Down Expand Up @@ -5140,6 +5191,26 @@ void CatalogManager::ExtractTabletsToProcess(
}
}

// Collects every table in 'table_ids_map_' and every tablet in 'tablet_map_'
// whose metadata reports is_deleted(), appending them to 'tables_to_delete'
// and 'tablets_to_delete' respectively.
//
// The output vectors are appended to, not cleared. 'lock_' is held in shared
// mode for the whole scan, and each entry's metadata is read under its own
// READ-mode metadata lock.
void CatalogManager::ExtractDeletedTablesAndTablets(
    vector<scoped_refptr<TableInfo>>* tables_to_delete,
    vector<scoped_refptr<TabletInfo>>* tablets_to_delete) {
  shared_lock<LockType> catalog_guard(lock_);

  // Pass 1: tables.
  for (const auto& id_and_table : table_ids_map_) {
    scoped_refptr<TableInfo> candidate = id_and_table.second;
    TableMetadataLock meta_guard(candidate.get(), LockMode::READ);
    if (!meta_guard.data().is_deleted()) {
      continue;
    }
    tables_to_delete->emplace_back(candidate);
  }

  // Pass 2: tablets.
  for (const auto& id_and_tablet : tablet_map_) {
    scoped_refptr<TabletInfo> candidate = id_and_tablet.second;
    TabletMetadataLock meta_guard(candidate.get(), LockMode::READ);
    if (!meta_guard.data().is_deleted()) {
      continue;
    }
    tablets_to_delete->emplace_back(candidate);
  }
}

// Check if it's time to roll TokenSigner's key. There's a bit of subtlety here:
// we shouldn't start exporting a key until it is properly persisted.
// So, the protocol is:
Expand Down Expand Up @@ -5256,10 +5327,11 @@ void CatalogManager::HandleAssignCreatingTablet(const scoped_refptr<TabletInfo>&
tablet->ToString(), replacement->id());

// Mark old tablet as replaced.
const time_t timestamp = time(nullptr);
tablet->mutable_metadata()->mutable_dirty()->set_state(
SysTabletsEntryPB::REPLACED,
Substitute("Replaced by $0 at $1",
replacement->id(), LocalTimeAsString()));
SysTabletsEntryPB::REPLACED,
Substitute("Replaced by $0 at $1", replacement->id(), TimestampAsString(timestamp)));
tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(timestamp);

// Mark new tablet as being created.
replacement->mutable_metadata()->mutable_dirty()->set_state(
Expand Down Expand Up @@ -5421,6 +5493,7 @@ Status CatalogManager::ProcessPendingAssignments(
for (const auto& tablet : deferred.tablets_to_update) {
TabletMetadataLock l(tablet.get(), LockMode::READ);
if (l.data().is_deleted()) {
tablet->table()->AddRemoveTablets({}, {tablet});
SendDeleteTabletRequest(tablet, l, l.data().pb.state_msg());
}
}
Expand Down Expand Up @@ -5503,6 +5576,90 @@ void CatalogManager::SendCreateTabletRequest(const scoped_refptr<TabletInfo>& ta
}
}

// Stamps or purges the given deleted tablets.
//
// All of 'tablets' are write-locked as a group. Then, for each tablet:
//  - if it carries no delete timestamp yet, the current time is recorded as
//    its delete timestamp and it is queued for an update in sys.catalog;
//  - otherwise, if it has been deleted for longer than
//    --deleted_table_and_tablet_reserved_secs, it is queued for deletion from
//    sys.catalog and, once the write succeeds, erased from 'tablet_map_'.
//
// All queued actions go to sys.catalog in a single Write() call, whose mode
// (CHUNKED or ATOMIC) is picked by --enable_chucked_tablet_writes.
//
// Returns the write error (with a prefix prepended) if persisting fails; in
// that case the function returns before erasing anything from 'tablet_map_'
// and before committing the group lock.
Status CatalogManager::ProcessDeletedTablets(const vector<scoped_refptr<TabletInfo>>& tablets) {
  TabletMetadataGroupLock group_lock(LockMode::RELEASED);
  group_lock.AddMutableInfos(tablets);
  group_lock.Lock(LockMode::WRITE);

  const time_t current_time = time(nullptr);
  SysCatalogTable::Actions pending;
  vector<scoped_refptr<TabletInfo>> expired;
  for (const auto& info : tablets) {
    DCHECK(info->metadata().state().is_deleted());
    if (info->metadata().state().pb.has_delete_timestamp()) {
      // Already stamped: purge only once the reservation period has elapsed.
      const auto age_secs = current_time - info->metadata().state().pb.delete_timestamp();
      if (age_secs > FLAGS_deleted_table_and_tablet_reserved_secs) {
        expired.emplace_back(info);
        pending.tablets_to_delete.emplace_back(info);
      }
      continue;
    }
    // No timestamp recorded yet: use the current time as the delete timestamp.
    info->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(current_time);
    pending.tablets_to_update.emplace_back(info);
  }

  // Persist the accumulated updates/deletions to the sys.catalog table.
  auto mode = SysCatalogTable::WriteMode::ATOMIC;
  if (FLAGS_enable_chucked_tablet_writes) {
    mode = SysCatalogTable::WriteMode::CHUNKED;
  }
  Status write_status = sys_catalog_->Write(std::move(pending), mode);
  if (PREDICT_FALSE(!write_status.ok())) {
    write_status = write_status.CloneAndPrepend(
        "an error occurred while writing to the sys-catalog");
    LOG(WARNING) << write_status.ToString();
    return write_status;
  }

  // Drop the expired tablets from the global in-memory map.
  {
    std::lock_guard<LockType> map_guard(lock_);
    for (const auto& gone : expired) {
      DCHECK(ContainsKey(tablet_map_, gone->id()));
      tablet_map_.erase(gone->id());
      VLOG(1) << "Cleaned up deleted tablet: " << gone->id();
    }
  }

  group_lock.Commit();
  return Status::OK();
}

// Stamps or purges the given deleted tables.
//
// All of 'tables' are write-locked as a group. Then, for each table:
//  - if it carries no delete timestamp yet, the current time is recorded as
//    its delete timestamp and the table entry is updated in sys.catalog;
//  - otherwise, if it has been deleted for longer than
//    --deleted_table_and_tablet_reserved_secs, its entry is deleted from
//    sys.catalog and, once that write succeeds, it is erased from
//    'table_ids_map_';
//  - otherwise (still within the reservation period) nothing is written or
//    removed for it.
//
// Each table is persisted with its own Write() call. Returns the write error
// (with a prefix prepended) on the first failure; in that case the function
// returns before committing the group lock.
Status CatalogManager::ProcessDeletedTables(const vector<scoped_refptr<TableInfo>>& tables) {
  TableMetadataGroupLock tables_lock(LockMode::RELEASED);
  tables_lock.AddMutableInfos(tables);
  tables_lock.Lock(LockMode::WRITE);

  const time_t now = time(nullptr);
  for (const auto& table : tables) {
    DCHECK(table->metadata().state().is_deleted());
    SysCatalogTable::Actions action;
    // This must be scoped to the current table. If it outlived the iteration,
    // one expired table would cause every subsequent table to be erased from
    // 'table_ids_map_' even though its sys.catalog entry was only updated (or
    // not touched at all).
    bool needs_cleanup = false;
    if (!table->metadata().state().pb.has_delete_timestamp()) {
      // Set current timestamp as delete timestamp of the table.
      table->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(now);
      action.table_to_update = table;
    } else if (now - table->metadata().state().pb.delete_timestamp() >
               FLAGS_deleted_table_and_tablet_reserved_secs) {
      needs_cleanup = true;
      action.table_to_delete = table;
    } else {
      // Still within the reservation period: there is nothing to persist, so
      // don't issue a sys.catalog write with an empty action.
      continue;
    }

    Status s = sys_catalog_->Write(std::move(action));
    if (PREDICT_FALSE(!s.ok())) {
      s = s.CloneAndPrepend("an error occurred while writing to the sys-catalog");
      LOG(WARNING) << s.ToString();
      return s;
    }

    if (!needs_cleanup) {
      continue;
    }

    // The entry is gone from sys.catalog; remove it from the in-memory map too.
    std::lock_guard<LockType> l(lock_);
    DCHECK(ContainsKey(table_ids_map_, table->id()));
    table_ids_map_.erase(table->id());
    VLOG(1) << "Cleaned up deleted table: " << table->ToString();
  }

  tables_lock.Commit();
  return Status::OK();
}

Status CatalogManager::BuildLocationsForTablet(
const scoped_refptr<TabletInfo>& tablet,
ReplicaTypeFilter filter,
Expand Down Expand Up @@ -5663,6 +5820,7 @@ Status CatalogManager::ReplaceTablet(const string& tablet_id, ReplaceTabletRespo
const string replace_msg = Substitute("replaced by tablet $0", new_tablet->id());
old_tablet->mutable_metadata()->mutable_dirty()->set_state(SysTabletsEntryPB::DELETED,
replace_msg);
old_tablet->mutable_metadata()->mutable_dirty()->pb.set_delete_timestamp(time(nullptr));

// Persist the changes to the syscatalog table.
{
Expand Down
Loading