From e0b99d4f5db70513b94ab91c594afea272493568 Mon Sep 17 00:00:00 2001 From: Raghav Pisolkar Date: Thu, 4 Sep 2014 10:48:24 -0700 Subject: [PATCH] created a new ReadOptions parameter 'iterate_upper_bound' --- HISTORY.md | 1 + db/c.cc | 7 ++ db/db_impl.cc | 6 +- db/db_iter.cc | 113 ++++++++++++++++++++----------- db/db_iter.h | 5 +- db/db_test.cc | 138 ++++++++++++++++++++++++++++++++++++++ include/rocksdb/c.h | 4 ++ include/rocksdb/options.h | 14 ++++ 8 files changed, 243 insertions(+), 45 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index c6c566ede22..922d3e2c93f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -20,6 +20,7 @@ * Support Multiple DB paths in universal style compactions * Add feature of storing plain table index and bloom filter in SST file. * CompactRange() will never output compacted files to level 0. This used to be the case when all the compaction input files were at level 0. +* Added iterate_upper_bound to define the extent upto which the forward iterator will return entries. This will prevent iterating over delete markers and overwritten entries for edge cases where you want to break out the iterator anyways. This may improve perfomance in case there are a large number of delete markers or overwritten entries. ### Public API changes * DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size diff --git a/db/c.cc b/db/c.cc index 3114f350042..9ea54964602 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1844,6 +1844,13 @@ void rocksdb_readoptions_set_snapshot( opt->rep.snapshot = (snap ? snap->rep : nullptr); } +void rocksdb_readoptions_set_iterate_upper_bound( + rocksdb_readoptions_t* opt, + const char* key, size_t keylen) { + Slice prefix = Slice(key, keylen); + opt->rep.iterate_upper_bound = &prefix; +} + void rocksdb_readoptions_set_read_tier( rocksdb_readoptions_t* opt, int v) { opt->rep.read_tier = static_cast(v); diff --git a/db/db_impl.cc b/db/db_impl.cc index 7c65e9a61d3..f18bb2141eb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3677,7 +3677,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, // TODO(ljin): remove tailing iterator auto iter = new ForwardIterator(this, options, cfd); return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter, - kMaxSequenceNumber); + kMaxSequenceNumber, options.iterate_upper_bound); // return new TailingIterator(env_, this, options, cfd); #endif } else { @@ -3733,7 +3733,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options, // likely that any iterator pointer is close to the iterator it points to so // that they are likely to be in the same cache line and/or page. ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, *cfd->options(), cfd->user_comparator(), snapshot); + env_, *cfd->options(), cfd->user_comparator(), + snapshot, options.iterate_upper_bound); + Iterator* internal_iter = NewInternalIterator(options, cfd, sv, db_iter->GetArena()); db_iter->SetIterUnderDBIter(internal_iter); diff --git a/db/db_iter.cc b/db/db_iter.cc index 599a56a99c0..bfdcd4edb8c 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -59,7 +59,8 @@ class DBIter: public Iterator { }; DBIter(Env* env, const Options& options, const Comparator* cmp, - Iterator* iter, SequenceNumber s, bool arena_mode) + Iterator* iter, SequenceNumber s, bool arena_mode, + const Slice* iterate_upper_bound = nullptr) : arena_mode_(arena_mode), env_(env), logger_(options.info_log.get()), @@ -70,9 +71,10 @@ class DBIter: public Iterator { direction_(kForward), valid_(false), current_entry_is_merged_(false), - statistics_(options.statistics.get()) { + statistics_(options.statistics.get()), + iterate_upper_bound_(iterate_upper_bound) { RecordTick(statistics_, NO_ITERATORS); - has_prefix_extractor_ = (options.prefix_extractor.get() != nullptr); + prefix_extractor_ = options.prefix_extractor.get(); max_skip_ = options.max_sequential_skip_in_iterations; } virtual ~DBIter() { @@ -132,7 +134,7 @@ class DBIter: public Iterator { } } - bool has_prefix_extractor_; + const SliceTransform* prefix_extractor_; bool arena_mode_; Env* const env_; Logger* logger_; @@ -149,6 +151,7 @@ class DBIter: public Iterator { bool current_entry_is_merged_; Statistics* statistics_; uint64_t max_skip_; + const Slice* iterate_upper_bound_; // No copying allowed DBIter(const DBIter&); @@ -207,36 +210,44 @@ void DBIter::FindNextUserEntryInternal(bool skipping) { uint64_t num_skipped = 0; do { ParsedInternalKey ikey; - if (ParseKey(&ikey) && ikey.sequence <= sequence_) { - if (skipping && - user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { - num_skipped++; // skip this entry - PERF_COUNTER_ADD(internal_key_skipped_count, 1); - } else { - skipping = false; - switch (ikey.type) { - case kTypeDeletion: - // Arrange to skip all upcoming entries for this key since - // they are hidden by this deletion. - saved_key_.SetKey(ikey.user_key); - skipping = true; - num_skipped = 0; - PERF_COUNTER_ADD(internal_delete_skipped_count, 1); - break; - case kTypeValue: - valid_ = true; - saved_key_.SetKey(ikey.user_key); - return; - case kTypeMerge: - // By now, we are sure the current ikey is going to yield a value - saved_key_.SetKey(ikey.user_key); - current_entry_is_merged_ = true; - valid_ = true; - MergeValuesNewToOld(); // Go to a different state machine - return; - default: - assert(false); - break; + + if (ParseKey(&ikey)) { + if (iterate_upper_bound_ != nullptr && + ikey.user_key.compare(*iterate_upper_bound_) >= 0) { + break; + } + + if (ikey.sequence <= sequence_) { + if (skipping && + user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { + num_skipped++; // skip this entry + PERF_COUNTER_ADD(internal_key_skipped_count, 1); + } else { + skipping = false; + switch (ikey.type) { + case kTypeDeletion: + // Arrange to skip all upcoming entries for this key since + // they are hidden by this deletion. + saved_key_.SetKey(ikey.user_key); + skipping = true; + num_skipped = 0; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); + break; + case kTypeValue: + valid_ = true; + saved_key_.SetKey(ikey.user_key); + return; + case kTypeMerge: + // By now, we are sure the current ikey is going to yield a value + saved_key_.SetKey(ikey.user_key); + current_entry_is_merged_ = true; + valid_ = true; + MergeValuesNewToOld(); // Go to a different state machine + return; + default: + assert(false); + break; + } } } } @@ -398,6 +409,7 @@ bool DBIter::FindValueForCurrentKey() { case kTypeDeletion: operands.clear(); last_not_merge_type = kTypeDeletion; + PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeMerge: assert(user_merge_operator_ != nullptr); @@ -407,6 +419,7 @@ bool DBIter::FindValueForCurrentKey() { assert(false); } + PERF_COUNTER_ADD(internal_key_skipped_count, 1); assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0); iter_->Prev(); ++num_skipped; @@ -553,6 +566,20 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) { void DBIter::Seek(const Slice& target) { StopWatch sw(env_, statistics_, DB_SEEK); + // total ordering is not guaranteed if prefix_extractor is set + // hence prefix based seeks will not give correct results + if (iterate_upper_bound_ != nullptr && prefix_extractor_ != nullptr) { + if (!prefix_extractor_->InDomain(*iterate_upper_bound_) || + !prefix_extractor_->InDomain(target) || + prefix_extractor_->Transform(*iterate_upper_bound_).compare( + prefix_extractor_->Transform(target)) != 0) { + status_ = Status::InvalidArgument("read_options.iterate_*_bound " + " and seek target need to have the same prefix."); + valid_ = false; + return; + } + } + saved_key_.Clear(); // now savved_key is used to store internal key. saved_key_.SetInternalKey(target, sequence_); @@ -574,7 +601,7 @@ void DBIter::Seek(const Slice& target) { void DBIter::SeekToFirst() { // Don't use iter_::Seek() if we set a prefix extractor // because prefix seek wiil be used. - if (has_prefix_extractor_) { + if (prefix_extractor_ != nullptr) { max_skip_ = std::numeric_limits::max(); } direction_ = kForward; @@ -595,7 +622,7 @@ void DBIter::SeekToFirst() { void DBIter::SeekToLast() { // Don't use iter_::Seek() if we set a prefix extractor // because prefix seek wiil be used. - if (has_prefix_extractor_) { + if (prefix_extractor_ != nullptr) { max_skip_ = std::numeric_limits::max(); } direction_ = kReverse; @@ -612,9 +639,10 @@ void DBIter::SeekToLast() { Iterator* NewDBIterator(Env* env, const Options& options, const Comparator* user_key_comparator, Iterator* internal_iter, - const SequenceNumber& sequence) { + const SequenceNumber& sequence, + const Slice* iterate_upper_bound) { return new DBIter(env, options, user_key_comparator, internal_iter, sequence, - false); + false, iterate_upper_bound); } ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } @@ -643,13 +671,16 @@ void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const Options& options, const Comparator* user_key_comparator, - const SequenceNumber& sequence) { + const SequenceNumber& sequence, + const Slice* iterate_upper_bound) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); Arena* arena = iter->GetArena(); auto mem = arena->AllocateAligned(sizeof(DBIter)); - DBIter* db_iter = new (mem) - DBIter(env, options, user_key_comparator, nullptr, sequence, true); + DBIter* db_iter = new (mem) DBIter(env, options, user_key_comparator, + nullptr, sequence, true, iterate_upper_bound); + iter->SetDBIter(db_iter); + return iter; } diff --git a/db/db_iter.h b/db/db_iter.h index cb9840324ff..ffea34fa951 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -27,7 +27,8 @@ extern Iterator* NewDBIterator( const Options& options, const Comparator *user_key_comparator, Iterator* internal_iter, - const SequenceNumber& sequence); + const SequenceNumber& sequence, + const Slice* iterate_upper_bound = nullptr); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -68,6 +69,6 @@ class ArenaWrappedDBIter : public Iterator { // Generate the arena wrapped iterator class. extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const Options& options, const Comparator* user_key_comparator, - const SequenceNumber& sequence); + const SequenceNumber& sequence, const Slice* iterate_upper_bound = nullptr); } // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index 6295f5921ee..0b0365211d2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -7743,6 +7743,144 @@ TEST(DBTest, TableOptionsSanitizeTest) { ASSERT_TRUE(TryReopen(&options).IsNotSupported()); } +TEST(DBTest, DBIteratorBoundTest) { + Options options; + options.env = env_; + options.create_if_missing = true; + + options.prefix_extractor = nullptr; + DestroyAndReopen(&options); + ASSERT_OK(Put("a", "0")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Put("foo1", "bar1")); + ASSERT_OK(Put("g1", "0")); + + // testing basic case with no iterate_upper_bound and no prefix_extractor + { + ReadOptions ro; + ro.iterate_upper_bound = nullptr; + + std::unique_ptr iter(db_->NewIterator(ro)); + + iter->Seek("foo"); + + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("foo")), 0); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("foo1")), 0); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("g1")), 0); + } + + // testing iterate_upper_bound and forward iterator + // to make sure it stops at bound + { + ReadOptions ro; + // iterate_upper_bound points beyond the last expected entry + ro.iterate_upper_bound = new Slice("foo2"); + + std::unique_ptr iter(db_->NewIterator(ro)); + + iter->Seek("foo"); + + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("foo")), 0); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(("foo1")), 0); + + iter->Next(); + // should stop here... + ASSERT_TRUE(!iter->Valid()); + } + + // prefix is the first letter of the key + options.prefix_extractor.reset(NewFixedPrefixTransform(1)); + + DestroyAndReopen(&options); + ASSERT_OK(Put("a", "0")); + ASSERT_OK(Put("foo", "bar")); + ASSERT_OK(Put("foo1", "bar1")); + ASSERT_OK(Put("g1", "0")); + + // testing with iterate_upper_bound and prefix_extractor + // Seek target and iterate_upper_bound are not is same prefix + // This should be an error + { + ReadOptions ro; + ro.iterate_upper_bound = new Slice("g1"); + + std::unique_ptr iter(db_->NewIterator(ro)); + + iter->Seek("foo"); + + ASSERT_TRUE(!iter->Valid()); + ASSERT_TRUE(iter->status().IsInvalidArgument()); + } + + // testing that iterate_upper_bound prevents iterating over deleted items + // if the bound has already reached + { + options.prefix_extractor = nullptr; + DestroyAndReopen(&options); + ASSERT_OK(Put("a", "0")); + ASSERT_OK(Put("b", "0")); + ASSERT_OK(Put("b1", "0")); + ASSERT_OK(Put("c", "0")); + ASSERT_OK(Put("d", "0")); + ASSERT_OK(Put("e", "0")); + ASSERT_OK(Delete("c")); + ASSERT_OK(Delete("d")); + + // base case with no bound + ReadOptions ro; + ro.iterate_upper_bound = nullptr; + + std::unique_ptr iter(db_->NewIterator(ro)); + + iter->Seek("b"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("b")), 0); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(("b1")), 0); + + perf_context.Reset(); + iter->Next(); + + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(static_cast(perf_context.internal_delete_skipped_count), 2); + + // now testing with iterate_bound + ro.iterate_upper_bound = new Slice("c"); + + iter.reset(db_->NewIterator(ro)); + + perf_context.Reset(); + + iter->Seek("b"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(Slice("b")), 0); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(("b1")), 0); + + iter->Next(); + // the iteration should stop as soon as the the bound key is reached + // even though the key is deleted + // hence internal_delete_skipped_count should be 0 + ASSERT_TRUE(!iter->Valid()); + ASSERT_EQ(static_cast(perf_context.internal_delete_skipped_count), 0); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index c54e6707f05..e4b1bb75365 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -698,6 +698,10 @@ extern void rocksdb_readoptions_set_fill_cache( extern void rocksdb_readoptions_set_snapshot( rocksdb_readoptions_t*, const rocksdb_snapshot_t*); +extern void rocksdb_readoptions_set_iterate_upper_bound( + rocksdb_readoptions_t*, + const char* key, + size_t keylen); extern void rocksdb_readoptions_set_read_tier( rocksdb_readoptions_t*, int); extern void rocksdb_readoptions_set_tailing( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 11d976fb2b0..fbb3b6ddb4a 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -903,6 +903,18 @@ struct ReadOptions { // ! DEPRECATED // const Slice* prefix; + // "iterate_upper_bound" defines the extent upto which the forward iterator + // can returns entries. Once the bound is reached, Valid() will be false. + // "iterate_upper_bound" is exclusive ie the bound value is + // not a valid entry. If iterator_extractor is not null, the Seek target + // and iterator_upper_bound need to have the same prefix. + // This is because ordering is not guaranteed outside of prefix domain. + // There is no lower bound on the iterator. If needed, that can be easily + // implemented + // + // Default: nullptr + const Slice* iterate_upper_bound; + // Specify if this read request should process data that ALREADY // resides on a particular cache. If the required data is not // found at the specified cache, then Status::Incomplete is returned. @@ -926,6 +938,7 @@ struct ReadOptions { : verify_checksums(true), fill_cache(true), snapshot(nullptr), + iterate_upper_bound(nullptr), read_tier(kReadAllTier), tailing(false), total_order_seek(false) {} @@ -933,6 +946,7 @@ struct ReadOptions { : verify_checksums(cksum), fill_cache(cache), snapshot(nullptr), + iterate_upper_bound(nullptr), read_tier(kReadAllTier), tailing(false), total_order_seek(false) {}