Skip to content

Commit

Permalink
Rate-limit un-ratelimited flush/compaction code paths
Browse files Browse the repository at this point in the history
  • Loading branch information
hx235 committed Jan 24, 2024
1 parent 59f4cbe commit 1d4dae5
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 7 deletions.
8 changes: 5 additions & 3 deletions db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,8 @@ void CompactionJob::GenSubcompactionBoundaries() {
// overlap with N-1 other ranges. Since we requested a relatively large number
// (128) of ranges from each input files, even N range overlapping would
// cause relatively small inaccuracy.
const ReadOptions read_options(Env::IOActivity::kCompaction);
ReadOptions read_options(Env::IOActivity::kCompaction);
read_options.rate_limiter_priority = GetRateLimiterPriority();
auto* c = compact_->compaction;
if (c->max_subcompactions() <= 1 &&
!(c->immutable_options()->compaction_pri == kRoundRobin &&
Expand Down Expand Up @@ -736,8 +737,9 @@ Status CompactionJob::Run() {
// use_direct_io_for_flush_and_compaction is true, we will regard this
// verification as user reads since the goal is to cache it here for
// further user reads
const ReadOptions verify_table_read_options(
Env::IOActivity::kCompaction);
ReadOptions verify_table_read_options(Env::IOActivity::kCompaction);
verify_table_read_options.rate_limiter_priority =
GetRateLimiterPriority();
InternalIterator* iter = cfd->table_cache()->NewIterator(
verify_table_read_options, file_options_,
cfd->internal_comparator(), files_output[file_idx]->meta,
Expand Down
22 changes: 19 additions & 3 deletions db/db_test2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3947,6 +3947,15 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);

// To precisely control when to start bg compaction for excluding previous
// rate-limited bytes of flush read for table verification
std::shared_ptr<test::SleepingBackgroundTask> sleeping_task(
new test::SleepingBackgroundTask());
env_->SetBackgroundThreads(1, Env::LOW);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
sleeping_task.get(), Env::Priority::LOW);
sleeping_task->WaitUntilSleeping();

for (int i = 0; i < kNumL0Files; ++i) {
for (int j = 0; j <= kNumKeysPerFile; ++j) {
ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
Expand All @@ -3956,13 +3965,20 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
}

size_t rate_limited_bytes_start_bytes =
options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL);

sleeping_task->WakeUp();
sleeping_task->WaitUntilDone();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));

// should be slightly above 512KB due to non-data blocks read. Arbitrarily
// chose 1MB as the upper bound on the total bytes read.
size_t rate_limited_bytes = static_cast<size_t>(
options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL));
size_t rate_limited_bytes =
static_cast<size_t>(
options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL)) -
rate_limited_bytes_start_bytes;
// The charges can exist for `IO_LOW` and `IO_USER` priorities.
size_t rate_limited_bytes_by_pri =
options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW) +
Expand Down
3 changes: 2 additions & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,8 @@ Status FlushJob::WriteLevel0Table() {

const std::string* const full_history_ts_low =
(full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
const ReadOptions read_options(Env::IOActivity::kFlush);
ReadOptions read_options(Env::IOActivity::kFlush);
read_options.rate_limiter_priority = io_priority;
const WriteOptions write_options(io_priority, Env::IOActivity::kFlush);
TableBuilderOptions tboptions(
*cfd_->ioptions(), mutable_cf_options_, read_options, write_options,
Expand Down

0 comments on commit 1d4dae5

Please sign in to comment.