Skip to content

Commit

Permalink
chore(server): snapshot stats fix (#653)
Browse files Browse the repository at this point in the history
Fixed stats accounting. Now loop_serialized + side_saved should correspond to the number of saved items in a shard.

Signed-off-by: Roman Gershman <[email protected]>
  • Loading branch information
romange authored Jan 8, 2023
1 parent 2af9843 commit 5bc2167
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 33 deletions.
10 changes: 5 additions & 5 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,12 @@ RdbSerializer::RdbSerializer(CompressionMode compression_mode)
}

// Destructor: emits compression statistics collected over the serializer's
// lifetime. Logged at verbosity level 2 (this commit demoted them from
// VLOG(1)) since they are purely informational.
RdbSerializer::~RdbSerializer() {
  VLOG(2) << "compression mode: " << uint32_t(compression_mode_);
  // compression_stats_ is only populated when compression was attempted.
  if (compression_stats_) {
    VLOG(2) << "compression not effective: " << compression_stats_->compression_no_effective;
    VLOG(2) << "small string none compression applied: " << compression_stats_->small_str_count;
    VLOG(2) << "compression failed: " << compression_stats_->compression_failed;
    VLOG(2) << "compressed blobs:" << compression_stats_->compressed_blobs;
  }
}

Expand Down
41 changes: 17 additions & 24 deletions src/server/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ void SliceSnapshot::Stop() {
}

void SliceSnapshot::Cancel() {
VLOG(1) << "SliceSnapshot::Cancel";

CloseRecordChannel();
if (journal_cb_id_) {
db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_);
Expand Down Expand Up @@ -125,7 +127,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
current_db_ = db_indx;
}

VLOG(1) << "Start traversing " << pt->size() << " items";
VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
do {
if (cll->IsCancelled())
return;
Expand All @@ -135,12 +137,12 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
cursor = next;
FlushDefaultBuffer(false);

if (stats_.serialized >= last_yield + 100) {
if (stats_.loop_serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
fibers_ext::Yield();
DVLOG(2) << "After sleep";

last_yield = stats_.serialized;
last_yield = stats_.loop_serialized;
// flush in case other fibers (writes commands that pushed previous values)
// filled the buffer.
FlushDefaultBuffer(false);
Expand All @@ -160,20 +162,12 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
CHECK(!default_serializer_->SendFullSyncCut());
FlushDefaultBuffer(true);

VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << stats_.serialized << "/"
<< stats_.side_saved << "/" << stats_.savecb_calls;
// loop_serialized + side_saved must be equal to the total saved.
VLOG(1) << "Exit SnapshotSerializer (loop_serialized/side_saved/cbcalls): "
<< stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
}

bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
// if we touched that physical bucket - skip it.
// We must make sure we TraverseBucket exactly once for each physical bucket.
// This test is the first one because it's likely to be the fastest one:
// physical_mask_ is likely to be loaded in L1 and bucket_id() does not require accesing the
// prime_table.
/*if (physical_mask_.test(it.bucket_id())) {
return false;
}*/

++stats_.savecb_calls;

uint64_t v = it.GetVersion();
Expand All @@ -185,7 +179,7 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
return false;
}

++stats_.serialized += SerializeBucket(current_db_, it);
stats_.loop_serialized += SerializeBucket(current_db_, it);
return false;
}

Expand Down Expand Up @@ -216,6 +210,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite

if (tmp_serializer) {
PushBytesToChannel(db_index, tmp_serializer->Flush());
VLOG(1) << "Pushed " << result << " entries via tmp_serializer";
}

return result;
Expand All @@ -237,7 +232,13 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
}

// Wraps a serialized byte blob into a DbRecord and pushes it to the output
// channel. Replaces the former GetDbRecord() helper, which this commit
// inlined here.
void SliceSnapshot::PushBytesToChannel(DbIndex db_index, io::Bytes bytes) {
  // Monotonically increasing record id; useful for tracing record order.
  auto id = rec_id_++;
  DVLOG(2) << "Pushed " << id;

  // Account channel traffic before the bytes are moved into the record.
  stats_.channel_bytes += bytes.size();
  DbRecord db_rec{.db_index = db_index, .id = id, .value = string(io::View(bytes))};

  dest_->Push(std::move(db_rec));
}

bool SliceSnapshot::FlushDefaultBuffer(bool force) {
Expand Down Expand Up @@ -310,12 +311,4 @@ void SliceSnapshot::CloseRecordChannel() {
}
}

// NOTE(review): this helper is the PRE-commit version — commit 5bc2167
// removes it (see the deleted declaration in snapshot.h) and inlines its
// body into PushBytesToChannel. Kept here only as the "before" side of the
// diff; it converts a serialized value into a channel DbRecord while
// updating channel_bytes accounting.
SliceSnapshot::DbRecord SliceSnapshot::GetDbRecord(DbIndex db_index, std::string value) {
// Monotonically increasing record id for ordering/debugging.
auto id = rec_id_++;
DVLOG(2) << "Pushed " << id;

// Track total bytes sent through the channel.
stats_.channel_bytes += value.size();
return DbRecord{.db_index = db_index, .id = id, .value = std::move(value)};
}

} // namespace dfly
5 changes: 1 addition & 4 deletions src/server/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,6 @@ class SliceSnapshot {
// Return if flushed.
bool FlushDefaultBuffer(bool force);

// Convert value into DbRecord.
DbRecord GetDbRecord(DbIndex db_index, std::string value);

public:
uint64_t snapshot_version() const {
return snapshot_version_;
Expand Down Expand Up @@ -156,7 +153,7 @@ class SliceSnapshot {

// Per-snapshot accounting. This commit renames `serialized` to
// `loop_serialized` to distinguish entries serialized by the main traversal
// loop from `side_saved` (entries saved via side paths); their sum should
// equal the total number of items saved in the shard.
struct Stats {
  size_t channel_bytes = 0;  // total bytes pushed to the output channel
  size_t loop_serialized = 0, skipped = 0, side_saved = 0;
  size_t savecb_calls = 0;  // number of BucketSaveCb invocations
} stats_;
};
Expand Down

0 comments on commit 5bc2167

Please sign in to comment.