diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index edc2fb6a82c1..41477539254a 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -1,6 +1,7 @@ add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc external_alloc.cc interpreter.cc json_object.cc mi_memory_resource.cc sds_utils.cc - segment_allocator.cc simple_lru_counter.cc small_string.cc tx_queue.cc dense_set.cc + segment_allocator.cc simple_lru_counter.cc small_string.cc sorted_map.cc + tx_queue.cc dense_set.cc string_set.cc string_map.cc detail/bitpacking.cc) cxx_link(dfly_core base query_parser absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules @@ -19,5 +20,6 @@ cxx_test(json_test dfly_core TRDP::jsoncons LABELS DFLY) cxx_test(simple_lru_counter_test dfly_core LABELS DFLY) cxx_test(string_set_test dfly_core LABELS DFLY) cxx_test(string_map_test dfly_core LABELS DFLY) +cxx_test(sorted_map_test dfly_core LABELS DFLY) add_subdirectory(search) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 7ef41c059635..e77acc752363 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -25,6 +25,7 @@ extern "C" { #include "base/logging.h" #include "base/pod_array.h" #include "core/detail/bitpacking.h" +#include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" @@ -46,14 +47,6 @@ size_t QlMAllocSize(quicklist* ql) { return res + ql->count * 16; // we account for each member 16 bytes. } -// Approximated dictionary size. -size_t DictMallocSize(dict* d) { - size_t res = zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) + - znallocx(sizeof(dict)); - - return res + dictSize(d) * 16; // approximation. -} - inline void FreeObjSet(unsigned encoding, void* ptr, MemoryResource* mr) { switch (encoding) { case kEncodingStrMap: { @@ -107,8 +100,8 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) { case OBJ_ENCODING_LISTPACK: return lpBytes(reinterpret_cast(ptr)); case OBJ_ENCODING_SKIPLIST: { - zset* zs = (zset*)ptr; - return DictMallocSize(zs->dict); + detail::SortedMap* ss = (detail::SortedMap*)ptr; + return ss->MallocSize(); // DictMallocSize(zs->dict); } } LOG(DFATAL) << "Unknown set encoding type " << encoding; @@ -134,11 +127,10 @@ inline void FreeObjHash(unsigned encoding, void* ptr) { } inline void FreeObjZset(unsigned encoding, void* ptr) { - zset* zs = (zset*)ptr; + detail::SortedMap* zs = (detail::SortedMap*)ptr; switch (encoding) { case OBJ_ENCODING_SKIPLIST: - zs = (zset*)ptr; - zsetFree(zs); + delete zs; break; case OBJ_ENCODING_LISTPACK: zfree(ptr); @@ -238,8 +230,8 @@ size_t RobjWrapper::Size() const { case OBJ_ZSET: { switch (encoding_) { case OBJ_ENCODING_SKIPLIST: { - zset* zs = (zset*)inner_obj_; - return zs->zsl->length; + SortedMap* ss = (SortedMap*)inner_obj_; + return ss->Size(); } case OBJ_ENCODING_LISTPACK: return lpLength((uint8_t*)inner_obj_) / 2; @@ -427,13 +419,9 @@ int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, do * becomes too long *before* executing zzlInsert. */ if (zl_len + 1 > server.zset_max_listpack_entries || sdslen(ele) > server.zset_max_listpack_value || !lpSafeToAdd(lp, sdslen(ele))) { - robj self{.type = type_, - .encoding = encoding_, - .lru = 0, - .refcount = OBJ_STATIC_REFCOUNT, - .ptr = inner_obj_}; - zsetConvert(&self, OBJ_ENCODING_SKIPLIST); - inner_obj_ = self.ptr; + unique_ptr ss = SortedMap::FromListPack(lp); + lpFree(lp); + inner_obj_ = ss.release(); encoding_ = OBJ_ENCODING_SKIPLIST; } else { inner_obj_ = zzlInsert(lp, ele, score); @@ -449,18 +437,8 @@ int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, do } CHECK_EQ(encoding_, OBJ_ENCODING_SKIPLIST); - - // TODO: to factor out OBJ_ENCODING_SKIPLIST functionality into a separate class. - robj self{.type = type_, - .encoding = encoding_, - .lru = 0, - .refcount = OBJ_STATIC_REFCOUNT, - .ptr = inner_obj_}; - - int res = zsetAdd(&self, score, ele, in_flags, out_flags, newscore); - inner_obj_ = self.ptr; - encoding_ = self.encoding; - return res; + SortedMap* ss = (SortedMap*)inner_obj_; + return ss->Add(score, ele, in_flags, out_flags, newscore); } bool RobjWrapper::Reallocate(MemoryResource* mr) { diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 035c99565da6..40e3d80c86cf 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -6,7 +6,7 @@ #include -#ifdef __clang__ +#if defined(__clang__) && defined(__APPLE__) #include namespace PMR_NS = std::experimental::pmr; #else @@ -48,6 +48,10 @@ class RobjWrapper { void SetString(std::string_view s, MemoryResource* mr); void Init(unsigned type, unsigned encoding, void* inner); + // Equivalent to zsetAdd + int AddZsetMember(std::string_view member, double score, int in_flags, int* out_flags, + double* newscore); + unsigned type() const { return type_; } @@ -440,5 +444,3 @@ class CompactObjectView { }; } // namespace dfly - -#undef PMR_NS diff --git a/src/core/dash.h b/src/core/dash.h index 04ee082741bf..c225be81a5a4 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -3,7 +3,7 @@ // #pragma once -#ifdef __clang__ +#if defined(__clang__) && defined(__APPLE__) #include namespace PMR_NS = std::experimental::pmr; #else diff --git a/src/core/dense_set.h b/src/core/dense_set.h index a23aa4e6d9fb..45aa8c51e318 100644 --- a/src/core/dense_set.h +++ b/src/core/dense_set.h @@ -8,7 +8,7 @@ #include #include -#ifdef __clang__ +#if defined(__clang__) && defined(__APPLE__) #include namespace PMR_NS = std::experimental::pmr; #else diff --git a/src/core/mi_memory_resource.h b/src/core/mi_memory_resource.h index 99fd196154dc..4974833d7ab7 100644 --- a/src/core/mi_memory_resource.h +++ b/src/core/mi_memory_resource.h @@ -6,7 +6,7 @@ #include -#ifdef __clang__ +#if defined(__clang__) && defined(__APPLE__) #include namespace PMR_NS = std::experimental::pmr; #else diff --git a/src/core/sorted_map.cc b/src/core/sorted_map.cc new file mode 100644 index 000000000000..1a3c35049628 --- /dev/null +++ b/src/core/sorted_map.cc @@ -0,0 +1,407 @@ +// Copyright 2023, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/sorted_map.h" + +#include + +extern "C" { +#include "redis/listpack.h" +#include "redis/redis_aux.h" +#include "redis/util.h" +#include "redis/zmalloc.h" +} + +#include "base/logging.h" + +using namespace std; +namespace dfly { +namespace detail { + +namespace { + +// Approximated dictionary size. +size_t DictMallocSize(dict* d) { + size_t res = zmalloc_usable_size(d->ht_table[0]) + zmalloc_usable_size(d->ht_table[1]) + + znallocx(sizeof(dict)); + + return res + dictSize(d) * 16; // approximation. +} + +unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) { + unsigned char* sptr; + char scorebuf[128]; + int scorelen; + + scorelen = d2string(scorebuf, sizeof(scorebuf), score); + if (eptr == NULL) { + zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele)); + zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen); + } else { + /* Insert member before the element 'eptr'. */ + zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr); + + /* Insert score after the member. */ + zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL); + } + return zl; +} + +inline zskiplistNode* Next(bool reverse, zskiplistNode* ln) { + return reverse ? ln->backward : ln->level[0].forward; +} + +inline bool IsUnder(bool reverse, double score, const zrangespec& spec) { + return reverse ? zslValueGteMin(score, &spec) : zslValueLteMax(score, &spec); +} + +} // namespace + +SortedMap::SortedMap() { + dict_ = dictCreate(&zsetDictType); + zsl_ = zslCreate(); +} + +SortedMap::~SortedMap() { + dictRelease(dict_); + zslFree(zsl_); +} + +size_t SortedMap::MallocSize() const { + // TODO: introduce a proper malloc usage tracking. + return DictMallocSize(dict_) + zmalloc_size(zsl_); +} + +bool SortedMap::Insert(double score, sds member) { + zskiplistNode* znode = zslInsert(zsl_, score, member); + int ret = dictAdd(dict_, member, &znode->score); + return ret == DICT_OK; +} + +int SortedMap::Add(double score, sds ele, int in_flags, int* out_flags, double* newscore) { + zskiplistNode* znode; + + /* Turn options into simple to check vars. */ + const bool incr = (in_flags & ZADD_IN_INCR) != 0; + const bool nx = (in_flags & ZADD_IN_NX) != 0; + const bool xx = (in_flags & ZADD_IN_XX) != 0; + const bool gt = (in_flags & ZADD_IN_GT) != 0; + const bool lt = (in_flags & ZADD_IN_LT) != 0; + + *out_flags = 0; /* We'll return our response flags. */ + double curscore; + + dictEntry* de = dictFind(dict_, ele); + if (de != NULL) { + /* NX? Return, same element already exists. */ + if (nx) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + curscore = *(double*)dictGetVal(de); + + /* Prepare the score for the increment if needed. */ + if (incr) { + score += curscore; + if (isnan(score)) { + *out_flags |= ZADD_OUT_NAN; + return 0; + } + } + + /* GT/LT? Only update if score is greater/less than current. */ + if ((lt && score >= curscore) || (gt && score <= curscore)) { + *out_flags |= ZADD_OUT_NOP; + return 1; + } + + if (newscore) + *newscore = score; + + /* Remove and re-insert when score changes. */ + if (score != curscore) { + znode = zslUpdateScore(zsl_, curscore, ele, score); + /* Note that we did not removed the original element from + * the hash table representing the sorted set, so we just + * update the score. */ + dictGetVal(de) = &znode->score; /* Update score ptr. */ + *out_flags |= ZADD_OUT_UPDATED; + } + return 1; + } else if (!xx) { + ele = sdsdup(ele); + znode = zslInsert(zsl_, score, ele); + CHECK_EQ(DICT_OK, dictAdd(dict_, ele, &znode->score)); + + *out_flags |= ZADD_OUT_ADDED; + if (newscore) + *newscore = score; + return 1; + } + + *out_flags |= ZADD_OUT_NOP; + return 1; +} + +// taken from zsetConvert +unique_ptr SortedMap::FromListPack(const uint8_t* lp) { + uint8_t* zl = (uint8_t*)lp; + unsigned char *eptr, *sptr; + unsigned char* vstr; + unsigned int vlen; + long long vlong; + sds ele; + + unique_ptr zs(new SortedMap()); + + eptr = lpSeek(zl, 0); + if (eptr != NULL) { + sptr = lpNext(zl, eptr); + CHECK(sptr != NULL); + } + + while (eptr != NULL) { + double score = zzlGetScore(sptr); + vstr = lpGetValue(eptr, &vlen, &vlong); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen((char*)vstr, vlen); + + CHECK(zs->Insert(score, ele)); + zzlNext(zl, &eptr, &sptr); + } + + return zs; +} + +// taken from zsetConvert +uint8_t* SortedMap::ToListPack() const { + uint8_t* lp = lpNew(0); + + /* Approach similar to zslFree(), since we want to free the skiplist at + * the same time as creating the listpack. */ + zskiplistNode* node = zsl_->header->level[0].forward; + + while (node) { + lp = zzlInsertAt(lp, NULL, node->ele, node->score); + node = node->level[0].forward; + } + + return lp; +} + +// returns true if the element was deleted. +bool SortedMap::Delete(sds ele) { + // Taken from zsetRemoveFromSkiplist. + + dictEntry* de = dictUnlink(dict_, ele); + if (de == NULL) + return false; + + /* Get the score in order to delete from the skiplist later. */ + double score = *(double*)dictGetVal(de); + + /* Delete from the hash table and later from the skiplist. + * Note that the order is important: deleting from the skiplist + * actually releases the SDS string representing the element, + * which is shared between the skiplist and the hash table, so + * we need to delete from the skiplist as the final step. */ + dictFreeUnlinkedEntry(dict_, de); + if (htNeedsResize(dict_)) + dictResize(dict_); + + /* Delete from skiplist. */ + int retval = zslDelete(zsl_, score, ele, NULL); + DCHECK(retval); + + return true; +} + +std::optional SortedMap::GetScore(sds ele) const { + dictEntry* de = dictFind(dict_, ele); + if (de == NULL) + return std::nullopt; + + return *(double*)dictGetVal(de); +} + +std::optional SortedMap::GetRank(sds ele, bool reverse) const { + dictEntry* de = dictFind(dict_, ele); + if (de == NULL) + return nullopt; + + double score = *(double*)dictGetVal(de); + unsigned rank = zslGetRank(zsl_, score, ele); + + /* Existing elements always have a rank. */ + DCHECK(rank != 0); + if (reverse) + return zsl_->length - rank; + else + return rank - 1; +} + +auto SortedMap::GetRange(const zrangespec& range, unsigned offset, unsigned limit, + bool reverse) const -> ScoredArray { + /* If reversed, get the last node in range as starting point. */ + zskiplistNode* ln; + + if (reverse) { + ln = zslLastInRange(zsl_, &range); + } else { + ln = zslFirstInRange(zsl_, &range); + } + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (ln && offset--) { + ln = Next(reverse, ln); + } + + ScoredArray result; + while (ln && limit--) { + /* Abort when the node is no longer in range. */ + if (!IsUnder(reverse, ln->score, range)) + break; + + result.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); + + /* Move to next node */ + ln = Next(reverse, ln); + } + return result; +} + +auto SortedMap::GetLexRange(const zlexrangespec& range, unsigned offset, unsigned limit, + bool reverse) const -> ScoredArray { + zskiplistNode* ln; + /* If reversed, get the last node in range as starting point. */ + if (reverse) { + ln = zslLastInLexRange(zsl_, &range); + } else { + ln = zslFirstInLexRange(zsl_, &range); + } + + /* If there is an offset, just traverse the number of elements without + * checking the score because that is done in the next loop. */ + while (ln && offset--) { + ln = Next(reverse, ln); + } + + ScoredArray result; + while (ln && limit--) { + /* Abort when the node is no longer in range. */ + if (reverse) { + if (!zslLexValueGteMin(ln->ele, &range)) + break; + } else { + if (!zslLexValueLteMax(ln->ele, &range)) + break; + } + + result.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); + + /* Move to next node */ + ln = Next(reverse, ln); + } + + return result; +} + +auto SortedMap::PopTopScores(unsigned count, bool reverse) -> ScoredArray { + zskiplistNode* ln; + if (reverse) { + ln = zsl_->tail; + } else { + ln = zsl_->header->level[0].forward; + } + + ScoredArray result; + while (ln && count--) { + result.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); + + /* we can delete the element now */ + CHECK(Delete(ln->ele)); + + ln = Next(reverse, ln); + } + return result; +} + +size_t SortedMap::Count(const zrangespec& range) const { + /* Find first element in range */ + zskiplistNode* zn = zslFirstInRange(zsl_, &range); + + /* Use rank of first element, if any, to determine preliminary count */ + if (zn == NULL) + return 0; + + unsigned long rank = zslGetRank(zsl_, zn->score, zn->ele); + size_t count = (zsl_->length - (rank - 1)); + + /* Find last element in range */ + zn = zslLastInRange(zsl_, &range); + + /* Use rank of last element, if any, to determine the actual count */ + if (zn != NULL) { + rank = zslGetRank(zsl_, zn->score, zn->ele); + count -= (zsl_->length - rank); + } + + return count; +} + +size_t SortedMap::LexCount(const zlexrangespec& range) const { + /* Find first element in range */ + zskiplistNode* zn = zslFirstInLexRange(zsl_, &range); + + /* Use rank of first element, if any, to determine preliminary count */ + if (zn == NULL) + return 0; + + unsigned long rank = zslGetRank(zsl_, zn->score, zn->ele); + size_t count = (zsl_->length - (rank - 1)); + + /* Find last element in range */ + zn = zslLastInLexRange(zsl_, &range); + + /* Use rank of last element, if any, to determine the actual count */ + if (zn != NULL) { + rank = zslGetRank(zsl_, zn->score, zn->ele); + count -= (zsl_->length - rank); + } + return count; +} + +bool SortedMap::Iterate(unsigned start_rank, unsigned len, bool reverse, + absl::FunctionRef cb) const { + zskiplistNode* ln; + + /* Check if starting point is trivial, before doing log(N) lookup. */ + if (reverse) { + ln = zsl_->tail; + unsigned long llen = zsl_->length; + if (start_rank > 0) + ln = zslGetElementByRank(zsl_, llen - start_rank); + } else { + ln = zsl_->header->level[0].forward; + if (start_rank > 0) + ln = zslGetElementByRank(zsl_, start_rank + 1); + } + + bool success = true; + while (success && len--) { + DCHECK(ln != NULL); + success = cb(ln->ele, ln->score); + ln = reverse ? ln->backward : ln->level[0].forward; + if (!ln) + break; + } + return success; +} + +} // namespace detail +} // namespace dfly diff --git a/src/core/sorted_map.h b/src/core/sorted_map.h new file mode 100644 index 000000000000..60668a8dfd7e --- /dev/null +++ b/src/core/sorted_map.h @@ -0,0 +1,103 @@ +// Copyright 2023, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include +#include +#include +#include + +extern "C" { +#include "redis/dict.h" +#include "redis/zset.h" +} + +namespace dfly { +namespace detail { + +/** + * @brief SortedMap is a sorted map implementation based on zset.h. It holds unique strings that + * are ordered by score and lexicographically. The score is a double value and has higher priority. + * The map is implemented as a skip list and a hash table. For more details see + * zset.h and t_zset.c files in Redis. + */ +class SortedMap { + public: + using ScoredMember = std::pair; + using ScoredArray = std::vector; + + SortedMap(); + SortedMap(const SortedMap&) = delete; + SortedMap& operator=(const SortedMap&) = delete; + + SortedMap(SortedMap&& other) noexcept = default; + SortedMap& operator=(SortedMap&& other) noexcept = default; + + ~SortedMap(); + + // The ownership for the returned SortedMap stays with the caller + static std::unique_ptr FromListPack(const uint8_t* lp); + + size_t Size() const { + return zsl_->length; + } + + bool Reserve(size_t sz) { + return dictExpand(dict_, sz) == DICT_OK; + } + + // Interface equivalent to zsetAdd. + int Add(double score, sds ele, int in_flags, int* out_flags, double* newscore); + + bool Insert(double score, sds member); + + uint8_t* ToListPack() const; + size_t MallocSize() const; + + dict* GetDict() const { + return dict_; + } + + size_t DeleteRangeByRank(unsigned start, unsigned end) { + return zslDeleteRangeByRank(zsl_, start + 1, end + 1, dict_); + } + + size_t DeleteRangeByScore(const zrangespec& range) { + return zslDeleteRangeByScore(zsl_, &range, dict_); + } + + size_t DeleteRangeByLex(const zlexrangespec& range) { + return zslDeleteRangeByLex(zsl_, &range, dict_); + } + + // returns true if the element was deleted. + bool Delete(sds ele); + + std::optional GetScore(sds ele) const; + std::optional GetRank(sds ele, bool reverse) const; + + ScoredArray GetRange(const zrangespec& range, unsigned offset, unsigned limit, + bool reverse) const; + ScoredArray GetLexRange(const zlexrangespec& range, unsigned offset, unsigned limit, + bool reverse) const; + + ScoredArray PopTopScores(unsigned count, bool reverse); + size_t Count(const zrangespec& range) const; + size_t LexCount(const zlexrangespec& range) const; + + // Runs cb for each element in the range [start_rank, start_rank + len). + // Stops iteration if cb returns false. Returns false in this case. + bool Iterate(unsigned start_rank, unsigned len, bool reverse, + absl::FunctionRef cb) const; + + private: + dict* dict_ = nullptr; + zskiplist* zsl_ = nullptr; +}; + +} // namespace detail +} // namespace dfly diff --git a/src/core/sorted_map_test.cc b/src/core/sorted_map_test.cc new file mode 100644 index 000000000000..a5913df5d413 --- /dev/null +++ b/src/core/sorted_map_test.cc @@ -0,0 +1,56 @@ +// Copyright 2023, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include +#include + +#include "base/gtest.h" +#include "base/logging.h" +#include "core/mi_memory_resource.h" + +extern "C" { +#include "redis/zmalloc.h" +#include "redis/zset.h" +} + +namespace dfly { + +// TODO: to actually add tests covering sorted_map. +class SortedMapTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + // configure redis lib zmalloc which requires mimalloc heap to work. + auto* tlh = mi_heap_get_backing(); + init_zmalloc_threadlocal(tlh); + } + + void AddMember(zskiplist* zsl, double score, const char* str) { + zslInsert(zsl, score, sdsnew(str)); + } +}; + +// not a real test, just to see how much memory is used by zskiplist. +TEST_F(SortedMapTest, MemoryUsage) { + zskiplist* zsl = zslCreate(); + LOG(INFO) << "zskiplist before: " << zmalloc_used_memory_tl << " bytes"; + + for (int i = 0; i < 10'000; ++i) { + AddMember(zsl, i, "fooba"); + } + LOG(INFO) << zmalloc_used_memory_tl << " bytes"; + zslFree(zsl); + + LOG(INFO) << "zskiplist after: " << zmalloc_used_memory_tl << " bytes"; + MiMemoryResource mi_alloc(mi_heap_get_backing()); + using AllocType = PMR_NS::polymorphic_allocator>; + AllocType alloc(&mi_alloc); + absl::btree_map, AllocType> btree(alloc); + LOG(INFO) << "btree before: " << zmalloc_used_memory_tl + mi_alloc.used() << " bytes"; + for (int i = 0; i < 10000; ++i) { + btree.emplace(i, sdsnew("fooba")); + } + LOG(INFO) << "btree after: " << zmalloc_used_memory_tl + mi_alloc.used() << " bytes"; +} + +} // namespace dfly diff --git a/src/redis/geo.c b/src/redis/geo.c index 9fd65354299f..6698cde4ee69 100644 --- a/src/redis/geo.c +++ b/src/redis/geo.c @@ -99,17 +99,6 @@ int decodeGeohash(double bits, double *xy) { } -/* Input Argument Helper */ -/* Decode lat/long from a zset member's score. - * Returns C_OK on successful decoding, otherwise C_ERR is returned. */ -int longLatFromMember(robj *zobj, robj *member, double *xy) { - double score = 0; - - if (zsetScore(zobj, member->ptr, &score) == C_ERR) return C_ERR; - if (!decodeGeohash(score, xy)) return C_ERR; - return C_OK; -} - /* Helper function for geoGetPointsInRange(): given a sorted set score * representing a point, and a GeoShape, checks if the point is within the search area. * @@ -141,6 +130,8 @@ int geoWithinShape(GeoShape *shape, double score, double *xy, double *distance) return C_OK; } +#if 0 + /* Query a Redis sorted set to extract all the elements between 'min' and * 'max', appending them into the array of geoPoint structures 'geoArray'. * The command returns the number of elements added to the array. @@ -317,7 +308,6 @@ int membersOfAllNeighbors(robj *zobj, const GeoHashRadius *n, GeoShape *shape, g return count; } -#if 0 /* Sort comparators for qsort() */ static int sort_gp_asc(const void *a, const void *b) { const struct geoPoint *gpa = a, *gpb = b; diff --git a/src/redis/object.c b/src/redis/object.c index a97eb61dd67d..cdcd750ca7c9 100644 --- a/src/redis/object.c +++ b/src/redis/object.c @@ -313,23 +313,6 @@ void freeSetObject(robj *o) { } } -void freeZsetObject(robj *o) { - zset *zs; - switch (o->encoding) { - case OBJ_ENCODING_SKIPLIST: - zs = o->ptr; - dictRelease(zs->dict); - zslFree(zs->zsl); - zfree(zs); - break; - case OBJ_ENCODING_LISTPACK: - zfree(o->ptr); - break; - default: - serverPanic("Unknown sorted set encoding"); - } -} - void freeHashObject(robj *o) { switch (o->encoding) { case OBJ_ENCODING_HT: @@ -374,8 +357,7 @@ void decrRefCount(robj *o) { switch(o->type) { case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; - case OBJ_SET: freeSetObject(o); break; - case OBJ_ZSET: freeZsetObject(o); break; + case OBJ_SET: freeSetObject(o); break; case OBJ_HASH: freeHashObject(o); break; case OBJ_MODULE: serverPanic("Unsupported OBJ_MODULE type"); @@ -454,32 +436,6 @@ void dismissSetObject(robj *o, size_t size_hint) { } } -/* See dismissObject() */ -void dismissZsetObject(robj *o, size_t size_hint) { - if (o->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = o->ptr; - zskiplist *zsl = zs->zsl; - serverAssert(zsl->length != 0); - /* We iterate all nodes only when average member size is bigger than a - * page size, and there's a high chance we'll actually dismiss something. */ - if (size_hint / zsl->length >= server.page_size) { - zskiplistNode *zn = zsl->tail; - while (zn != NULL) { - dismissSds(zn->ele); - zn = zn->backward; - } - } - - /* Dismiss hash table memory. */ - dict *d = zs->dict; - dismissMemory(d->ht_table[0], DICTHT_SIZE(d->ht_size_exp[0])*sizeof(dictEntry*)); - dismissMemory(d->ht_table[1], DICTHT_SIZE(d->ht_size_exp[1])*sizeof(dictEntry*)); - } else if (o->encoding == OBJ_ENCODING_LISTPACK) { - dismissMemory(o->ptr, lpBytes((unsigned char*)o->ptr)); - } else { - serverPanic("Unknown zset encoding type"); - } -} /* See dismissObject() */ void dismissHashObject(robj *o, size_t size_hint) { @@ -735,8 +691,6 @@ size_t streamRadixTreeMemoryUsage(rax *rax) { return size; } -#endif - /* Returns the size in bytes consumed by the key's value in RAM. * Note that the returned value is just an approximation, especially in the * case of aggregated data types where only "sample_size" elements @@ -910,6 +864,8 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } return asize; } +#endif + #ifdef ROMAN_CLIENT_DISABLE /* Release data obtained with getMemoryOverheadData(). */ diff --git a/src/redis/t_zset.c b/src/redis/t_zset.c index 143f414bf4bc..558d258f1597 100644 --- a/src/redis/t_zset.c +++ b/src/redis/t_zset.c @@ -1108,444 +1108,4 @@ unsigned char *zzlDeleteRangeByLex(unsigned char *zl, const zlexrangespec *range if (deleted != NULL) *deleted = num; return zl; -} - -/* Delete all the elements with rank between start and end from the skiplist. - * Start and end are inclusive. Note that start and end need to be 1-based */ -unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsigned int end, unsigned long *deleted) { - unsigned int num = (end-start)+1; - if (deleted) *deleted = num; - zl = lpDeleteRange(zl,2*(start-1),2*num); - return zl; -} - -/*----------------------------------------------------------------------------- - * Common sorted set API - *----------------------------------------------------------------------------*/ - -zset* zsetCreate(void) { - zset *zs = zmalloc(sizeof(*zs)); - zs->dict = dictCreate(&zsetDictType); - zs->zsl = zslCreate(); - return zs; -} - -void zsetFree(zset *zs) { - dictRelease(zs->dict); - zslFree(zs->zsl); - zfree(zs); -} - -unsigned long zsetLength(const robj *zobj) { - unsigned long length = 0; - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - length = zzlLength(zobj->ptr); - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - length = ((const zset*)zobj->ptr)->zsl->length; - } else { - serverPanic("Unknown sorted set encoding"); - } - return length; -} - -void zsetConvert(robj *zobj, int encoding) { - zset *zs; - zskiplistNode *node, *next; - sds ele; - double score; - - if (zobj->encoding == encoding) return; - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *zl = zobj->ptr; - unsigned char *eptr, *sptr; - unsigned char *vstr; - unsigned int vlen; - long long vlong; - - if (encoding != OBJ_ENCODING_SKIPLIST) - serverPanic("Unknown target encoding"); - - zs = zmalloc(sizeof(*zs)); - zs->dict = dictCreate(&zsetDictType); - zs->zsl = zslCreate(); - - eptr = lpSeek(zl,0); - if (eptr != NULL) { - sptr = lpNext(zl,eptr); - serverAssertWithInfo(NULL,zobj,sptr != NULL); - } - - while (eptr != NULL) { - score = zzlGetScore(sptr); - vstr = lpGetValue(eptr,&vlen,&vlong); - if (vstr == NULL) - ele = sdsfromlonglong(vlong); - else - ele = sdsnewlen((char*)vstr,vlen); - - node = zslInsert(zs->zsl,score,ele); - serverAssert(dictAdd(zs->dict,ele,&node->score) == DICT_OK); - zzlNext(zl,&eptr,&sptr); - } - - zfree(zobj->ptr); - zobj->ptr = zs; - zobj->encoding = OBJ_ENCODING_SKIPLIST; - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - unsigned char *zl = lpNew(0); - - if (encoding != OBJ_ENCODING_LISTPACK) - serverPanic("Unknown target encoding"); - - /* Approach similar to zslFree(), since we want to free the skiplist at - * the same time as creating the listpack. */ - zs = zobj->ptr; - dictRelease(zs->dict); - node = zs->zsl->header->level[0].forward; - zfree(zs->zsl->header); - zfree(zs->zsl); - - while (node) { - zl = zzlInsertAt(zl,NULL,node->ele,node->score); - next = node->level[0].forward; - zslFreeNode(node); - node = next; - } - - zfree(zs); - zobj->ptr = zl; - zobj->encoding = OBJ_ENCODING_LISTPACK; - } else { - serverPanic("Unknown sorted set encoding"); - } -} - -/* Convert the sorted set object into a listpack if it is not already a listpack - * and if the number of elements and the maximum element size and total elements size - * are within the expected ranges. */ -void zsetConvertToListpackIfNeeded(robj *zobj, size_t maxelelen, size_t totelelen) { - if (zobj->encoding == OBJ_ENCODING_LISTPACK) return; - zset *zset = zobj->ptr; - - if (zset->zsl->length <= server.zset_max_listpack_entries && - maxelelen <= server.zset_max_listpack_value && - lpSafeToAdd(NULL, totelelen)) - { - zsetConvert(zobj,OBJ_ENCODING_LISTPACK); - } -} - -/* Return (by reference) the score of the specified member of the sorted set - * storing it into *score. If the element does not exist C_ERR is returned - * otherwise C_OK is returned and *score is correctly populated. - * If 'zobj' or 'member' is NULL, C_ERR is returned. */ -int zsetScore(robj *zobj, sds member, double *score) { - if (!zobj || !member) return C_ERR; - - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - if (zzlFind(zobj->ptr, member, score) == NULL) return C_ERR; - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - dictEntry *de = dictFind(zs->dict, member); - if (de == NULL) return C_ERR; - *score = *(double*)dictGetVal(de); - } else { - serverPanic("Unknown sorted set encoding"); - } - return C_OK; -} - -/* Add a new element or update the score of an existing element in a sorted - * set, regardless of its encoding. - * - * The set of flags change the command behavior. - * - * The input flags are the following: - * - * ZADD_INCR: Increment the current element score by 'score' instead of updating - * the current element score. If the element does not exist, we - * assume 0 as previous score. - * ZADD_NX: Perform the operation only if the element does not exist. - * ZADD_XX: Perform the operation only if the element already exist. - * ZADD_GT: Perform the operation on existing elements only if the new score is - * greater than the current score. - * ZADD_LT: Perform the operation on existing elements only if the new score is - * less than the current score. - * - * When ZADD_INCR is used, the new score of the element is stored in - * '*newscore' if 'newscore' is not NULL. - * - * The returned flags are the following: - * - * ZADD_NAN: The resulting score is not a number. - * ZADD_ADDED: The element was added (not present before the call). - * ZADD_UPDATED: The element score was updated. - * ZADD_NOP: No operation was performed because of NX or XX. - * - * Return value: - * - * The function returns 1 on success, and sets the appropriate flags - * ADDED or UPDATED to signal what happened during the operation (note that - * none could be set if we re-added an element using the same score it used - * to have, or in the case a zero increment is used). - * - * The function returns 0 on error, currently only when the increment - * produces a NAN condition, or when the 'score' value is NAN since the - * start. - * - * The command as a side effect of adding a new element may convert the sorted - * set internal encoding from listpack to hashtable+skiplist. - * - * Memory management of 'ele': - * - * The function does not take ownership of the 'ele' SDS string, but copies - * it if needed. */ -int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) { - /* Turn options into simple to check vars. */ - int incr = (in_flags & ZADD_IN_INCR) != 0; - int nx = (in_flags & ZADD_IN_NX) != 0; - int xx = (in_flags & ZADD_IN_XX) != 0; - int gt = (in_flags & ZADD_IN_GT) != 0; - int lt = (in_flags & ZADD_IN_LT) != 0; - *out_flags = 0; /* We'll return our response flags. */ - double curscore; - - /* NaN as input is an error regardless of all the other parameters. */ - if (isnan(score)) { - *out_flags = ZADD_OUT_NAN; - return 0; - } - - /* Update the sorted set according to its encoding. */ - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *eptr; - - if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { - /* NX? Return, same element already exists. */ - if (nx) { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - - /* Prepare the score for the increment if needed. */ - if (incr) { - score += curscore; - if (isnan(score)) { - *out_flags |= ZADD_OUT_NAN; - return 0; - } - } - - /* GT/LT? Only update if score is greater/less than current. */ - if ((lt && score >= curscore) || (gt && score <= curscore)) { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - - if (newscore) *newscore = score; - - /* Remove and re-insert when score changed. */ - if (score != curscore) { - zobj->ptr = zzlDelete(zobj->ptr,eptr); - zobj->ptr = zzlInsert(zobj->ptr,ele,score); - *out_flags |= ZADD_OUT_UPDATED; - } - return 1; - } else if (!xx) { - /* check if the element is too large or the list - * becomes too long *before* executing zzlInsert. */ - if (zzlLength(zobj->ptr)+1 > server.zset_max_listpack_entries || - sdslen(ele) > server.zset_max_listpack_value || - !lpSafeToAdd(zobj->ptr, sdslen(ele))) - { - zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); - } else { - zobj->ptr = zzlInsert(zobj->ptr,ele,score); - if (newscore) *newscore = score; - *out_flags |= ZADD_OUT_ADDED; - return 1; - } - } else { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - } - - /* Note that the above block handling listpack would have either returned or - * converted the key to skiplist. */ - if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - zskiplistNode *znode; - dictEntry *de; - - de = dictFind(zs->dict,ele); - if (de != NULL) { - /* NX? Return, same element already exists. */ - if (nx) { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - - curscore = *(double*)dictGetVal(de); - - /* Prepare the score for the increment if needed. */ - if (incr) { - score += curscore; - if (isnan(score)) { - *out_flags |= ZADD_OUT_NAN; - return 0; - } - } - - /* GT/LT? Only update if score is greater/less than current. */ - if ((lt && score >= curscore) || (gt && score <= curscore)) { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - - if (newscore) *newscore = score; - - /* Remove and re-insert when score changes. */ - if (score != curscore) { - znode = zslUpdateScore(zs->zsl,curscore,ele,score); - /* Note that we did not removed the original element from - * the hash table representing the sorted set, so we just - * update the score. */ - dictGetVal(de) = &znode->score; /* Update score ptr. */ - *out_flags |= ZADD_OUT_UPDATED; - } - return 1; - } else if (!xx) { - ele = sdsdup(ele); - znode = zslInsert(zs->zsl,score,ele); - serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); - *out_flags |= ZADD_OUT_ADDED; - if (newscore) *newscore = score; - return 1; - } else { - *out_flags |= ZADD_OUT_NOP; - return 1; - } - } else { - serverPanic("Unknown sorted set encoding"); - } - return 0; /* Never reached. */ -} - -/* Deletes the element 'ele' from the sorted set encoded as a skiplist+dict, - * returning 1 if the element existed and was deleted, 0 otherwise (the - * element was not there). It does not resize the dict after deleting the - * element. */ -int zsetRemoveFromSkiplist(zset *zs, sds ele) { - dictEntry *de; - double score; - - de = dictUnlink(zs->dict,ele); - if (de != NULL) { - /* Get the score in order to delete from the skiplist later. */ - score = *(double*)dictGetVal(de); - - /* Delete from the hash table and later from the skiplist. - * Note that the order is important: deleting from the skiplist - * actually releases the SDS string representing the element, - * which is shared between the skiplist and the hash table, so - * we need to delete from the skiplist as the final step. */ - dictFreeUnlinkedEntry(zs->dict,de); - - /* Delete from skiplist. */ - int retval = zslDelete(zs->zsl,score,ele,NULL); - serverAssert(retval); - - return 1; - } - - return 0; -} - -/* Delete the element 'ele' from the sorted set, returning 1 if the element - * existed and was deleted, 0 otherwise (the element was not there). */ -int zsetDel(robj *zobj, sds ele) { - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *eptr; - - if ((eptr = zzlFind(zobj->ptr,ele,NULL)) != NULL) { - zobj->ptr = zzlDelete(zobj->ptr,eptr); - return 1; - } - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - if (zsetRemoveFromSkiplist(zs, ele)) { - if (htNeedsResize(zs->dict)) dictResize(zs->dict); - return 1; - } - } else { - serverPanic("Unknown sorted set encoding"); - } - return 0; /* No such element found. */ -} - -/* Given a sorted set object returns the 0-based rank of the object or - * -1 if the object does not exist. - * - * For rank we mean the position of the element in the sorted collection - * of elements. So the first element has rank 0, the second rank 1, and so - * forth up to length-1 elements. - * - * If 'reverse' is false, the rank is returned considering as first element - * the one with the lowest score. Otherwise if 'reverse' is non-zero - * the rank is computed considering as element with rank 0 the one with - * the highest score. */ -long zsetRank(robj *zobj, sds ele, int reverse) { - unsigned long llen; - unsigned long rank; - - llen = zsetLength(zobj); - - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *zl = zobj->ptr; - unsigned char *eptr, *sptr; - - eptr = lpSeek(zl,0); - serverAssert(eptr != NULL); - sptr = lpNext(zl,eptr); - serverAssert(sptr != NULL); - - rank = 1; - while(eptr != NULL) { - if (lpCompare(eptr,(unsigned char*)ele,sdslen(ele))) - break; - rank++; - zzlNext(zl,&eptr,&sptr); - } - - if (eptr != NULL) { - if (reverse) - return llen-rank; - else - return rank-1; - } else { - return -1; - } - } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = zobj->ptr; - zskiplist *zsl = zs->zsl; - dictEntry *de; - double score; - - de = dictFind(zs->dict,ele); - if (de != NULL) { - score = *(double*)dictGetVal(de); - rank = zslGetRank(zsl,score,ele); - /* Existing elements always have a rank. */ - serverAssert(rank != 0); - if (reverse) - return llen-rank; - else - return rank-1; - } else { - return -1; - } - } else { - serverPanic("Unknown sorted set encoding"); - } -} +} \ No newline at end of file diff --git a/src/redis/zset.h b/src/redis/zset.h index 361d5f3e8ebb..39da4039e153 100644 --- a/src/redis/zset.h +++ b/src/redis/zset.h @@ -21,6 +21,8 @@ #define ZADD_OUT_ADDED (1 << 2) /* The element was new and was added. */ #define ZADD_OUT_UPDATED (1 << 3) /* The element already existed, score updated. */ +typedef struct dict dict; + /* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; @@ -38,13 +40,6 @@ typedef struct zskiplist { int level; } zskiplist; -struct dict; - -typedef struct zset { - struct dict* dict; - zskiplist* zsl; -} zset; - /* Struct to hold an inclusive/exclusive range spec by score comparison. */ typedef struct { double min, max; @@ -63,26 +58,14 @@ zskiplist* zslCreate(void); void zslFree(zskiplist* zsl); zskiplistNode* zslInsert(zskiplist* zsl, double score, sds ele); unsigned char* zzlInsert(unsigned char* zl, sds ele, double score); -// int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node); +int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node); zskiplistNode* zslFirstInRange(zskiplist* zsl, const zrangespec* range); zskiplistNode* zslLastInRange(zskiplist* zsl, const zrangespec* range); -// double zzlGetScore(unsigned char *sptr); -// void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); -// void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore); unsigned char *zzlFind(unsigned char *lp, sds ele, double *score); unsigned char* zzlFirstInRange(unsigned char* zl, const zrangespec* range); unsigned char* zzlLastInRange(unsigned char* zl, const zrangespec* range); -zset* zsetCreate(void); -void zsetFree(zset *o); -unsigned long zsetLength(const robj* zobj); -int zsetRemoveFromSkiplist(zset *zs, sds ele); -void zsetConvert(robj* zobj, int encoding); -void zsetConvertToZiplistIfNeeded(robj* zobj, size_t maxelelen); -int zsetScore(robj* zobj, sds member, double* score); unsigned long zslGetRank(zskiplist *zsl, double score, sds ele); -int zsetAdd(robj* zobj, double score, sds ele, int in_flags, int* out_flags, double* newscore); -long zsetRank(robj* zobj, sds ele, int reverse); -int zsetDel(robj* zobj, sds ele); void zzlPrev(unsigned char* zl, unsigned char** eptr, unsigned char** sptr); void zzlNext(unsigned char* zl, unsigned char** eptr, unsigned char** sptr); @@ -99,7 +82,6 @@ int zzlLexValueGteMin(unsigned char* p, const zlexrangespec* spec); int zzlLexValueLteMax(unsigned char* p, const zlexrangespec* spec); int zslLexValueGteMin(sds value, const zlexrangespec* spec); int zslLexValueLteMax(sds value, const zlexrangespec* spec); -int zsetZiplistValidateIntegrity(unsigned char* zl, size_t size, int deep); zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank); unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict); diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index e9e711f39b9c..4e39567fcd87 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -4,6 +4,7 @@ #include "server/container_utils.h" #include "base/logging.h" +#include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" #include "server/engine_shard_set.h" @@ -139,29 +140,10 @@ bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSort return success; } else { CHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST); - zset* zs = static_cast(robj_wrapper->inner_obj()); - zskiplist* zsl = zs->zsl; - zskiplistNode* ln; - - /* Check if starting point is trivial, before doing log(N) lookup. */ - if (reverse) { - ln = zsl->tail; - unsigned long llen = robj_wrapper->Size(); - if (start > 0) - ln = zslGetElementByRank(zsl, llen - start); - } else { - ln = zsl->header->level[0].forward; - if (start > 0) - ln = zslGetElementByRank(zsl, start + 1); - } - - bool success = true; - while (success && rangelen--) { - DCHECK(ln != NULL); - success = func(ContainerEntry{ln->ele, sdslen(ln->ele)}, ln->score); - ln = reverse ? ln->backward : ln->level[0].forward; - } - return success; + detail::SortedMap* smap = (detail::SortedMap*)robj_wrapper->inner_obj(); + return smap->Iterate(start, rangelen, reverse, [&](sds ele, double score) { + return func(ContainerEntry{ele, sdslen(ele)}, score); + }); } return false; } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index f7aa4ba44967..ac5b500ba8f8 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -28,6 +28,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "core/json_object.h" +#include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" #include "server/engine_shard_set.h" @@ -69,76 +70,6 @@ inline void YieldIfNeeded(size_t i) { } } -// taken from zset.c -unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) { - unsigned char* sptr; - char scorebuf[128]; - int scorelen; - - scorelen = d2string(scorebuf, sizeof(scorebuf), score); - if (eptr == NULL) { - zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele)); - zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen); - } else { - /* Insert member before the element 'eptr'. */ - zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr); - - /* Insert score after the member. */ - zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL); - } - return zl; -} - -// taken from zset.c -uint8_t* ToListPack(const zskiplist* zsl) { - uint8_t* lp = lpNew(0); - - /* Approach similar to zslFree(), since we want to free the skiplist at - * the same time as creating the listpack. */ - zskiplistNode* node = zsl->header->level[0].forward; - - while (node) { - lp = zzlInsertAt(lp, NULL, node->ele, node->score); - node = node->level[0].forward; - } - - return lp; -} - -// taken from zsetConvert -zset* FromListPack(const uint8_t* lp) { - uint8_t* zl = (uint8_t*)lp; - unsigned char *eptr, *sptr; - unsigned char* vstr; - unsigned int vlen; - long long vlong; - sds ele; - - eptr = lpSeek(zl, 0); - if (eptr != NULL) { - sptr = lpNext(zl, eptr); - CHECK(sptr != NULL); - } - - zset* zs = zsetCreate(); - - while (eptr != NULL) { - double score = zzlGetScore(sptr); - vstr = lpGetValue(eptr, &vlen, &vlong); - if (vstr == NULL) - ele = sdsfromlonglong(vlong); - else - ele = sdsnewlen((char*)vstr, vlen); - - zskiplistNode* node = zslInsert(zs->zsl, score, ele); - CHECK_EQ(DICT_OK, dictAdd(zs->dict, ele, &node->score)); - - zzlNext(zl, &eptr, &sptr); - } - - return zs; -} - class error_category : public std::error_category { public: const char* name() const noexcept final { @@ -763,11 +694,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { } void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { - zset* zs = zsetCreate(); - auto cleanup = absl::Cleanup([&] { zsetFree(zs); }); - size_t zsetlen = ltrace->blob_count(); - if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) { + detail::SortedMap* zs = new detail::SortedMap; + unsigned encoding = OBJ_ENCODING_SKIPLIST; + auto cleanup = absl::MakeCleanup([&] { delete zs; }); + + if (zsetlen > DICT_HT_INITIAL_SIZE && !zs->Reserve(zsetlen)) { LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; ec_ = RdbError(errc::out_of_memory); return; @@ -787,9 +719,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { maxelelen = sdslen(sdsele); totelelen += sdslen(sdsele); - zskiplistNode* znode = zslInsert(zs->zsl, score, sdsele); - int ret = dictAdd(zs->dict, sdsele, &znode->score); - if (ret != DICT_OK) { + if (!zs->Insert(score, sdsele)) { LOG(ERROR) << "Duplicate zset fields detected"; sdsfree(sdsele); ec_ = RdbError(errc::rdb_file_corrupted); @@ -802,18 +732,17 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { if (ec_) return; - unsigned enc = OBJ_ENCODING_SKIPLIST; void* inner = zs; - - if (zs->zsl->length <= server.zset_max_listpack_entries && + if (zs->Size() <= server.zset_max_listpack_entries && maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) { - enc = OBJ_ENCODING_LISTPACK; - inner = ToListPack(zs->zsl); - } else { - std::move(cleanup).Cancel(); + encoding = OBJ_ENCODING_LISTPACK; + inner = zs->ToListPack(); + delete zs; } - pv_->InitRobj(OBJ_ZSET, enc, inner); + std::move(cleanup).Cancel(); + + pv_->InitRobj(OBJ_ZSET, encoding, inner); } void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { @@ -1006,11 +935,12 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { unsigned encoding = OBJ_ENCODING_LISTPACK; void* inner; if (lpBytes(lp) > server.zset_max_listpack_entries) { - inner = FromListPack(lp); - zfree(lp); + inner = detail::SortedMap::FromListPack(lp).release(); + lpFree(lp); encoding = OBJ_ENCODING_SKIPLIST; } else { - inner = lpShrinkToFit(lp); + lp = lpShrinkToFit(lp); + inner = lp; } pv_->InitRobj(OBJ_ZSET, encoding, inner); return; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index e6fcbb81a37e..11aa8dfd28ab 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -27,6 +27,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "core/json_object.h" +#include "core/sorted_map.h" #include "core/string_map.h" #include "core/string_set.h" #include "server/engine_shard_set.h" @@ -445,10 +446,10 @@ error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) { DCHECK_EQ(OBJ_ZSET, pv.ObjType()); const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper(); if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) { - zset* zs = (zset*)robj_wrapper->inner_obj(); - zskiplist* zsl = zs->zsl; + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); - RETURN_ON_ERR(SaveLen(zsl->length)); + RETURN_ON_ERR(SaveLen(zs->Size())); + std::error_code ec; /* We save the skiplist elements from the greatest to the smallest * (that's trivial since the elements are already ordered in the @@ -456,12 +457,15 @@ error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) { * element will always be the smaller, so adding to the skiplist * will always immediately stop at the head, making the insertion * O(1) instead of O(log(N)). */ - zskiplistNode* zn = zsl->tail; - while (zn != NULL) { - RETURN_ON_ERR(SaveString(string_view{zn->ele, sdslen(zn->ele)})); - RETURN_ON_ERR(SaveBinaryDouble(zn->score)); - zn = zn->backward; - } + zs->Iterate(0, zs->Size(), true, [&](sds ele, double score) { + ec = SaveString(string_view{ele, sdslen(ele)}); + if (ec) + return false; + ec = SaveBinaryDouble(score); + if (ec) + return false; + return true; + }); } else { CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding"; uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj(); diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index e8b722f574fd..4129cdf2c06b 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -16,6 +16,7 @@ extern "C" { #include "base/logging.h" #include "base/stl_util.h" +#include "core/sorted_map.h" #include "facade/error.h" #include "server/blocking_controller.h" #include "server/command_registry.h" @@ -101,12 +102,9 @@ int ZsetDel(detail::RobjWrapper* robj_wrapper, sds ele) { return 1; } } else if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) { - zset* zs = (zset*)robj_wrapper->inner_obj(); - if (zsetRemoveFromSkiplist(zs, ele)) { - if (htNeedsResize(zs->dict)) - dictResize(zs->dict); + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + if (zs->Delete(ele)) return 1; - } } return 0; /* No such element found. */ } @@ -121,12 +119,8 @@ std::optional GetZsetScore(detail::RobjWrapper* robj_wrapper, sds member } if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) { - zset* zs = (zset*)robj_wrapper->inner_obj(); - dictEntry* de = dictFind(zs->dict, member); - if (de == NULL) - return std::nullopt; - - return *(double*)dictGetVal(de); + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + return zs->GetScore(member); } LOG(FATAL) << "Unknown sorted set encoding"; @@ -170,7 +164,7 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args if (add_res.second || zparams.override) { if (member_len > kMaxListPackValue) { - zset* zs = zsetCreate(); + detail::SortedMap* zs = new detail::SortedMap(); pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, zs); } else { unsigned char* lp = lpNew(0); @@ -286,10 +280,6 @@ class IntervalVisitor { } } - zskiplistNode* Next(zskiplistNode* ln) const { - return params_.reverse ? ln->backward : ln->level[0].forward; - } - bool IsUnder(double score, const zrangespec& spec) const { return params_.reverse ? zslValueGteMin(score, &spec) : zslValueLteMax(score, &spec); } @@ -419,8 +409,8 @@ void IntervalVisitor::ActionRem(unsigned start, unsigned end) { robj_wrapper_->set_inner_obj(zl); } else { CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); - zset* zs = (zset*)robj_wrapper_->inner_obj(); - removed_ = zslDeleteRangeByRank(zs->zsl, start + 1, end + 1, zs->dict); + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + removed_ = zs->DeleteRangeByRank(start, end); } } @@ -433,8 +423,8 @@ void IntervalVisitor::ActionRem(const zrangespec& range) { removed_ = deleted; } else { CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); - zset* zs = (zset*)robj_wrapper_->inner_obj(); - removed_ = zslDeleteRangeByScore(zs->zsl, &range, zs->dict); + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + removed_ = zs->DeleteRangeByScore(range); } } @@ -447,8 +437,8 @@ void IntervalVisitor::ActionRem(const zlexrangespec& range) { removed_ = deleted; } else { CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); - zset* zs = (zset*)robj_wrapper_->inner_obj(); - removed_ = zslDeleteRangeByLex(zs->zsl, &range, zs->dict); + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + removed_ = zs->DeleteRangeByLex(range); } } @@ -506,35 +496,12 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) { } void IntervalVisitor::ExtractSkipList(const zrangespec& range) { - zset* zs = (zset*)robj_wrapper_->inner_obj(); - zskiplist* zsl = zs->zsl; - zskiplistNode* ln; + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); + unsigned offset = params_.offset; unsigned limit = params_.limit; - /* If reversed, get the last node in range as starting point. */ - if (params_.reverse) { - ln = zslLastInRange(zsl, &range); - } else { - ln = zslFirstInRange(zsl, &range); - } - - /* If there is an offset, just traverse the number of elements without - * checking the score because that is done in the next loop. */ - while (ln && offset--) { - ln = Next(ln); - } - - while (ln && limit--) { - /* Abort when the node is no longer in range. */ - if (!IsUnder(ln->score, range)) - break; - - result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); - - /* Move to next node */ - ln = Next(ln); - } + result_ = zs->GetRange(range, offset, limit, params_.reverse); } void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { @@ -586,40 +553,10 @@ void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { } void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) { - zset* zs = (zset*)robj_wrapper_->inner_obj(); - zskiplist* zsl = zs->zsl; - zskiplistNode* ln; + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); unsigned offset = params_.offset; unsigned limit = params_.limit; - - /* If reversed, get the last node in range as starting point. */ - if (params_.reverse) { - ln = zslLastInLexRange(zsl, &range); - } else { - ln = zslFirstInLexRange(zsl, &range); - } - - /* If there is an offset, just traverse the number of elements without - * checking the score because that is done in the next loop. */ - while (ln && offset--) { - ln = Next(ln); - } - - while (ln && limit--) { - /* Abort when the node is no longer in range. */ - if (params_.reverse) { - if (!zslLexValueGteMin(ln->ele, &range)) - break; - } else { - if (!zslLexValueLteMax(ln->ele, &range)) - break; - } - - result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); - - /* Move to next node */ - ln = Next(ln); - } + result_ = zs->GetLexRange(range, offset, limit, params_.reverse); } void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) { @@ -662,25 +599,10 @@ void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) { } void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) { - zset* zs = (zset*)robj_wrapper_->inner_obj(); - zskiplist* zsl = zs->zsl; - zskiplistNode* ln; + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper_->inner_obj(); /* We start from the header, or the tail if reversed. */ - if (params_.reverse) { - ln = zsl->tail; - } else { - ln = zsl->header->level[0].forward; - } - - while (ln && sc--) { - result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); - - /* we can delete the element now */ - ZsetDel(robj_wrapper_, ln->ele); - - ln = Next(ln); - } + result_ = zs->PopTopScores(sc, params_.reverse); } void IntervalVisitor::AddResult(const uint8_t* vstr, unsigned vlen, long long vlong, double score) { @@ -1435,21 +1357,14 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view me return rank - 1; } DCHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST); - - robj self{ - .type = OBJ_ZSET, - .encoding = robj_wrapper->encoding(), - .lru = 0, - .refcount = OBJ_STATIC_REFCOUNT, - .ptr = robj_wrapper->inner_obj(), - }; - + detail::SortedMap* ss = (detail::SortedMap*)robj_wrapper->inner_obj(); op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size()); - long res = zsetRank(&self, op_args.shard->tmp_str1, reverse); - if (res < 0) + + std::optional rank = ss->GetRank(op_args.shard->tmp_str1, reverse); + if (!rank) return OpStatus::KEY_NOTFOUND; - return res; + return *rank; } OpResult OpCount(const OpArgs& op_args, std::string_view key, @@ -1495,29 +1410,8 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, } } else { CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), robj_wrapper->encoding()); - zset* zs = (zset*)robj_wrapper->inner_obj(); - zskiplist* zsl = zs->zsl; - zskiplistNode* zn; - unsigned long rank; - - /* Find first element in range */ - zn = zslFirstInRange(zsl, &range); - - /* Use rank of first element, if any, to determine preliminary count */ - if (zn == NULL) - return 0; - - rank = zslGetRank(zsl, zn->score, zn->ele); - count = (zsl->length - (rank - 1)); - - /* Find last element in range */ - zn = zslLastInRange(zsl, &range); - - /* Use rank of last element, if any, to determine the actual count */ - if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->ele); - count -= (zsl->length - rank); - } + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + count = zs->Count(range); } return count; @@ -1559,28 +1453,8 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, } } else { DCHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper->encoding()); - zset* zs = (zset*)robj_wrapper->inner_obj(); - zskiplist* zsl = zs->zsl; - zskiplistNode* zn; - unsigned long rank; - - /* Find first element in range */ - zn = zslFirstInLexRange(zsl, &range); - - /* Use rank of first element, if any, to determine preliminary count */ - if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->ele); - count = (zsl->length - (rank - 1)); - - /* Find last element in range */ - zn = zslLastInLexRange(zsl, &range); - - /* Use rank of last element, if any, to determine the actual count */ - if (zn != NULL) { - rank = zslGetRank(zsl, zn->score, zn->ele); - count -= (zsl->length - rank); - } - } + detail::SortedMap* zs = (detail::SortedMap*)robj_wrapper->inner_obj(); + count = zs->LexCount(range); } zslFreeLexRange(&range); @@ -1679,9 +1553,9 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t } else { CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), pv.Encoding()); uint32_t count = scan_op.limit; - zset* zs = (zset*)pv.RObjPtr(); + detail::SortedMap* zs = (detail::SortedMap*)pv.RObjPtr(); - dict* ht = zs->dict; + dict* ht = zs->GetDict(); long maxiterations = count * 10; struct ScanArgs { diff --git a/src/server/zset_family_test.cc b/src/server/zset_family_test.cc index 85e9522661b2..9026cb27055d 100644 --- a/src/server/zset_family_test.cc +++ b/src/server/zset_family_test.cc @@ -355,6 +355,14 @@ TEST_F(ZSetFamilyTest, ZUnion) { resp = Run({"zunion", "1", "z1", "weights", "2", "aggregate", "max", "withscores"}); EXPECT_THAT(resp.GetVec(), ElementsAre("a", "2", "b", "6")); + + for (unsigned i = 0; i < 256; ++i) { + Run({"zadd", "large1", "1000", absl::StrCat("aaaaaaaaaa", i)}); + Run({"zadd", "large2", "1000", absl::StrCat("bbbbbbbbbb", i)}); + Run({"zadd", "large2", "1000", absl::StrCat("aaaaaaaaaa", i)}); + } + resp = Run({"zunion", "2", "large2", "large1"}); + EXPECT_THAT(resp, ArrLen(512)); } TEST_F(ZSetFamilyTest, ZUnionStore) {