Skip to content

Commit

Permalink
allow to query DB stall status (#226)
Browse files Browse the repository at this point in the history
This PR adds a new property is-write-stalled to query whether the column family is in write stall or not.

In TiKV there is a compaction filter used for GC, in which DB::write is called. So if we can query whether the DB instance is stalled or not, we can skip to create more compaction filter instances to save some resources.

Signed-off-by: qupeng <[email protected]>
Signed-off-by: tabokie <[email protected]>
  • Loading branch information
hicqu authored and tabokie committed May 12, 2022
1 parent 52b4a97 commit 4ec4a1f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 1 deletion.
82 changes: 81 additions & 1 deletion db/compact_files_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ TEST_F(CompactFilesTest, FilterContext) {
// Small slowdown and stop trigger for experimental purpose.
options.level0_slowdown_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.level0_stop_writes_trigger = 20;
options.write_buffer_size = kWriteBufferSize;
options.level0_file_num_compaction_trigger = kLevel0Trigger;
options.compression = kNoCompression;
Expand Down Expand Up @@ -558,6 +557,87 @@ TEST_F(CompactFilesTest, GetCompactionJobInfo) {
delete db;
}

TEST_F(CompactFilesTest, IsWriteStalled) {
class SlowFilter : public CompactionFilter {
public:
SlowFilter(std::atomic<bool>* would_block) { would_block_ = would_block; }

bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/,
bool* /*value_changed*/) const override {
while (would_block_->load(std::memory_order_relaxed)) {
usleep(10000);
}
return false;
}

const char* Name() const override { return "SlowFilter"; }

private:
std::atomic<bool>* would_block_;
};

Options options;
options.create_if_missing = true;
options.delayed_write_rate = 1;

ColumnFamilyOptions cf_options;
cf_options.level0_slowdown_writes_trigger = 12;
cf_options.level0_stop_writes_trigger = 15;
cf_options.write_buffer_size = 1024 * 1024;

std::atomic<bool> compaction_would_block;
compaction_would_block.store(true, std::memory_order_relaxed);
cf_options.compaction_filter = new SlowFilter(&compaction_would_block);

std::vector<ColumnFamilyDescriptor> cfds;
cfds.push_back(ColumnFamilyDescriptor("default", cf_options));

DB* db = nullptr;
std::vector<ColumnFamilyHandle*> handles;
DestroyDB(db_name_, options);

Status s = DB::Open(options, db_name_, cfds, &handles, &db);
assert(s.ok());
assert(db);

int flushed_l0_files = 0;
for (; flushed_l0_files < 100;) {
WriteBatch wb;
for (int j = 0; j < 100; ++j) {
char key[16];
bzero(key, 16);
sprintf(key, "foo%.2d", j);
ASSERT_OK(wb.Put(handles[0], key, "bar"));
}

WriteOptions wopts;
wopts.no_slowdown = true;
s = db->Write(wopts, &wb);
if (s.ok()) {
FlushOptions fopts;
fopts.allow_write_stall = true;
ASSERT_OK(db->Flush(fopts));
++flushed_l0_files;
} else {
ASSERT_EQ(s.code(), Status::Code::kIncomplete);
break;
}
}

// The write loop must be terminated by write stall.
ASSERT_EQ(flushed_l0_files, 12);
uint64_t stalled = false;
db->GetIntProperty(handles[0], "rocksdb.is-write-stalled", &stalled);
ASSERT_TRUE(stalled);

compaction_would_block.store(false, std::memory_order_relaxed);
for (size_t i = 0; i < handles.size(); ++i) {
delete handles[i];
}
delete (db);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
12 changes: 12 additions & 0 deletions db/internal_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
static const std::string is_write_stopped = "is-write-stopped";
static const std::string is_write_stalled = "is-write-stalled";
static const std::string estimate_oldest_key_time = "estimate-oldest-key-time";
static const std::string block_cache_capacity = "block-cache-capacity";
static const std::string block_cache_usage = "block-cache-usage";
Expand Down Expand Up @@ -385,6 +386,8 @@ const std::string DB::Properties::kActualDelayedWriteRate =
rocksdb_prefix + actual_delayed_write_rate;
const std::string DB::Properties::kIsWriteStopped =
rocksdb_prefix + is_write_stopped;
const std::string DB::Properties::kIsWriteStalled =
rocksdb_prefix + is_write_stalled;
const std::string DB::Properties::kEstimateOldestKeyTime =
rocksdb_prefix + estimate_oldest_key_time;
const std::string DB::Properties::kBlockCacheCapacity =
Expand Down Expand Up @@ -536,6 +539,9 @@ const std::unordered_map<std::string, DBPropertyInfo>
{DB::Properties::kIsWriteStopped,
{false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr,
nullptr}},
{DB::Properties::kIsWriteStalled,
{false, nullptr, &InternalStats::HandleIsWriteStalled, nullptr,
nullptr}},
{DB::Properties::kEstimateOldestKeyTime,
{false, nullptr, &InternalStats::HandleEstimateOldestKeyTime, nullptr,
nullptr}},
Expand Down Expand Up @@ -1228,6 +1234,12 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db,
return true;
}

bool InternalStats::HandleIsWriteStalled(uint64_t* value, DBImpl* db,
Version* /*version*/) {
*value = db->write_controller().NeedsDelay() ? 1 : 0;
return true;
}

bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// TODO(yiwu): The property is currently available for fifo compaction
Expand Down
1 change: 1 addition & 0 deletions db/internal_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,7 @@ class InternalStats {
bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db,
Version* version);
bool HandleIsWriteStopped(uint64_t* value, DBImpl* db, Version* version);
bool HandleIsWriteStalled(uint64_t* value, DBImpl* db, Version* version);
bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db,
Version* version);
bool HandleBlockCacheCapacity(uint64_t* value, DBImpl* db, Version* version);
Expand Down
4 changes: 4 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,9 @@ class DB {
// "rocksdb.is-write-stopped" - Return 1 if write has been stopped.
static const std::string kIsWriteStopped;

// "rocksdb.is-write-stalled" - Return 1 if write has been stalled.
static const std::string kIsWriteStalled;

// "rocksdb.estimate-oldest-key-time" - returns an estimation of
// oldest key timestamp in the DB. Currently only available for
// FIFO compaction with
Expand Down Expand Up @@ -1049,6 +1052,7 @@ class DB {
// "rocksdb.num-running-flushes"
// "rocksdb.actual-delayed-write-rate"
// "rocksdb.is-write-stopped"
// "rocksdb.is-write-stalled"
// "rocksdb.estimate-oldest-key-time"
// "rocksdb.block-cache-capacity"
// "rocksdb.block-cache-usage"
Expand Down

0 comments on commit 4ec4a1f

Please sign in to comment.