Skip to content

Commit

Permalink
Allow users to profile a query and see bottleneck of the query
Browse files Browse the repository at this point in the history
Summary:
Provide a framework to profile a query in detail to figure out latency bottleneck. Currently, in Get(), Put() and iterators, 2-3 simple timing is used. We can easily add more profile counters to the framework later.

Test Plan: Enable this profiling in seveal existing tests.

Reviewers: haobo, dhruba, kailiu, emayanke, vamsi, igor

CC: leveldb

Differential Revision: https://reviews.facebook.net/D14001
  • Loading branch information
Siying Dong authored and siying committed Nov 22, 2013
1 parent 56589ab commit 3d8ac31
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 21 deletions.
8 changes: 7 additions & 1 deletion db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rocksdb/memtablerep.h"
#include "rocksdb/write_batch.h"
#include "rocksdb/statistics.h"
#include "rocksdb/perf_context.h"
#include "port/port.h"
#include "util/bit_set.h"
#include "util/crc32c.h"
Expand Down Expand Up @@ -350,6 +351,8 @@ DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when "
DEFINE_int32(stats_per_interval, 0, "Reports additional stats per interval when"
" this is greater than 0.");

DEFINE_int32(perf_level, 0, "Level of perf collection");

static bool ValidateRateLimit(const char* flagname, double value) {
static constexpr double EPSILON = 1e-10;
if ( value < -EPSILON ) {
Expand Down Expand Up @@ -689,6 +692,7 @@ struct SharedState {
port::Mutex mu;
port::CondVar cv;
int total;
int perf_level;

// Each thread goes through the following states:
// (1) initializing
Expand All @@ -700,7 +704,7 @@ struct SharedState {
long num_done;
bool start;

SharedState() : cv(&mu) { }
SharedState() : cv(&mu), perf_level(FLAGS_perf_level) { }
};

// Per-thread state for concurrent executions of the same benchmark.
Expand Down Expand Up @@ -810,6 +814,7 @@ class Benchmark {
fprintf(stdout, "Memtablerep: vector\n");
break;
}
fprintf(stdout, "Perf Level: %d\n", FLAGS_perf_level);

PrintWarnings();
fprintf(stdout, "------------------------------------------------\n");
Expand Down Expand Up @@ -1150,6 +1155,7 @@ class Benchmark {
}
}

SetPerfLevel(static_cast<PerfLevel> (shared->perf_level));
thread->stats.Start(thread->tid);
(arg->bm->*(arg->method))(thread);
thread->stats.Stop();
Expand Down
34 changes: 31 additions & 3 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
(unsigned long)m->GetLogNumber());
list.push_back(m->NewIterator());
}
Iterator* iter = NewMergingIterator(&internal_comparator_, &list[0],
Iterator* iter = NewMergingIterator(env_, &internal_comparator_, &list[0],
list.size());
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
Expand Down Expand Up @@ -2519,7 +2519,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
// Collect iterators for files in L0 - Ln
versions_->current()->AddIterators(options, storage_options_, &list);
Iterator* internal_iter =
NewMergingIterator(&internal_comparator_, &list[0], list.size());
NewMergingIterator(env_, &internal_comparator_, &list[0], list.size());
versions_->current()->Ref();

cleanup->mu = &mutex_;
Expand Down Expand Up @@ -2555,6 +2555,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
Status s;

StopWatch sw(env_, options_.statistics, DB_GET);
StopWatchNano snapshot_timer(env_, false);
StartPerfTimer(&snapshot_timer);
SequenceNumber snapshot;
mutex_.Lock();
if (options.snapshot != nullptr) {
Expand Down Expand Up @@ -2583,15 +2585,23 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// s is both in/out. When in, s could either be OK or MergeInProgress.
// merge_operands will contain the sequence of merges in the latter case.
LookupKey lkey(key, snapshot);
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);
if (mem->Get(lkey, value, &s, &merge_operands, options_)) {
// Done
} else if (imm.Get(lkey, value, &s, &merge_operands, options_)) {
// Done
} else {
StopWatchNano from_files_timer(env_, false);
StartPerfTimer(&from_files_timer);

current->Get(options, lkey, value, &s, &merge_operands, &stats,
options_, value_found);
have_stat_update = true;
BumpPerfTime(&perf_context.get_from_output_files_time, &from_files_timer);
}

StopWatchNano post_process_timer(env_, false);
StartPerfTimer(&post_process_timer);
mutex_.Lock();

if (!options_.disable_seek_compaction &&
Expand All @@ -2607,6 +2617,8 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Note, tickers are atomic now - no lock protection needed any more.
RecordTick(options_.statistics, NUMBER_KEYS_READ);
RecordTick(options_.statistics, BYTES_READ, value->size());
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);

return s;
}

Expand All @@ -2615,6 +2627,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
std::vector<std::string>* values) {

StopWatch sw(env_, options_.statistics, DB_MULTIGET);
StopWatchNano snapshot_timer(env_, false);
StartPerfTimer(&snapshot_timer);
SequenceNumber snapshot;
mutex_.Lock();
if (options.snapshot != nullptr) {
Expand Down Expand Up @@ -2646,6 +2660,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,

// Keep track of bytes that we read for statistics-recording later
uint64_t bytesRead = 0;
BumpPerfTime(&perf_context.get_snapshot_time, &snapshot_timer);

// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
Expand All @@ -2672,6 +2687,8 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
}

// Post processing (decrement reference counts and record statistics)
StopWatchNano post_process_timer(env_, false);
StartPerfTimer(&post_process_timer);
mutex_.Lock();
if (!options_.disable_seek_compaction &&
have_stat_update && current->UpdateStats(stats)) {
Expand All @@ -2686,6 +2703,7 @@ std::vector<Status> DBImpl::MultiGet(const ReadOptions& options,
RecordTick(options_.statistics, NUMBER_MULTIGET_CALLS);
RecordTick(options_.statistics, NUMBER_MULTIGET_KEYS_READ, numKeys);
RecordTick(options_.statistics, NUMBER_MULTIGET_BYTES_READ, bytesRead);
BumpPerfTime(&perf_context.get_post_process_time, &post_process_timer);

return statList;
}
Expand Down Expand Up @@ -2754,6 +2772,8 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
}

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
StopWatchNano pre_post_process_timer(env_, false);
StartPerfTimer(&pre_post_process_timer);
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
Expand Down Expand Up @@ -2800,12 +2820,13 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (options.disableWAL) {
flush_on_destroy_ = true;
}
BumpPerfTime(&perf_context.write_pre_and_post_process_time,
&pre_post_process_timer);

if (!options.disableWAL) {
StopWatchNano timer(env_);
StartPerfTimer(&timer);
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
BumpPerfTime(&perf_context.wal_write_time, &timer);
if (status.ok() && options.sync) {
if (options_.use_fsync) {
StopWatch(env_, options_.statistics, WAL_FILE_SYNC_MICROS);
Expand All @@ -2815,10 +2836,14 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
status = log_->file()->Sync();
}
}
BumpPerfTime(&perf_context.write_wal_time, &timer);
}
if (status.ok()) {
StopWatchNano write_memtable_timer(env_, false);
StartPerfTimer(&write_memtable_timer);
status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this,
options_.filter_deletes);
BumpPerfTime(&perf_context.write_memtable_time, &write_memtable_timer);
if (!status.ok()) {
// Panic for in-memory corruptions
// Note that existing logic was not sound. Any partial failure writing
Expand All @@ -2828,6 +2853,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
}
SetTickerCount(options_.statistics, SEQUENCE_NUMBER, last_sequence);
}
StartPerfTimer(&pre_post_process_timer);
LogFlush(options_.info_log);
mutex_.Lock();
if (status.ok()) {
Expand Down Expand Up @@ -2855,6 +2881,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
BumpPerfTime(&perf_context.write_pre_and_post_process_time,
&pre_post_process_timer);
return status;
}

Expand Down
22 changes: 20 additions & 2 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ class DBIter: public Iterator {
virtual void SeekToLast();

private:
void FindNextUserEntry(bool skipping);
inline void FindNextUserEntry(bool skipping);
void FindNextUserEntryInternal(bool skipping);
void FindPrevUserEntry();
bool ParseKey(ParsedInternalKey* key);
void MergeValuesNewToOld();
Expand Down Expand Up @@ -191,7 +192,15 @@ void DBIter::Next() {
//
// NOTE: In between, saved_key_ can point to a user key that has
// a delete marker
void DBIter::FindNextUserEntry(bool skipping) {
inline void DBIter::FindNextUserEntry(bool skipping) {
StopWatchNano timer(env_, false);
StartPerfTimer(&timer);
FindNextUserEntryInternal(skipping);
BumpPerfTime(&perf_context.find_next_user_entry_time, &timer);
}

// Actual implementation of DBIter::FindNextUserEntry()
void DBIter::FindNextUserEntryInternal(bool skipping) {
// Loop until we hit an acceptable entry to yield
assert(iter_->Valid());
assert(direction_ == kForward);
Expand Down Expand Up @@ -431,7 +440,10 @@ void DBIter::Seek(const Slice& target) {
saved_key_.clear();
AppendInternalKey(
&saved_key_, ParsedInternalKey(target, sequence_, kValueTypeForSeek));
StopWatchNano internal_seek_timer(env_, false);
StartPerfTimer(&internal_seek_timer);
iter_->Seek(saved_key_);
BumpPerfTime(&perf_context.seek_internal_seek_time, &internal_seek_timer);
if (iter_->Valid()) {
FindNextUserEntry(false /*not skipping */);
} else {
Expand All @@ -442,7 +454,10 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekToFirst() {
direction_ = kForward;
ClearSavedValue();
StopWatchNano internal_seek_timer(env_, false);
StartPerfTimer(&internal_seek_timer);
iter_->SeekToFirst();
BumpPerfTime(&perf_context.seek_internal_seek_time, &internal_seek_timer);
if (iter_->Valid()) {
FindNextUserEntry(false /* not skipping */);
} else {
Expand All @@ -461,7 +476,10 @@ void DBIter::SeekToLast() {

direction_ = kReverse;
ClearSavedValue();
StopWatchNano internal_seek_timer(env_, false);
StartPerfTimer(&internal_seek_timer);
iter_->SeekToLast();
BumpPerfTime(&perf_context.seek_internal_seek_time, &internal_seek_timer);
FindPrevUserEntry();
}

Expand Down
41 changes: 41 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"
#include "rocksdb/perf_context.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
Expand Down Expand Up @@ -1215,7 +1216,13 @@ TEST(DBTest, IterMulti) {
ASSERT_EQ(IterStatus(iter), "a->va");
iter->Seek("ax");
ASSERT_EQ(IterStatus(iter), "b->vb");

SetPerfLevel(kEnableTime);
perf_context.Reset();
iter->Seek("b");
ASSERT_TRUE((int) perf_context.seek_internal_seek_time > 0);
ASSERT_TRUE((int) perf_context.find_next_user_entry_time > 0);
SetPerfLevel(kDisable);
ASSERT_EQ(IterStatus(iter), "b->vb");
iter->Seek("z");
ASSERT_EQ(IterStatus(iter), "(invalid)");
Expand All @@ -1230,7 +1237,12 @@ TEST(DBTest, IterMulti) {
// Switch from forward to reverse
iter->SeekToFirst();
iter->Next();
SetPerfLevel(kEnableTime);
perf_context.Reset();
iter->Next();
ASSERT_EQ(0, (int) perf_context.seek_internal_seek_time);
ASSERT_TRUE((int) perf_context.find_next_user_entry_time > 0);
SetPerfLevel(kDisable);
iter->Prev();
ASSERT_EQ(IterStatus(iter), "b->vb");

Expand Down Expand Up @@ -1590,22 +1602,42 @@ TEST(DBTest, NumImmutableMemTable) {

std::string big_value(1000000, 'x');
std::string num;
SetPerfLevel(kEnableTime);;

ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "0");
perf_context.Reset();
Get("k1");
ASSERT_EQ(1, (int) perf_context.get_from_memtable_count);

ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "1");
perf_context.Reset();
Get("k1");
ASSERT_EQ(2, (int) perf_context.get_from_memtable_count);
perf_context.Reset();
Get("k2");
ASSERT_EQ(1, (int) perf_context.get_from_memtable_count);

ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value));
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "2");
perf_context.Reset();
Get("k2");
ASSERT_EQ(2, (int) perf_context.get_from_memtable_count);
perf_context.Reset();
Get("k3");
ASSERT_EQ(1, (int) perf_context.get_from_memtable_count);
perf_context.Reset();
Get("k1");
ASSERT_EQ(3, (int) perf_context.get_from_memtable_count);

dbfull()->Flush(FlushOptions());
ASSERT_TRUE(dbfull()->GetProperty("rocksdb.num-immutable-mem-table", &num));
ASSERT_EQ(num, "0");
SetPerfLevel(kDisable);
} while (ChangeCompactOptions());
}

Expand All @@ -1614,11 +1646,16 @@ TEST(DBTest, FLUSH) {
Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions();
writeOpt.disableWAL = true;
SetPerfLevel(kEnableTime);;
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
// this will now also flush the last 2 writes
dbfull()->Flush(FlushOptions());
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v1"));

perf_context.Reset();
Get("foo");
ASSERT_TRUE((int) perf_context.get_from_output_files_time > 0);

Reopen();
ASSERT_EQ("v1", Get("foo"));
ASSERT_EQ("v1", Get("bar"));
Expand All @@ -1630,7 +1667,9 @@ TEST(DBTest, FLUSH) {

Reopen();
ASSERT_EQ("v2", Get("bar"));
perf_context.Reset();
ASSERT_EQ("v2", Get("foo"));
ASSERT_TRUE((int) perf_context.get_from_output_files_time > 0);

writeOpt.disableWAL = false;
ASSERT_OK(dbfull()->Put(writeOpt, "bar", "v3"));
Expand All @@ -1642,6 +1681,8 @@ TEST(DBTest, FLUSH) {
// has WAL enabled.
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v3", Get("bar"));

SetPerfLevel(kDisable);
} while (ChangeCompactOptions());
}

Expand Down
Loading

0 comments on commit 3d8ac31

Please sign in to comment.