Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: big key metrics #3000

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
rm -rf ./deps
rm -rf ./buildtrees

- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
name: ${{ env.ARTIFACT_PIKA_NAME }}
path: ${{ github.workspace }}/build/pika
Expand Down Expand Up @@ -221,7 +221,7 @@ jobs:
with:
images: pikadb/pika

- uses: actions/download-artifact@v3
- uses: actions/download-artifact@v4
with:
name: ${{ env.ARTIFACT_PIKA_NAME }}
path: artifact/
Expand Down
8 changes: 8 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,7 @@ void InfoCmd::InfoStats(std::string& info) {
tmp_stream << "is_compact:" << (g_pika_server->IsCompacting() ? "Yes" : "No") << "\r\n";
tmp_stream << "compact_cron:" << g_pika_conf->compact_cron() << "\r\n";
tmp_stream << "compact_interval:" << g_pika_conf->compact_interval() << "\r\n";
tmp_stream << "# Big Key Statistics\r\n";
time_t current_time_s = time(nullptr);
PikaServer::BGSlotsReload bgslotsreload_info = g_pika_server->bgslots_reload();
bool is_reloading = g_pika_server->GetSlotsreloading();
Expand Down Expand Up @@ -1372,6 +1373,7 @@ void InfoCmd::InfoData(std::string& info) {
uint64_t memtable_usage = 0;
uint64_t table_reader_usage = 0;
std::shared_lock db_rwl(g_pika_server->dbs_rw_);
uint64_t total_big_key_count = 0;
for (const auto& db_item : g_pika_server->dbs_) {
if (!db_item.second) {
continue;
Expand All @@ -1382,6 +1384,11 @@ void InfoCmd::InfoData(std::string& info) {
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_CUR_SIZE_ALL_MEM_TABLES, &memtable_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_ESTIMATE_TABLE_READER_MEM, &table_reader_usage);
db_item.second->storage()->GetUsage(storage::PROPERTY_TYPE_ROCKSDB_BACKGROUND_ERRORS, &background_errors);

uint64_t big_key_count = 0;
db_item.second->storage()->GetBigKeyStatistics("bigkey_property", &big_key_count);
total_big_key_count += big_key_count;

db_item.second->DBUnlockShared();
total_memtable_usage += memtable_usage;
total_table_reader_usage += table_reader_usage;
Expand All @@ -1401,6 +1408,7 @@ void InfoCmd::InfoData(std::string& info) {
tmp_stream << "db_tablereader_usage:" << total_table_reader_usage << "\r\n";
tmp_stream << "db_fatal:" << (total_background_errors != 0 ? "1" : "0") << "\r\n";
tmp_stream << "db_fatal_msg:" << (total_background_errors != 0 ? db_fatal_msg_stream.str() : "nullptr") << "\r\n";
tmp_stream << "big_key_count:" << total_big_key_count << "\r\n";

info.append(tmp_stream.str());
}
Expand Down
1 change: 0 additions & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,6 @@ void PikaServer::PurgeDir(const std::string& path) {
PurgeDirTaskSchedule(&DoPurgeDir, static_cast<void*>(dir_path));
}


void PikaServer::PurgeDirTaskSchedule(void (*function)(void*), void* arg) {
purge_thread_.StartThread();
purge_thread_.Schedule(function, arg);
Expand Down
1 change: 1 addition & 0 deletions src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,7 @@ class Storage {
Status GetUsage(const std::string& property, uint64_t* result);
Status GetUsage(const std::string& property, std::map<int, uint64_t>* type_result);
uint64_t GetProperty(const std::string& property);
Status GetBigKeyStatistics(const std::string& property, uint64_t* out);

Status GetKeyNum(std::vector<KeyInfo>* key_infos);
Status StopScanKeyNum();
Expand Down
19 changes: 19 additions & 0 deletions src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,22 @@ class Redis {
void ScanZsets();
void ScanSets();

void CheckBigKeyAndLog(const std::string& key, uint64_t size) {
static const uint64_t kBigKeyThreshold = 10000;
if (size > kBigKeyThreshold) {
std::lock_guard<std::mutex> lock(big_key_access_mutex_);
big_key_access_count_[key]++;
std::cerr << "[BIGKEY DETECTED] Key: " << key
<< ", Size: " << size
<< ", Access Count: " << big_key_access_count_[key] << std::endl;
}
}
Comment on lines +362 to +371
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider improving the big key monitoring implementation

The current implementation has several areas that could be enhanced:

  1. The threshold should be configurable rather than hardcoded
  2. Logging to stderr may not be suitable for production environments
  3. Consider adding rate limiting to prevent excessive logging
  4. Add a cleanup mechanism for big_key_access_count_ to prevent unbounded memory growth

Consider these improvements:

+  // Configurable threshold with a reasonable default
+  static const uint64_t kDefaultBigKeyThreshold = 10000;
+  uint64_t big_key_threshold_;
+
   void CheckBigKeyAndLog(const std::string& key, uint64_t size) {
-    static const uint64_t kBigKeyThreshold = 10000;
-    if (size > kBigKeyThreshold) {
+    if (size > big_key_threshold_) {
       std::lock_guard<std::mutex> lock(big_key_access_mutex_);
+      
+      // Rate limit logging using a token bucket
+      if (ShouldLog()) {
         big_key_access_count_[key]++;
-        std::cerr << "[BIGKEY DETECTED] Key: " << key
-                << ", Size: " << size
-                << ", Access Count: " << big_key_access_count_[key] << std::endl;
+        LOG(WARNING) << "[BIGKEY DETECTED] Key: " << key
+                    << ", Size: " << size
+                    << ", Access Count: " << big_key_access_count_[key];
+      }
+      
+      // Cleanup old entries periodically
+      MaybeCleanupBigKeyMap();
     }
   }

Committable suggestion skipped: line range outside the PR's diff.


std::unordered_map<std::string, int> GetBigKeyStatistics() {
std::lock_guard<std::mutex> lock(big_key_access_mutex_);
return big_key_access_count_;
}

TypeIterator* CreateIterator(const DataType& type, const std::string& pattern, const Slice* lower_bound, const Slice* upper_bound) {
return CreateIterator(DataTypeTag[static_cast<int>(type)], pattern, lower_bound, upper_bound);
}
Expand Down Expand Up @@ -538,6 +554,9 @@ class Redis {
Status UpdateSpecificKeyStatistics(const DataType& dtype, const std::string& key, uint64_t count);
Status UpdateSpecificKeyDuration(const DataType& dtype, const std::string& key, uint64_t duration);
Status AddCompactKeyTaskIfNeeded(const DataType& dtype, const std::string& key, uint64_t count, uint64_t duration);

std::unordered_map<std::string, int> big_key_access_count_;
std::mutex big_key_access_mutex_;
};

} // namespace storage
Expand Down
22 changes: 22 additions & 0 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ Status Redis::HDel(const Slice& key, const std::vector<std::string>& fields, int
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Verify consistent placement of CheckBigKeyAndLog calls.

The CheckBigKeyAndLog function has been added consistently across all hash operations. The placement and usage pattern matches that of the list operations, which is good for maintainability.

However, there's one inconsistency in the HashesExpire method at line 1198 where the CheckBigKeyAndLog call is placed after the stale check, unlike other methods.

Move the CheckBigKeyAndLog call before the stale check for consistency:

  if (s.ok()) {
    ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
    CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
-   if (parsed_hashes_meta_value.IsStale()) {
-     return Status::NotFound("Stale");
-   } else if (parsed_hashes_meta_value.Count() == 0) {
-     return Status::NotFound();
-   }
+   if (parsed_hashes_meta_value.IsStale()) {
+     return Status::NotFound("Stale");
+   } else if (parsed_hashes_meta_value.Count() == 0) {
+     return Status::NotFound();
+   }

Also applies to: 163-163, 204-204, 248-248, 306-306, 400-400, 485-485, 529-529, 569-569, 634-634, 710-710, 786-786, 850-850, 914-914, 992-992, 1063-1063, 1135-1135, 1198-1198, 1239-1239, 1277-1277, 1317-1317, 1357-1357

if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
*ret = 0;
return Status::OK();
Expand Down Expand Up @@ -159,6 +160,7 @@ Status Redis::HGet(const Slice& key, const Slice& field, std::string* value) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -199,6 +201,7 @@ Status Redis::HGetall(const Slice& key, std::vector<FieldValue>* fvs) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -242,6 +245,7 @@ Status Redis::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fvs, int
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.Count() == 0) {
return Status::NotFound();
} else if (parsed_hashes_meta_value.IsStale()) {
Expand Down Expand Up @@ -299,6 +303,7 @@ Status Redis::HIncrby(const Slice& key, const Slice& field, int64_t value, int64
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.SetCount(1);
Expand Down Expand Up @@ -392,6 +397,7 @@ Status Redis::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
version = parsed_hashes_meta_value.UpdateVersion();
parsed_hashes_meta_value.SetCount(1);
Expand Down Expand Up @@ -476,6 +482,7 @@ Status Redis::HKeys(const Slice& key, std::vector<std::string>* fields) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -519,6 +526,7 @@ Status Redis::HLen(const Slice& key, int32_t* ret, std::string&& prefetch_meta)
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
*ret = 0;
return Status::NotFound("Stale");
Expand Down Expand Up @@ -558,6 +566,7 @@ Status Redis::HMGet(const Slice& key, const std::vector<std::string>& fields, st
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if ((is_stale = parsed_hashes_meta_value.IsStale()) || parsed_hashes_meta_value.Count() == 0) {
for (size_t idx = 0; idx < fields.size(); ++idx) {
vss->push_back({std::string(), Status::NotFound()});
Expand Down Expand Up @@ -622,6 +631,7 @@ Status Redis::HMSet(const Slice& key, const std::vector<FieldValue>& fvs) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
if (!parsed_hashes_meta_value.check_set_count(static_cast<int32_t>(filtered_fvs.size()))) {
Expand Down Expand Up @@ -697,6 +707,7 @@ Status Redis::HSet(const Slice& key, const Slice& field, const Slice& value, int
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.SetCount(1);
Expand Down Expand Up @@ -772,6 +783,7 @@ Status Redis::HSetnx(const Slice& key, const Slice& field, const Slice& value, i
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
version = parsed_hashes_meta_value.InitialMetaValue();
parsed_hashes_meta_value.SetCount(1);
Expand Down Expand Up @@ -835,6 +847,7 @@ Status Redis::HVals(const Slice& key, std::vector<std::string>* values) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -898,6 +911,7 @@ Status Redis::HScan(const Slice& key, int64_t cursor, const std::string& pattern
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
*next_cursor = 0;
return Status::NotFound();
Expand Down Expand Up @@ -975,6 +989,7 @@ Status Redis::HScanx(const Slice& key, const std::string& start_field, const std
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
*next_field = "";
return Status::NotFound();
Expand Down Expand Up @@ -1045,6 +1060,7 @@ Status Redis::PKHScanRange(const Slice& key, const Slice& field_start, const std
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
return Status::NotFound();
} else {
Expand Down Expand Up @@ -1116,6 +1132,7 @@ Status Redis::PKHRScanRange(const Slice& key, const Slice& field_start, const st
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale() || parsed_hashes_meta_value.Count() == 0) {
return Status::NotFound();
} else {
Expand Down Expand Up @@ -1178,6 +1195,7 @@ Status Redis::HashesExpire(const Slice& key, int64_t ttl_millsec, std::string&&
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -1218,6 +1236,7 @@ Status Redis::HashesDel(const Slice& key, std::string&& prefetch_meta) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -1255,6 +1274,7 @@ Status Redis::HashesExpireat(const Slice& key, int64_t timestamp_millsec, std::s
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -1294,6 +1314,7 @@ Status Redis::HashesPersist(const Slice& key, std::string&& prefetch_meta) {
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
return Status::NotFound("Stale");
} else if (parsed_hashes_meta_value.Count() == 0) {
Expand Down Expand Up @@ -1333,6 +1354,7 @@ Status Redis::HashesTTL(const Slice& key, int64_t* ttl_millsec, std::string&& pr
}
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
CheckBigKeyAndLog(key.ToString(), parsed_hashes_meta_value.Count());
if (parsed_hashes_meta_value.IsStale()) {
*ttl_millsec = -2;
return Status::NotFound("Stale");
Expand Down
Loading
Loading