Skip to content

Commit

Permalink
[maintenance] Add privilege maintenance thread pool for privilege tab…
Browse files Browse the repository at this point in the history
…les and tablets

This commit adds a privilege thread pool to the maintenance manager for
privilege tables and tablets.

In a Kudu cluster with thousands of tables and tablets, it's hard for
a specific tablet's maintenance OPs to be launched when their scores
are not the highest, even if the table the tablet belongs to is high
priority for Kudu users. This patch allows users to specify privilege
tables and tablets via gflags; the maintenance OPs of these privilege
tables and tablets can be launched in a privilege thread pool, so they
have a greater chance of being launched.

Change-Id: I3ea3b73505157678a8fb551656123b64e6bfb304
  • Loading branch information
acelyc111 committed May 8, 2019
1 parent 7fffbc8 commit 15868cf
Show file tree
Hide file tree
Showing 15 changed files with 443 additions and 152 deletions.
1 change: 1 addition & 0 deletions src/kudu/tablet/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ class Tablet {
// This method is thread-safe.
void CancelMaintenanceOps();

const std::string& table_id() const { return metadata_->table_id(); }
const std::string& tablet_id() const { return metadata_->tablet_id(); }

// Return the metrics for this tablet.
Expand Down
2 changes: 1 addition & 1 deletion src/kudu/tablet/tablet_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class TabletMetadata : public RefCountedThreadSafe<TabletMetadata> {
return partition_;
}

std::string table_id() const {
const std::string& table_id() const {
DCHECK_NE(state_, kNotLoadedYet);
return table_id_;
}
Expand Down
3 changes: 1 addition & 2 deletions src/kudu/tablet/tablet_mm_ops-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,14 @@ class KuduTabletMmOpsTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
void StatsShouldChange(MaintenanceOp* op) {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
ASSERT_TRUE(next_time_ < stats_.last_modified());
ASSERT_LT(next_time_, stats_.last_modified());
next_time_ = stats_.last_modified();
}

void StatsShouldNotChange(MaintenanceOp* op) {
SleepFor(MonoDelta::FromMilliseconds(1));
op->UpdateStats(&stats_);
ASSERT_EQ(next_time_, stats_.last_modified());
next_time_ = stats_.last_modified();
}

void TestFirstCall(MaintenanceOp* op) {
Expand Down
18 changes: 13 additions & 5 deletions src/kudu/tablet/tablet_mm_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ string TabletOpBase::LogPrefix() const {
return tablet_->LogPrefix();
}

// Returns the table ID reported by the tablet this op operates on, by
// forwarding to tablet_->table_id(). The result is a const reference rather
// than a copy.
// NOTE(review): presumably the reference stays valid only while tablet_ is
// alive -- confirm against the op/tablet lifetime rules.
const std::string& TabletOpBase::table_id() const {
return tablet_->table_id();
}

// Returns the tablet ID reported by the tablet this op operates on, by
// forwarding to tablet_->tablet_id(). The result is a const reference rather
// than a copy.
// NOTE(review): presumably the reference stays valid only while tablet_ is
// alive -- confirm against the op/tablet lifetime rules.
const std::string& TabletOpBase::tablet_id() const {
return tablet_->tablet_id();
}

////////////////////////////////////////////////////////////
// CompactRowSetsOp
////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -262,12 +270,12 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
// cached stats.
TabletMetrics* metrics = tablet_->metrics();
if (metrics) {
int64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
int64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
int64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
int64_t new_num_rs_minor_delta_compacted =
uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
uint64_t new_num_dms_flushed = metrics->flush_dms_duration->TotalCount();
uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
uint64_t new_num_rs_minor_delta_compacted =
metrics->delta_minor_compact_rs_duration->TotalCount();
int64_t new_num_rs_major_delta_compacted =
uint64_t new_num_rs_major_delta_compacted =
metrics->delta_major_compact_rs_duration->TotalCount();
if (prev_stats_.valid() &&
new_num_mrs_flushed == last_num_mrs_flushed_ &&
Expand Down
43 changes: 23 additions & 20 deletions src/kudu/tablet/tablet_mm_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class TabletOpBase : public MaintenanceOp {
std::string LogPrefix() const;

protected:
const std::string& table_id() const OVERRIDE;
const std::string& tablet_id() const OVERRIDE;

Tablet* const tablet_;
};

Expand All @@ -57,15 +60,15 @@ class CompactRowSetsOp : public TabletOpBase {
public:
explicit CompactRowSetsOp(Tablet* tablet);

virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

virtual bool Prepare() OVERRIDE;
bool Prepare() OVERRIDE;

virtual void Perform() OVERRIDE;
void Perform() OVERRIDE;

virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
mutable simple_spinlock lock_;
Expand All @@ -83,15 +86,15 @@ class MinorDeltaCompactionOp : public TabletOpBase {
public:
explicit MinorDeltaCompactionOp(Tablet* tablet);

virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

virtual bool Prepare() OVERRIDE;
bool Prepare() OVERRIDE;

virtual void Perform() OVERRIDE;
void Perform() OVERRIDE;

virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
mutable simple_spinlock lock_;
Expand All @@ -109,15 +112,15 @@ class MajorDeltaCompactionOp : public TabletOpBase {
public:
explicit MajorDeltaCompactionOp(Tablet* tablet);

virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

virtual bool Prepare() OVERRIDE;
bool Prepare() OVERRIDE;

virtual void Perform() OVERRIDE;
void Perform() OVERRIDE;

virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
mutable simple_spinlock lock_;
Expand All @@ -138,19 +141,19 @@ class UndoDeltaBlockGCOp : public TabletOpBase {
// Estimates the number of bytes that may potentially be in ancient delta
// undo blocks. Over time, as Perform() is invoked, this estimate gets more
// accurate.
void UpdateStats(MaintenanceOpStats* stats) override;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

bool Prepare() override;
bool Prepare() OVERRIDE;

// Deletes ancient history data from disk. This also initializes undo delta
// blocks greedily (in a budgeted manner controlled by the
// --undo_delta_block_gc_init_budget_millis gflag) that makes the estimate
// performed in UpdateStats() more accurate.
void Perform() override;
void Perform() OVERRIDE;

scoped_refptr<Histogram> DurationHistogram() const override;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
std::string LogPrefix() const;
Expand Down
1 change: 0 additions & 1 deletion src/kudu/tablet/tablet_replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ TabletReplica::TabletReplica(
Callback<void(const std::string& reason)> mark_dirty_clbk)
: meta_(DCHECK_NOTNULL(std::move(meta))),
cmeta_manager_(DCHECK_NOTNULL(std::move(cmeta_manager))),
tablet_id_(meta_->tablet_id()),
local_peer_pb_(std::move(local_peer_pb)),
log_anchor_registry_(new LogAnchorRegistry()),
apply_pool_(apply_pool),
Expand Down
7 changes: 2 additions & 5 deletions src/kudu/tablet/tablet_replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,8 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
return log_anchor_registry_;
}

// Returns the tablet_id of the tablet managed by this TabletReplica.
// Returns the correct tablet_id even if the underlying tablet is not available
// yet.
const std::string& tablet_id() const { return tablet_id_; }
const std::string& table_id() const { return meta_->table_id(); }
const std::string& tablet_id() const { return meta_->tablet_id(); }

// Convenience method to return the permanent_uuid of this peer.
std::string permanent_uuid() const { return tablet_->metadata()->fs_manager()->uuid(); }
Expand Down Expand Up @@ -322,7 +320,6 @@ class TabletReplica : public RefCountedThreadSafe<TabletReplica>,
const scoped_refptr<TabletMetadata> meta_;
const scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager_;

const std::string tablet_id_;
const consensus::RaftPeerPB local_peer_pb_;
scoped_refptr<log::LogAnchorRegistry> log_anchor_registry_; // Assigned in tablet_replica-test

Expand Down
26 changes: 23 additions & 3 deletions src/kudu/tablet/tablet_replica_mm_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <mutex>
#include <ostream>
#include <string>
#include <utility>

#include <gflags/gflags.h>
#include <glog/logging.h>
Expand Down Expand Up @@ -121,6 +122,24 @@ void FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(MaintenanceOpStats
}
}

//
// TabletReplicaOpBase.
//
// Initializes the base op.
//
// 'name' is taken by value and moved into the MaintenanceOp constructor
// together with 'io_usage'. 'tablet_replica' is stored as-is in
// tablet_replica_; no null check is performed here.
// NOTE(review): the pointer is stored raw, so the replica presumably has to
// outlive this op -- confirm with the code that registers/unregisters ops.
TabletReplicaOpBase::TabletReplicaOpBase(std::string name,
IOUsage io_usage,
TabletReplica* tablet_replica)
: MaintenanceOp(std::move(name), io_usage),
tablet_replica_(tablet_replica) {
}

// Returns the table ID of the replica this op operates on, by forwarding to
// tablet_replica_->table_id(). The result is a const reference rather than a
// copy.
const std::string& TabletReplicaOpBase::table_id() const {
return tablet_replica_->table_id();
}

// Returns the tablet ID of the replica this op operates on, by forwarding to
// tablet_replica_->tablet_id(). The result is a const reference rather than a
// copy.
const std::string& TabletReplicaOpBase::tablet_id() const {
return tablet_replica_->tablet_id();
}

//
// FlushMRSOp.
//
Expand Down Expand Up @@ -260,9 +279,10 @@ scoped_refptr<AtomicGauge<uint32_t> > FlushDeltaMemStoresOp::RunningGauge() cons
//

LogGCOp::LogGCOp(TabletReplica* tablet_replica)
: MaintenanceOp(StringPrintf("LogGCOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::LOW_IO_USAGE),
tablet_replica_(tablet_replica),
: TabletReplicaOpBase(StringPrintf("LogGCOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::LOW_IO_USAGE,
tablet_replica),
log_gc_duration_(METRIC_log_gc_duration.Instantiate(
tablet_replica->tablet()->GetMetricEntity())),
log_gc_running_(METRIC_log_gc_running.Instantiate(
Expand Down
67 changes: 37 additions & 30 deletions src/kudu/tablet/tablet_replica_mm_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,86 +47,93 @@ class FlushOpPerfImprovementPolicy {
FlushOpPerfImprovementPolicy() {}
};

// Shared base class for maintenance ops that act on a TabletReplica
// (FlushMRSOp, FlushDeltaMemStoresOp and LogGCOp derive from it in this file).
// It holds the replica pointer so subclasses do not each declare their own,
// and it supplies the table_id()/tablet_id() overrides.
// NOTE(review): these accessors are presumably what the maintenance manager
// uses to match ops against the configured privilege tables/tablets --
// confirm in MaintenanceOp / MaintenanceManager.
class TabletReplicaOpBase : public MaintenanceOp {
public:
// 'name' and 'io_usage' are passed on to MaintenanceOp; 'tablet_replica' is
// stored in tablet_replica_.
explicit TabletReplicaOpBase(std::string name, IOUsage io_usage, TabletReplica* tablet_replica);

protected:
// Identifiers of the table and tablet served by tablet_replica_.
const std::string& table_id() const OVERRIDE;
const std::string& tablet_id() const OVERRIDE;

// The replica this op works on. The pointer itself is const (it cannot be
// reseated after construction); the pointee is not const.
TabletReplica *const tablet_replica_;
};

// Maintenance op for MRS flush. Only one can happen at a time.
class FlushMRSOp : public MaintenanceOp {
class FlushMRSOp : public TabletReplicaOpBase {
public:
explicit FlushMRSOp(TabletReplica* tablet_replica)
: MaintenanceOp(StringPrintf("FlushMRSOp(%s)", tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::HIGH_IO_USAGE),
tablet_replica_(tablet_replica) {
: TabletReplicaOpBase(StringPrintf("FlushMRSOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::HIGH_IO_USAGE,
tablet_replica) {
time_since_flush_.start();
}

virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

virtual bool Prepare() OVERRIDE;
bool Prepare() OVERRIDE;

virtual void Perform() OVERRIDE;
void Perform() OVERRIDE;

virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
// Lock protecting time_since_flush_.
mutable simple_spinlock lock_;
Stopwatch time_since_flush_;

TabletReplica *const tablet_replica_;
};

// Maintenance op for DMS flush.
// Reports stats for all the DMS this tablet contains but only flushes one in Perform().
class FlushDeltaMemStoresOp : public MaintenanceOp {
class FlushDeltaMemStoresOp : public TabletReplicaOpBase {
public:
explicit FlushDeltaMemStoresOp(TabletReplica* tablet_replica)
: MaintenanceOp(StringPrintf("FlushDeltaMemStoresOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::HIGH_IO_USAGE),
tablet_replica_(tablet_replica) {
: TabletReplicaOpBase(StringPrintf("FlushDeltaMemStoresOp(%s)",
tablet_replica->tablet()->tablet_id().c_str()),
MaintenanceOp::HIGH_IO_USAGE,
tablet_replica) {
time_since_flush_.start();
}

virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

virtual bool Prepare() OVERRIDE {
bool Prepare() OVERRIDE {
return true;
}

virtual void Perform() OVERRIDE;
void Perform() OVERRIDE;

virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
// Lock protecting time_since_flush_
mutable simple_spinlock lock_;
Stopwatch time_since_flush_;

TabletReplica *const tablet_replica_;
};

// Maintenance task that runs log GC. Reports log retention that represents the amount of data
// that can be GC'd.
//
// Only one LogGC op can run at a time.
class LogGCOp : public MaintenanceOp {
class LogGCOp : public TabletReplicaOpBase {
public:
explicit LogGCOp(TabletReplica* tablet_replica);

virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;
void UpdateStats(MaintenanceOpStats* stats) OVERRIDE;

virtual bool Prepare() OVERRIDE;
bool Prepare() OVERRIDE;

virtual void Perform() OVERRIDE;
void Perform() OVERRIDE;

virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;
scoped_refptr<Histogram> DurationHistogram() const OVERRIDE;

virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;
scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE;

private:
TabletReplica *const tablet_replica_;
scoped_refptr<Histogram> log_gc_duration_;
scoped_refptr<AtomicGauge<uint32_t> > log_gc_running_;
mutable Semaphore sem_;
Expand Down
1 change: 1 addition & 0 deletions src/kudu/tserver/tserver_path_handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ void TabletServerPathHandlers::HandleMaintenanceManagerPage(const Webserver::Web
for (const auto& op_pb : pb.completed_operations()) {
EasyJson completed_op = completed_ops.PushBack(EasyJson::kObject);
completed_op["name"] = op_pb.name();
completed_op["privilege"] = op_pb.as_privilege();
completed_op["duration"] =
HumanReadableElapsedTime::ToShortString(op_pb.duration_millis() / 1000.0);
completed_op["time_since_start"] =
Expand Down
Loading

0 comments on commit 15868cf

Please sign in to comment.