Skip to content

Commit

Permalink
fix: AllocationTracker::Remove return value was reversed (#3341)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
romange authored and dranikpg committed Jul 21, 2024
1 parent be59b5e commit 1a42921
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 126 deletions.
2 changes: 1 addition & 1 deletion src/core/allocation_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ bool AllocationTracker::Remove(size_t lower_bound, size_t upper_bound) {
}),
tracking_.end());

return before_size == tracking_.size();
return before_size != tracking_.size();
}

void AllocationTracker::Clear() {
Expand Down
2 changes: 2 additions & 0 deletions src/core/allocation_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class AllocationTracker {
bool Add(const TrackingInfo& info);

// Removes all tracking exactly matching lower_bound and upper_bound.
// Returns true if the tracking range [lower_bound, upper_bound] was removed
// and false, otherwise.
bool Remove(size_t lower_bound, size_t upper_bound);

// Clears *all* tracking.
Expand Down
1 change: 1 addition & 0 deletions src/core/string_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class StringSet : public DenseSet {
class iterator : private IteratorBase {
public:
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
using value_type = sds;
using pointer = sds*;
using reference = sds&;
Expand Down
209 changes: 84 additions & 125 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,47 +74,69 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
return is;
}

pair<unsigned, bool> RemoveStrSet(uint32_t now_sec, facade::ArgRange vals, CompactObj* set) {
unsigned removed = 0;
bool isempty = false;
DCHECK(IsDenseEncoding(*set));
struct StringSetWrapper {
StringSetWrapper(const CompactObj& obj, const DbContext& db_cntx)
: StringSetWrapper(obj.RObjPtr(), db_cntx) {
DCHECK(IsDenseEncoding(obj));
}

if (true) {
StringSet* ss = ((StringSet*)set->RObjPtr());
ss->set_time(now_sec);
StringSetWrapper(const SetType& st, const DbContext& db_cntx)
: StringSetWrapper(st.first, db_cntx) {
DCHECK_EQ(st.second, kEncodingStrMap2);
}

for (string_view member : vals) {
removed += ss->Erase(member);
}
static void Init(CompactObj* obj) {
obj->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
}

isempty = ss->Empty();
unsigned Add(const NewEntries& entries, uint32_t ttl_sec) const {
unsigned res = 0;
for (string_view member : EntriesRange(entries))
res += ss->Add(member, ttl_sec);
return res;
}

return make_pair(removed, isempty);
}
pair<unsigned, bool> Remove(const facade::ArgRange& entries) const {
unsigned removed = 0;
for (string_view member : entries)
removed += ss->Erase(member);
return {removed, ss->Empty()};
}

unsigned AddStrSet(const DbContext& db_context, const NewEntries& vals, uint32_t ttl_sec,
CompactObj* dest) {
unsigned res = 0;
DCHECK(IsDenseEncoding(*dest));
uint64_t Scan(uint64_t curs, const ScanOpts& scan_op, StringVec* res) const {
uint32_t count = scan_op.limit;
long maxiterations = count * 10;

if (true) {
StringSet* ss = (StringSet*)dest->RObjPtr();
uint32_t time_now = MemberTimeSeconds(db_context.time_now_ms);
do {
auto scan_callback = [&](const sds ptr) {
if (string_view str{ptr, sdslen(ptr)}; scan_op.Matches(str))
res->emplace_back(str);
};
curs = ss->Scan(curs, scan_callback);
} while (curs && maxiterations-- && res->size() < count);
return curs;
}

ss->set_time(time_now);
StringSet* operator->() const {
return ss;
}

for (auto member : EntriesRange(vals)) {
res += ss->Add(member, ttl_sec);
}
auto Range() const {
return base::it::Transform(
[](const sds ptr) {
return string_view{ptr, sdslen(ptr)};
},
base::it::Range(ss->begin(), ss->end()));
}

return res;
}
private:
StringSetWrapper(void* robj_ptr, const DbContext& db_cntx) {
ss = static_cast<StringSet*>(robj_ptr);
ss->set_time(MemberTimeSeconds(db_cntx.time_now_ms));
}

void InitStrSet(CompactObj* set) {
set->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
}
StringSet* ss;
};

// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange vals,
Expand All @@ -137,10 +159,11 @@ pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange val
}
isempty = (intsetLen(is) == 0);
set->SetRObjPtr(is);

return make_pair(removed, isempty);
} else {
return RemoveStrSet(MemberTimeSeconds(db_context.time_now_ms), vals, set);
return StringSetWrapper{*set, db_context}.Remove(vals);
}
return make_pair(removed, isempty);
}

void InitSet(const NewEntries& vals, CompactObj* set) {
Expand All @@ -158,44 +181,15 @@ void InitSet(const NewEntries& vals, CompactObj* set) {
intset* is = intsetNew();
set->InitRobj(OBJ_SET, kEncodingIntSet, is);
} else {
InitStrSet(set);
StringSetWrapper::Init(set);
}
}

uint64_t ScanStrSet(const DbContext& db_context, const CompactObj& co, uint64_t curs,
const ScanOpts& scan_op, StringVec* res) {
uint32_t count = scan_op.limit;
long maxiterations = count * 10;
DCHECK(IsDenseEncoding(co));

if (true) {
StringSet* set = (StringSet*)co.RObjPtr();
set->set_time(MemberTimeSeconds(db_context.time_now_ms));

do {
auto scan_callback = [&](const sds ptr) {
string_view str{ptr, sdslen(ptr)};
if (scan_op.Matches(str)) {
res->push_back(std::string(str));
}
};

curs = set->Scan(curs, scan_callback);

} while (curs && maxiterations-- && res->size() < count);
}
return curs;
}

uint32_t SetTypeLen(const DbContext& db_context, const SetType& set) {
if (set.second == kEncodingIntSet) {
return intsetLen((const intset*)set.first);
}

if (true) {
StringSet* ss = (StringSet*)set.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
return ss->UpperBoundSize();
} else {
return StringSetWrapper(set, db_context)->UpperBoundSize();
}
}

Expand All @@ -207,11 +201,7 @@ bool IsInSet(const DbContext& db_context, const SetType& st, int64_t val) {
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
string_view str{buf, size_t(next - buf)};

if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
return ss->Contains(str);
}
return StringSetWrapper(st, db_context)->Contains(str);
}

bool IsInSet(const DbContext& db_context, const SetType& st, string_view member) {
Expand All @@ -221,13 +211,8 @@ bool IsInSet(const DbContext& db_context, const SetType& st, string_view member)
return false;

return intsetFind((intset*)st.first, llval);
}

if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));

return ss->Contains(member);
} else {
return StringSetWrapper(st, db_context)->Contains(member);
}
}

Expand All @@ -239,12 +224,8 @@ int32_t GetExpiry(const DbContext& db_context, const SetType& st, string_view me
return -3;

return -1;
}

if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));

} else {
StringSetWrapper ss{st, db_context};
auto it = ss->Find(member);
if (it == ss->end())
return -3;
Expand All @@ -264,31 +245,21 @@ void FindInSet(StringVec& memberships, const DbContext& db_context, const SetTyp
// Removes arg from result.
void DiffStrSet(const DbContext& db_context, const SetType& st,
absl::flat_hash_set<string>* result) {
if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
for (sds ptr : *ss) {
result->erase(string_view{ptr, sdslen(ptr)});
}
}
for (string_view entry : StringSetWrapper{st, db_context}.Range())
result->erase(entry);
}

void InterStrSet(const DbContext& db_context, const vector<SetType>& vec, StringVec* result) {
if (true) {
StringSet* ss = (StringSet*)vec.front().first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
for (const sds ptr : *ss) {
std::string_view str{ptr, sdslen(ptr)};
size_t j = 1;
for (j = 1; j < vec.size(); ++j) {
if (vec[j].first != ss && !IsInSet(db_context, vec[j], str)) {
break;
}
for (string_view str : StringSetWrapper{vec.front(), db_context}.Range()) {
size_t j = 1;
for (j = 1; j < vec.size(); ++j) {
if (vec[j].first != vec.front().first && !IsInSet(db_context, vec[j], str)) {
break;
}
}

if (j == vec.size()) {
result->push_back(std::string(str));
}
if (j == vec.size()) {
result->emplace_back(str);
}
}
}
Expand All @@ -305,22 +276,14 @@ StringVec RandMemberStrSet(const DbContext& db_context, const CompactObj& co,
StringVec result;
result.reserve(picks_count);

StringSet* ss = static_cast<StringSet*>(co.RObjPtr());
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));

std::uint32_t ss_entry_index = 0;
container_utils::IterateSet(
co, [&result, &times_index_is_picked, &ss_entry_index](container_utils::ContainerEntry ce) {
auto it = times_index_is_picked.find(ss_entry_index++);
if (it != times_index_is_picked.end()) {
std::uint32_t t = it->second;
while (t--) {
result.emplace_back(ce.ToString());
}
}
return true;
});

for (string_view str : StringSetWrapper{co, db_context}.Range()) {
auto it = times_index_is_picked.find(ss_entry_index++);
if (it != times_index_is_picked.end()) {
while (it->second--)
result.emplace_back(str);
}
}
/* Equal elements in the result are always successive. So, it is necessary to shuffle them */
absl::BitGen gen;
std::shuffle(result.begin(), result.end(), gen);
Expand Down Expand Up @@ -504,11 +467,10 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
InitSet(vals, &co);
}

void* inner_obj = co.RObjPtr();
uint32_t res = 0;

if (co.Encoding() == kEncodingIntSet) {
intset* is = (intset*)inner_obj;
intset* is = (intset*)co.RObjPtr();
bool success = true;

for (auto val : vals_it) {
Expand All @@ -526,7 +488,6 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE

// frees 'is' on a way.
co.InitRobj(OBJ_SET, kEncodingStrMap2, ss);
inner_obj = co.RObjPtr();
break;
}
}
Expand All @@ -536,7 +497,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
}

if (co.Encoding() != kEncodingIntSet) {
res = AddStrSet(op_args.db_cntx, vals, UINT32_MAX, &co);
res = StringSetWrapper{co, op_args.db_cntx}.Add(vals, UINT32_MAX);
}

if (journal_update && op_args.shard->journal()) {
Expand All @@ -563,7 +524,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
CompactObj& co = add_res.it->second;

if (add_res.is_new) {
InitStrSet(&co);
StringSetWrapper::Init(&co);
} else {
// for non-overwrite case it must be set.
if (co.ObjType() != OBJ_SET)
Expand All @@ -582,9 +543,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
CHECK(IsDenseEncoding(co));
}

uint32_t res = AddStrSet(op_args.db_cntx, vals, ttl_sec, &co);

return res;
return StringSetWrapper{co, op_args.db_cntx}.Add(vals, ttl_sec);
}

OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, facade::ArgRange vals,
Expand Down Expand Up @@ -982,7 +941,7 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, string_view key, uint64_t* cur
}
*cursor = 0;
} else {
*cursor = ScanStrSet(op_args.db_cntx, it->second, *cursor, scan_op, &res);
*cursor = StringSetWrapper{it->second, op_args.db_cntx}.Scan(*cursor, scan_op, &res);
}

return res;
Expand Down

0 comments on commit 1a42921

Please sign in to comment.