Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(server): Introduce StringSetWrapper #3347

Merged
merged 1 commit into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/core/string_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class StringSet : public DenseSet {
class iterator : private IteratorBase {
public:
using iterator_category = std::forward_iterator_tag;
using difference_type = std::ptrdiff_t;
using value_type = sds;
using pointer = sds*;
using reference = sds&;
Expand Down
210 changes: 81 additions & 129 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,58 +74,74 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
return is;
}

pair<unsigned, bool> RemoveStrSet(uint32_t now_sec, facade::ArgRange vals, CompactObj* set) {
unsigned removed = 0;
bool isempty = false;
DCHECK(IsDenseEncoding(*set));
struct StringSetWrapper {
StringSetWrapper(const CompactObj& obj, const DbContext& db_cntx)
: StringSetWrapper(obj.RObjPtr(), db_cntx.time_now_ms) {
DCHECK(IsDenseEncoding(obj));
}

if (true) {
StringSet* ss = ((StringSet*)set->RObjPtr());
ss->set_time(now_sec);
StringSetWrapper(const SetType& st, const DbContext& db_cntx)
: StringSetWrapper(st.first, db_cntx.time_now_ms) {
DCHECK_EQ(st.second, kEncodingStrMap2);
}

for (string_view member : vals) {
removed += ss->Erase(member);
}
static void Init(CompactObj* obj) {
obj->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
}

isempty = ss->Empty();
unsigned Add(const NewEntries& entries, uint32_t ttl_sec) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we use unsigned an not uint32_t or uint64_t ? It's less verbose 🤣

unsigned res = 0;
for (string_view member : EntriesRange(entries))
res += ss->Add(member, ttl_sec);
return res;
}

return make_pair(removed, isempty);
}
pair<unsigned, bool> Remove(const facade::ArgRange& entries) const {
unsigned removed = 0;
for (string_view member : entries)
removed += ss->Erase(member);
return {removed, ss->Empty()};
}

unsigned AddStrSet(const DbContext& db_context, const NewEntries& vals, uint32_t ttl_sec,
CompactObj* dest) {
unsigned res = 0;
DCHECK(IsDenseEncoding(*dest));
uint64_t Scan(uint64_t curs, const ScanOpts& scan_op, StringVec* res) const {
uint32_t count = scan_op.limit;
long maxiterations = count * 10;

if (true) {
StringSet* ss = (StringSet*)dest->RObjPtr();
uint32_t time_now = MemberTimeSeconds(db_context.time_now_ms);
do {
auto scan_callback = [&](const sds ptr) {
if (string_view str{ptr, sdslen(ptr)}; scan_op.Matches(str))
res->emplace_back(str);
};
curs = ss->Scan(curs, scan_callback);
} while (curs && maxiterations-- && res->size() < count);
return curs;
}

ss->set_time(time_now);
StringSet* operator->() const {
return ss;
}

for (auto member : EntriesRange(vals)) {
res += ss->Add(member, ttl_sec);
}
auto Range() const {
auto transform = [](const sds ptr) { return string_view{ptr, sdslen(ptr)}; };
return base::it::Transform(transform, base::it::Range(ss->begin(), ss->end()));
}

return res;
}
private:
StringSetWrapper(void* robj_ptr, uint64_t now_ms) : ss(static_cast<StringSet*>(robj_ptr)) {
ss->set_time(MemberTimeSeconds(now_ms));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just make sure that we don't call this on paths that don't require this. I went over the cases and they seemed fine but just double checking

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I checked too. And if a command path existed, it was likely missing, not abundant 🤷🏻‍♂️

}

void InitStrSet(CompactObj* set) {
set->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>());
}
StringSet* const ss;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

west side vs east side 👀

};

// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange vals,
CompactObj* set) {
bool isempty = false;
unsigned removed = 0;

if (set->Encoding() == kEncodingIntSet) {
intset* is = (intset*)set->RObjPtr();
long long llval;

unsigned removed = 0;
for (string_view val : vals) {
if (!string2ll(val.data(), val.size(), &llval)) {
continue;
Expand All @@ -135,12 +151,12 @@ pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange val
is = intsetRemove(is, llval, &is_removed);
removed += is_removed;
}
isempty = (intsetLen(is) == 0);
set->SetRObjPtr(is);

return {removed, intsetLen(is) == 0};
} else {
return RemoveStrSet(MemberTimeSeconds(db_context.time_now_ms), vals, set);
return StringSetWrapper{*set, db_context}.Remove(vals);
}
return make_pair(removed, isempty);
}

void InitSet(const NewEntries& vals, CompactObj* set) {
Expand All @@ -158,44 +174,15 @@ void InitSet(const NewEntries& vals, CompactObj* set) {
intset* is = intsetNew();
set->InitRobj(OBJ_SET, kEncodingIntSet, is);
} else {
InitStrSet(set);
}
}

uint64_t ScanStrSet(const DbContext& db_context, const CompactObj& co, uint64_t curs,
const ScanOpts& scan_op, StringVec* res) {
uint32_t count = scan_op.limit;
long maxiterations = count * 10;
DCHECK(IsDenseEncoding(co));

if (true) {
StringSet* set = (StringSet*)co.RObjPtr();
set->set_time(MemberTimeSeconds(db_context.time_now_ms));

do {
auto scan_callback = [&](const sds ptr) {
string_view str{ptr, sdslen(ptr)};
if (scan_op.Matches(str)) {
res->push_back(std::string(str));
}
};

curs = set->Scan(curs, scan_callback);

} while (curs && maxiterations-- && res->size() < count);
StringSetWrapper::Init(set);
}
return curs;
}

uint32_t SetTypeLen(const DbContext& db_context, const SetType& set) {
if (set.second == kEncodingIntSet) {
return intsetLen((const intset*)set.first);
}

if (true) {
StringSet* ss = (StringSet*)set.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
return ss->UpperBoundSize();
} else {
return StringSetWrapper(set, db_context)->UpperBoundSize();
}
}

Expand All @@ -207,11 +194,7 @@ bool IsInSet(const DbContext& db_context, const SetType& st, int64_t val) {
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
string_view str{buf, size_t(next - buf)};

if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
return ss->Contains(str);
}
return StringSetWrapper(st, db_context)->Contains(str);
}

bool IsInSet(const DbContext& db_context, const SetType& st, string_view member) {
Expand All @@ -221,13 +204,8 @@ bool IsInSet(const DbContext& db_context, const SetType& st, string_view member)
return false;

return intsetFind((intset*)st.first, llval);
}

if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));

return ss->Contains(member);
} else {
return StringSetWrapper(st, db_context)->Contains(member);
}
}

Expand All @@ -239,12 +217,8 @@ int32_t GetExpiry(const DbContext& db_context, const SetType& st, string_view me
return -3;

return -1;
}

if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));

} else {
StringSetWrapper ss{st, db_context};
auto it = ss->Find(member);
if (it == ss->end())
return -3;
Expand All @@ -264,31 +238,21 @@ void FindInSet(StringVec& memberships, const DbContext& db_context, const SetTyp
// Removes arg from result.
void DiffStrSet(const DbContext& db_context, const SetType& st,
absl::flat_hash_set<string>* result) {
if (true) {
StringSet* ss = (StringSet*)st.first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
for (sds ptr : *ss) {
result->erase(string_view{ptr, sdslen(ptr)});
}
}
for (string_view entry : StringSetWrapper{st, db_context}.Range())
result->erase(entry);
}

void InterStrSet(const DbContext& db_context, const vector<SetType>& vec, StringVec* result) {
if (true) {
StringSet* ss = (StringSet*)vec.front().first;
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));
for (const sds ptr : *ss) {
std::string_view str{ptr, sdslen(ptr)};
size_t j = 1;
for (j = 1; j < vec.size(); ++j) {
if (vec[j].first != ss && !IsInSet(db_context, vec[j], str)) {
break;
}
for (string_view str : StringSetWrapper{vec.front(), db_context}.Range()) {
size_t j = 1;
for (j = 1; j < vec.size(); ++j) {
if (vec[j].first != vec.front().first && !IsInSet(db_context, vec[j], str)) {
break;
}
}

if (j == vec.size()) {
result->push_back(std::string(str));
}
if (j == vec.size()) {
result->emplace_back(str);
}
}
}
Expand All @@ -305,22 +269,14 @@ StringVec RandMemberStrSet(const DbContext& db_context, const CompactObj& co,
StringVec result;
result.reserve(picks_count);

StringSet* ss = static_cast<StringSet*>(co.RObjPtr());
ss->set_time(MemberTimeSeconds(db_context.time_now_ms));

std::uint32_t ss_entry_index = 0;
container_utils::IterateSet(
co, [&result, &times_index_is_picked, &ss_entry_index](container_utils::ContainerEntry ce) {
auto it = times_index_is_picked.find(ss_entry_index++);
if (it != times_index_is_picked.end()) {
std::uint32_t t = it->second;
while (t--) {
result.emplace_back(ce.ToString());
}
}
return true;
});

for (string_view str : StringSetWrapper{co, db_context}.Range()) {
auto it = times_index_is_picked.find(ss_entry_index++);
if (it != times_index_is_picked.end()) {
while (it->second--)
result.emplace_back(str);
}
}
/* Equal elements in the result are always successive. So, it is necessary to shuffle them */
absl::BitGen gen;
std::shuffle(result.begin(), result.end(), gen);
Expand Down Expand Up @@ -504,11 +460,10 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
InitSet(vals, &co);
}

void* inner_obj = co.RObjPtr();
uint32_t res = 0;

if (co.Encoding() == kEncodingIntSet) {
intset* is = (intset*)inner_obj;
intset* is = (intset*)co.RObjPtr();
bool success = true;

for (auto val : vals_it) {
Expand All @@ -526,7 +481,6 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE

// frees 'is' on a way.
co.InitRobj(OBJ_SET, kEncodingStrMap2, ss);
inner_obj = co.RObjPtr();
break;
}
}
Expand All @@ -536,7 +490,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE
}

if (co.Encoding() != kEncodingIntSet) {
res = AddStrSet(op_args.db_cntx, vals, UINT32_MAX, &co);
res = StringSetWrapper{co, op_args.db_cntx}.Add(vals, UINT32_MAX);
}

if (journal_update && op_args.shard->journal()) {
Expand All @@ -563,7 +517,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
CompactObj& co = add_res.it->second;

if (add_res.is_new) {
InitStrSet(&co);
StringSetWrapper::Init(&co);
} else {
// for non-overwrite case it must be set.
if (co.ObjType() != OBJ_SET)
Expand All @@ -582,9 +536,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
CHECK(IsDenseEncoding(co));
}

uint32_t res = AddStrSet(op_args.db_cntx, vals, ttl_sec, &co);

return res;
return StringSetWrapper{co, op_args.db_cntx}.Add(vals, ttl_sec);
}

OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, facade::ArgRange vals,
Expand Down Expand Up @@ -982,7 +934,7 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, string_view key, uint64_t* cur
}
*cursor = 0;
} else {
*cursor = ScanStrSet(op_args.db_cntx, it->second, *cursor, scan_op, &res);
*cursor = StringSetWrapper{it->second, op_args.db_cntx}.Scan(*cursor, scan_op, &res);
}

return res;
Expand Down
Loading