Skip to content

Commit

Permalink
CompactedDBImpl
Browse files Browse the repository at this point in the history
Summary:
Add a CompactedDBImpl that will enabled when calling OpenForReadOnly()
and the DB only has one level (>0) of files. As a performan comparison,
CuckooTable performs 2.1M/s with CompactedDBImpl vs. 1.78M/s with
ReadOnlyDBImpl.

Test Plan: db_bench

Reviewers: yhchiang, igor, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23553
  • Loading branch information
Lei Jin committed Sep 25, 2014
1 parent f7375f3 commit 3c68006
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 91 deletions.
40 changes: 40 additions & 0 deletions db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,8 @@ class Benchmark {
method = &Benchmark::ReadReverse;
} else if (name == Slice("readrandom")) {
method = &Benchmark::ReadRandom;
} else if (name == Slice("readrandomfast")) {
method = &Benchmark::ReadRandomFast;
} else if (name == Slice("multireadrandom")) {
method = &Benchmark::MultiReadRandom;
} else if (name == Slice("readmissing")) {
Expand Down Expand Up @@ -2071,6 +2073,44 @@ class Benchmark {
thread->stats.AddBytes(bytes);
}

void ReadRandomFast(ThreadState* thread) {
int64_t read = 0;
int64_t found = 0;
ReadOptions options(FLAGS_verify_checksum, true);
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
std::string value;
DB* db = SelectDBWithCfh(thread)->db;

int64_t pot = 1;
while (pot < FLAGS_num) {
pot <<= 1;
}

Duration duration(FLAGS_duration, reads_);
do {
for (int i = 0; i < 100; ++i) {
int64_t key_rand = thread->rand.Next() & (pot - 1);
GenerateKeyFromInt(key_rand, FLAGS_num, &key);
++read;
if (db->Get(options, key, &value).ok()) {
++found;
}
}
thread->stats.FinishedOps(db, 100);
} while (!duration.Done(100));

char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
found, read);

thread->stats.AddMessage(msg);

if (FLAGS_perf_level > 0) {
thread->stats.AddMessage(perf_context.ToString());
}
}

void ReadRandom(ThreadState* thread) {
int64_t read = 0;
int64_t found = 0;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ class DBImpl : public DB {
friend class ForwardIterator;
#endif
friend struct SuperVersion;
friend class CompactedDBImpl;
struct CompactionState;

struct WriteContext;
Expand Down
46 changes: 12 additions & 34 deletions db/db_impl_readonly.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,12 @@
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "db/db_impl_readonly.h"
#include "utilities/compacted_db/compacted_db_impl.h"
#include "db/db_impl.h"

#include <algorithm>
#include <set>
#include <string>
#include <stdint.h>
#include <stdio.h>
#include <vector>
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/table_cache.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "rocksdb/merge_operator.h"
#include "port/port.h"
#include "table/block.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/build_version.h"
#include "db/db_iter.h"

namespace rocksdb {

Expand Down Expand Up @@ -120,15 +90,23 @@ Status DB::OpenForReadOnly(const Options& options, const std::string& dbname,
DB** dbptr, bool error_if_log_file_exist) {
*dbptr = nullptr;

// Try to first open DB as fully compacted DB
Status s;
#ifndef ROCKSDB_LITE
s = CompactedDBImpl::Open(options, dbname, dbptr);
if (s.ok()) {
return s;
}
#endif

DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;

Status s =
DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
s = DB::OpenForReadOnly(db_options, dbname, column_families, &handles, dbptr);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a
Expand Down
13 changes: 0 additions & 13 deletions db/db_impl_readonly.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,11 @@
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#pragma once
#include "db/db_impl.h"

#include <deque>
#include <set>
#include <vector>
#include <string>
#include "db/dbformat.h"
#include "db/log_writer.h"
#include "db/snapshot.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "port/port.h"

namespace rocksdb {

Expand Down
74 changes: 74 additions & 0 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,80 @@ TEST(DBTest, ReadOnlyDB) {
ASSERT_EQ("v2", Get("bar"));
}

TEST(DBTest, CompactedDB) {
const uint64_t kFileSize = 1 << 20;
Options options;
options.disable_auto_compactions = true;
options.max_mem_compaction_level = 0;
options.write_buffer_size = kFileSize;
options.target_file_size_base = kFileSize;
options.max_bytes_for_level_base = 1 << 30;
options.compression = kNoCompression;
Reopen(&options);
// 1 L0 file, use CompactedDB if max_open_files = -1
ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
Flush();
Close();
ASSERT_OK(ReadOnlyReopen(&options));
Status s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported operation in read only mode.");
ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
Close();
options.max_open_files = -1;
ASSERT_OK(ReadOnlyReopen(&options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported in compacted db mode.");
ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
Close();
Reopen(&options);
// Add more L0 files
ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
Flush();
ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
Flush();
ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
Flush();
Close();

ASSERT_OK(ReadOnlyReopen(&options));
// Fallback to read-only DB
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported operation in read only mode.");
Close();

// Full compaction
Reopen(&options);
// Add more keys
ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
db_->CompactRange(nullptr, nullptr);
ASSERT_EQ(3, NumTableFilesAtLevel(1));
Close();

// CompactedDB
ASSERT_OK(ReadOnlyReopen(&options));
s = Put("new", "value");
ASSERT_EQ(s.ToString(),
"Not implemented: Not supported in compacted db mode.");
ASSERT_EQ("NOT_FOUND", Get("abc"));
ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
ASSERT_EQ("NOT_FOUND", Get("ccc"));
ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
ASSERT_EQ("NOT_FOUND", Get("ggg"));
ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
ASSERT_EQ("NOT_FOUND", Get("kkk"));
}

// Make sure that when options.block_cache is set, after a new table is
// created its index/filter blocks are added to block cache.
TEST(DBTest, IndexAndFilterBlocksOfNewTableAddedToCache) {
Expand Down
61 changes: 19 additions & 42 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,46 +626,23 @@ void Version::AddIterators(const ReadOptions& read_options,
}
}

// Callback from TableCache::Get()
enum SaverState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
};

namespace version_set {
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
Logger* logger;
Statistics* statistics;
};
} // namespace version_set

// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do
// IO to be certain.Set the status=kFound and value_found=false to let the
// caller know that key may exist but is not there in memory
static void MarkKeyMayExist(void* arg) {
version_set::Saver* s = reinterpret_cast<version_set::Saver*>(arg);
s->state = kFound;
void MarkKeyMayExist(void* arg) {
Version::Saver* s = reinterpret_cast<Version::Saver*>(arg);
s->state = Version::kFound;
if (s->value_found != nullptr) {
*(s->value_found) = false;
}
}

static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v) {
version_set::Saver* s = reinterpret_cast<version_set::Saver*>(arg);
bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v) {
Version::Saver* s = reinterpret_cast<Version::Saver*>(arg);
MergeContext* merge_contex = s->merge_context;
std::string merge_result; // temporary area for merge results later

Expand All @@ -676,42 +653,42 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
if (kNotFound == s->state) {
s->state = kFound;
if (Version::kNotFound == s->state) {
s->state = Version::kFound;
s->value->assign(v.data(), v.size());
} else if (kMerge == s->state) {
} else if (Version::kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = kFound;
s->state = Version::kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
s->state = Version::kCorrupt;
}
} else {
assert(false);
}
return false;

case kTypeDeletion:
if (kNotFound == s->state) {
s->state = kDeleted;
} else if (kMerge == s->state) {
s->state = kFound;
if (Version::kNotFound == s->state) {
s->state = Version::kDeleted;
} else if (Version::kMerge == s->state) {
s->state = Version::kFound;
if (!s->merge_operator->FullMerge(s->user_key, nullptr,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
s->state = Version::kCorrupt;
}
} else {
assert(false);
}
return false;

case kTypeMerge:
assert(s->state == kNotFound || s->state == kMerge);
s->state = kMerge;
assert(s->state == Version::kNotFound || s->state == Version::kMerge);
s->state = Version::kMerge;
merge_contex->PushOperand(v);
return true;

Expand Down Expand Up @@ -779,7 +756,7 @@ void Version::Get(const ReadOptions& options,
Slice user_key = k.user_key();

assert(status->ok() || status->IsMergeInProgress());
version_set::Saver saver;
Saver saver;
saver.state = status->ok()? kNotFound : kMerge;
saver.ucmp = user_comparator_;
saver.user_key = user_key;
Expand Down
23 changes: 23 additions & 0 deletions db/version_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,33 @@ class Version {
FileMetaData* file;
};

enum SaverState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
};

// Callback from TableCache::Get()
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
Logger* logger;
Statistics* statistics;
};

private:
friend class Compaction;
friend class VersionSet;
friend class DBImpl;
friend class CompactedDBImpl;
friend class ColumnFamilyData;
friend class CompactionPicker;
friend class LevelCompactionPicker;
Expand Down
Loading

0 comments on commit 3c68006

Please sign in to comment.