From cb2476a0cacbf53ca16aecda46f7b4fe8a2594f3 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Fri, 1 Jul 2016 00:16:29 -0700 Subject: [PATCH] fix rate limiter to avoid starvation Summary: The current implementation of the rate limiter can cause resource starvation when its limit is changed. This diff aims to fix the problem by consuming request bytes partially. Test Plan: ``` ./rate_limiter_test [==========] Running 4 tests from 1 test case. [----------] Global test environment set-up. [----------] 4 tests from RateLimiterTest [ RUN ] RateLimiterTest.OverflowRate [ OK ] RateLimiterTest.OverflowRate (0 ms) [ RUN ] RateLimiterTest.StartStop [ OK ] RateLimiterTest.StartStop (0 ms) [ RUN ] RateLimiterTest.Rate request size [1 - 1023], limit 10 KB/sec, actual rate: 10.355712 KB/sec, elapsed 2.00 seconds request size [1 - 1023], limit 20 KB/sec, actual rate: 19.136564 KB/sec, elapsed 2.00 seconds request size [1 - 2047], limit 20 KB/sec, actual rate: 20.783976 KB/sec, elapsed 2.10 seconds request size [1 - 2047], limit 40 KB/sec, actual rate: 39.308144 KB/sec, elapsed 2.10 seconds request size [1 - 4095], limit 40 KB/sec, actual rate: 40.318349 KB/sec, elapsed 2.20 seconds request size [1 - 4095], limit 80 KB/sec, actual rate: 79.667396 KB/sec, elapsed 2.20 seconds request size [1 - 8191], limit 80 KB/sec, actual rate: 81.807158 KB/sec, elapsed 2.30 seconds request size [1 - 8191], limit 160 KB/sec, actual rate: 160.659761 KB/sec, elapsed 2.20 seconds request size [1 - 16383], limit 160 KB/sec, actual rate: 160.700990 KB/sec, elapsed 3.00 seconds request size [1 - 16383], limit 320 KB/sec, actual rate: 317.639481 KB/sec, elapsed 2.50 seconds [ OK ] RateLimiterTest.Rate (22618 ms) [ RUN ] RateLimiterTest.LimitChangeTest [COMPLETE] request size 10 KB, new limit 20KB/sec, refill period 1000 ms [COMPLETE] request size 10 KB, new limit 5KB/sec, refill period 1000 ms [COMPLETE] request size 20 KB, new limit 40KB/sec, refill period 1000 ms [COMPLETE] request size 
20 KB, new limit 10KB/sec, refill period 1000 ms [COMPLETE] request size 40 KB, new limit 80KB/sec, refill period 1000 ms [COMPLETE] request size 40 KB, new limit 20KB/sec, refill period 1000 ms [COMPLETE] request size 80 KB, new limit 160KB/sec, refill period 1000 ms [COMPLETE] request size 80 KB, new limit 40KB/sec, refill period 1000 ms [COMPLETE] request size 160 KB, new limit 320KB/sec, refill period 1000 ms [COMPLETE] request size 160 KB, new limit 80KB/sec, refill period 1000 ms [ OK ] RateLimiterTest.LimitChangeTest (5002 ms) [----------] 4 tests from RateLimiterTest (27620 ms total) [----------] Global test environment tear-down [==========] 4 tests from 1 test case ran. (27621 ms total) [ PASSED ] 4 tests. ``` Reviewers: sdong, IslamAbdelRahman, yiwu, andrewkr Reviewed By: andrewkr Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D60207 --- util/rate_limiter.cc | 15 ++++++++--- util/rate_limiter_test.cc | 56 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 7 deletions(-) diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 4e836d030dc..9f2a84e4361 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -10,6 +10,7 @@ #include "util/rate_limiter.h" #include "port/port.h" #include "rocksdb/env.h" +#include "util/sync_point.h" namespace rocksdb { @@ -17,7 +18,8 @@ namespace rocksdb { // Pending request struct GenericRateLimiter::Req { explicit Req(int64_t _bytes, port::Mutex* _mu) - : bytes(_bytes), cv(_mu), granted(false) {} + : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} + int64_t request_bytes; int64_t bytes; port::CondVar cv; bool granted; @@ -70,7 +72,7 @@ void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); - + TEST_SYNC_POINT("GenericRateLimiter::Request"); MutexLock 
g(&request_mutex_); if (stop_) { return; @@ -175,6 +177,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { } void GenericRateLimiter::Refill() { + TEST_SYNC_POINT("GenericRateLimiter::Refill"); next_refill_us_ = env_->NowMicros() + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = @@ -189,10 +192,14 @@ void GenericRateLimiter::Refill() { auto* queue = &queue_[use_pri]; while (!queue->empty()) { auto* next_req = queue->front(); - if (available_bytes_ < next_req->bytes) { + if (available_bytes_ < next_req->request_bytes) { + // avoid starvation + next_req->request_bytes -= available_bytes_; + available_bytes_ = 0; break; } - available_bytes_ -= next_req->bytes; + available_bytes_ -= next_req->request_bytes; + next_req->request_bytes = 0; total_bytes_through_[use_pri] += next_req->bytes; queue->pop_front(); diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index d1152ed5649..43fa5ef4517 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -11,12 +11,13 @@ #define __STDC_FORMAT_MACROS #endif +#include "util/rate_limiter.h" #include #include -#include "util/testharness.h" -#include "util/rate_limiter.h" -#include "util/random.h" #include "rocksdb/env.h" +#include "util/random.h" +#include "util/sync_point.h" +#include "util/testharness.h" namespace rocksdb { @@ -92,6 +93,55 @@ TEST_F(RateLimiterTest, Rate) { } } +TEST_F(RateLimiterTest, LimitChangeTest) { + // starvation test when limit changes to a smaller value + int64_t refill_period = 1000 * 1000; + auto* env = Env::Default(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + struct Arg { + Arg(int32_t _request_size, Env::IOPriority _pri, + std::shared_ptr _limiter) + : request_size(_request_size), pri(_pri), limiter(_limiter) {} + int32_t request_size; + Env::IOPriority pri; + std::shared_ptr limiter; + }; + + auto writer = [](void* p) { + auto* arg = static_cast(p); + 
arg->limiter->Request(arg->request_size, arg->pri); + }; + + for (uint32_t i = 1; i <= 16; i <<= 1) { + int32_t target = i * 1024 * 10; + // refill per second + for (int iter = 0; iter < 2; iter++) { + std::shared_ptr limiter = + std::make_shared(target, refill_period, 10); + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"GenericRateLimiter::Request", + "RateLimiterTest::LimitChangeTest:changeLimitStart"}, + {"RateLimiterTest::LimitChangeTest:changeLimitEnd", + "GenericRateLimiter::Refill"}}); + Arg arg(target, Env::IO_HIGH, limiter); + // The idea behind is to start a request first, then before it refills, + // update limit to a different value (2X/0.5X). No starvation should + // be guaranteed under any situation + // TODO(lightmark): more test cases are welcome. + env->StartThread(writer, &arg); + int32_t new_limit = (target << 1) >> (iter << 1); + TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart"); + arg.limiter->SetBytesPerSecond(new_limit); + TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd"); + env->WaitForJoin(); + fprintf(stderr, + "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32 + "KB/sec, refill period %" PRIi64 " ms\n", + target / 1024, new_limit / 1024, refill_period / 1000); + } + } +} + } // namespace rocksdb int main(int argc, char** argv) {