-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(server): Introduce StringSetWrapper #3347
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,58 +74,74 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) { | |
return is; | ||
} | ||
|
||
pair<unsigned, bool> RemoveStrSet(uint32_t now_sec, facade::ArgRange vals, CompactObj* set) { | ||
unsigned removed = 0; | ||
bool isempty = false; | ||
DCHECK(IsDenseEncoding(*set)); | ||
struct StringSetWrapper { | ||
StringSetWrapper(const CompactObj& obj, const DbContext& db_cntx) | ||
: StringSetWrapper(obj.RObjPtr(), db_cntx.time_now_ms) { | ||
DCHECK(IsDenseEncoding(obj)); | ||
} | ||
|
||
if (true) { | ||
StringSet* ss = ((StringSet*)set->RObjPtr()); | ||
ss->set_time(now_sec); | ||
StringSetWrapper(const SetType& st, const DbContext& db_cntx) | ||
: StringSetWrapper(st.first, db_cntx.time_now_ms) { | ||
DCHECK_EQ(st.second, kEncodingStrMap2); | ||
} | ||
|
||
for (string_view member : vals) { | ||
removed += ss->Erase(member); | ||
} | ||
static void Init(CompactObj* obj) { | ||
obj->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>()); | ||
} | ||
|
||
isempty = ss->Empty(); | ||
unsigned Add(const NewEntries& entries, uint32_t ttl_sec) const { | ||
unsigned res = 0; | ||
for (string_view member : EntriesRange(entries)) | ||
res += ss->Add(member, ttl_sec); | ||
return res; | ||
} | ||
|
||
return make_pair(removed, isempty); | ||
} | ||
pair<unsigned, bool> Remove(const facade::ArgRange& entries) const { | ||
unsigned removed = 0; | ||
for (string_view member : entries) | ||
removed += ss->Erase(member); | ||
return {removed, ss->Empty()}; | ||
} | ||
|
||
unsigned AddStrSet(const DbContext& db_context, const NewEntries& vals, uint32_t ttl_sec, | ||
CompactObj* dest) { | ||
unsigned res = 0; | ||
DCHECK(IsDenseEncoding(*dest)); | ||
uint64_t Scan(uint64_t curs, const ScanOpts& scan_op, StringVec* res) const { | ||
uint32_t count = scan_op.limit; | ||
long maxiterations = count * 10; | ||
|
||
if (true) { | ||
StringSet* ss = (StringSet*)dest->RObjPtr(); | ||
uint32_t time_now = MemberTimeSeconds(db_context.time_now_ms); | ||
do { | ||
auto scan_callback = [&](const sds ptr) { | ||
if (string_view str{ptr, sdslen(ptr)}; scan_op.Matches(str)) | ||
res->emplace_back(str); | ||
}; | ||
curs = ss->Scan(curs, scan_callback); | ||
} while (curs && maxiterations-- && res->size() < count); | ||
return curs; | ||
} | ||
|
||
ss->set_time(time_now); | ||
StringSet* operator->() const { | ||
return ss; | ||
} | ||
|
||
for (auto member : EntriesRange(vals)) { | ||
res += ss->Add(member, ttl_sec); | ||
} | ||
auto Range() const { | ||
auto transform = [](const sds ptr) { return string_view{ptr, sdslen(ptr)}; }; | ||
return base::it::Transform(transform, base::it::Range(ss->begin(), ss->end())); | ||
} | ||
|
||
return res; | ||
} | ||
private: | ||
StringSetWrapper(void* robj_ptr, uint64_t now_ms) : ss(static_cast<StringSet*>(robj_ptr)) { | ||
ss->set_time(MemberTimeSeconds(now_ms)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just make sure that we don't call this on paths that don't require this. I went over the cases and they seemed fine but just double checking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I checked too. And if a command path existed, it was likely missing, not abundant 🤷🏻♂️ |
||
} | ||
|
||
void InitStrSet(CompactObj* set) { | ||
set->InitRobj(OBJ_SET, kEncodingStrMap2, CompactObj::AllocateMR<StringSet>()); | ||
} | ||
StringSet* const ss; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. west side vs east side 👀 |
||
}; | ||
|
||
// returns (removed, isempty) | ||
pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange vals, | ||
CompactObj* set) { | ||
bool isempty = false; | ||
unsigned removed = 0; | ||
|
||
if (set->Encoding() == kEncodingIntSet) { | ||
intset* is = (intset*)set->RObjPtr(); | ||
long long llval; | ||
|
||
unsigned removed = 0; | ||
for (string_view val : vals) { | ||
if (!string2ll(val.data(), val.size(), &llval)) { | ||
continue; | ||
|
@@ -135,12 +151,12 @@ pair<unsigned, bool> RemoveSet(const DbContext& db_context, facade::ArgRange val | |
is = intsetRemove(is, llval, &is_removed); | ||
removed += is_removed; | ||
} | ||
isempty = (intsetLen(is) == 0); | ||
set->SetRObjPtr(is); | ||
|
||
return {removed, intsetLen(is) == 0}; | ||
} else { | ||
return RemoveStrSet(MemberTimeSeconds(db_context.time_now_ms), vals, set); | ||
return StringSetWrapper{*set, db_context}.Remove(vals); | ||
} | ||
return make_pair(removed, isempty); | ||
} | ||
|
||
void InitSet(const NewEntries& vals, CompactObj* set) { | ||
|
@@ -158,44 +174,15 @@ void InitSet(const NewEntries& vals, CompactObj* set) { | |
intset* is = intsetNew(); | ||
set->InitRobj(OBJ_SET, kEncodingIntSet, is); | ||
} else { | ||
InitStrSet(set); | ||
} | ||
} | ||
|
||
uint64_t ScanStrSet(const DbContext& db_context, const CompactObj& co, uint64_t curs, | ||
const ScanOpts& scan_op, StringVec* res) { | ||
uint32_t count = scan_op.limit; | ||
long maxiterations = count * 10; | ||
DCHECK(IsDenseEncoding(co)); | ||
|
||
if (true) { | ||
StringSet* set = (StringSet*)co.RObjPtr(); | ||
set->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
|
||
do { | ||
auto scan_callback = [&](const sds ptr) { | ||
string_view str{ptr, sdslen(ptr)}; | ||
if (scan_op.Matches(str)) { | ||
res->push_back(std::string(str)); | ||
} | ||
}; | ||
|
||
curs = set->Scan(curs, scan_callback); | ||
|
||
} while (curs && maxiterations-- && res->size() < count); | ||
StringSetWrapper::Init(set); | ||
} | ||
return curs; | ||
} | ||
|
||
uint32_t SetTypeLen(const DbContext& db_context, const SetType& set) { | ||
if (set.second == kEncodingIntSet) { | ||
return intsetLen((const intset*)set.first); | ||
} | ||
|
||
if (true) { | ||
StringSet* ss = (StringSet*)set.first; | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
return ss->UpperBoundSize(); | ||
} else { | ||
return StringSetWrapper(set, db_context)->UpperBoundSize(); | ||
} | ||
} | ||
|
||
|
@@ -207,11 +194,7 @@ bool IsInSet(const DbContext& db_context, const SetType& st, int64_t val) { | |
char* next = absl::numbers_internal::FastIntToBuffer(val, buf); | ||
string_view str{buf, size_t(next - buf)}; | ||
|
||
if (true) { | ||
StringSet* ss = (StringSet*)st.first; | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
return ss->Contains(str); | ||
} | ||
return StringSetWrapper(st, db_context)->Contains(str); | ||
} | ||
|
||
bool IsInSet(const DbContext& db_context, const SetType& st, string_view member) { | ||
|
@@ -221,13 +204,8 @@ bool IsInSet(const DbContext& db_context, const SetType& st, string_view member) | |
return false; | ||
|
||
return intsetFind((intset*)st.first, llval); | ||
} | ||
|
||
if (true) { | ||
StringSet* ss = (StringSet*)st.first; | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
|
||
return ss->Contains(member); | ||
} else { | ||
return StringSetWrapper(st, db_context)->Contains(member); | ||
} | ||
} | ||
|
||
|
@@ -239,12 +217,8 @@ int32_t GetExpiry(const DbContext& db_context, const SetType& st, string_view me | |
return -3; | ||
|
||
return -1; | ||
} | ||
|
||
if (true) { | ||
StringSet* ss = (StringSet*)st.first; | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
|
||
} else { | ||
StringSetWrapper ss{st, db_context}; | ||
auto it = ss->Find(member); | ||
if (it == ss->end()) | ||
return -3; | ||
|
@@ -264,31 +238,21 @@ void FindInSet(StringVec& memberships, const DbContext& db_context, const SetTyp | |
// Removes arg from result. | ||
void DiffStrSet(const DbContext& db_context, const SetType& st, | ||
absl::flat_hash_set<string>* result) { | ||
if (true) { | ||
StringSet* ss = (StringSet*)st.first; | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
for (sds ptr : *ss) { | ||
result->erase(string_view{ptr, sdslen(ptr)}); | ||
} | ||
} | ||
for (string_view entry : StringSetWrapper{st, db_context}.Range()) | ||
result->erase(entry); | ||
} | ||
|
||
void InterStrSet(const DbContext& db_context, const vector<SetType>& vec, StringVec* result) { | ||
if (true) { | ||
StringSet* ss = (StringSet*)vec.front().first; | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
for (const sds ptr : *ss) { | ||
std::string_view str{ptr, sdslen(ptr)}; | ||
size_t j = 1; | ||
for (j = 1; j < vec.size(); ++j) { | ||
if (vec[j].first != ss && !IsInSet(db_context, vec[j], str)) { | ||
break; | ||
} | ||
for (string_view str : StringSetWrapper{vec.front(), db_context}.Range()) { | ||
size_t j = 1; | ||
for (j = 1; j < vec.size(); ++j) { | ||
if (vec[j].first != vec.front().first && !IsInSet(db_context, vec[j], str)) { | ||
break; | ||
} | ||
} | ||
|
||
if (j == vec.size()) { | ||
result->push_back(std::string(str)); | ||
} | ||
if (j == vec.size()) { | ||
result->emplace_back(str); | ||
} | ||
} | ||
} | ||
|
@@ -305,22 +269,14 @@ StringVec RandMemberStrSet(const DbContext& db_context, const CompactObj& co, | |
StringVec result; | ||
result.reserve(picks_count); | ||
|
||
StringSet* ss = static_cast<StringSet*>(co.RObjPtr()); | ||
ss->set_time(MemberTimeSeconds(db_context.time_now_ms)); | ||
|
||
std::uint32_t ss_entry_index = 0; | ||
container_utils::IterateSet( | ||
co, [&result, ×_index_is_picked, &ss_entry_index](container_utils::ContainerEntry ce) { | ||
auto it = times_index_is_picked.find(ss_entry_index++); | ||
if (it != times_index_is_picked.end()) { | ||
std::uint32_t t = it->second; | ||
while (t--) { | ||
result.emplace_back(ce.ToString()); | ||
} | ||
} | ||
return true; | ||
}); | ||
|
||
for (string_view str : StringSetWrapper{co, db_context}.Range()) { | ||
auto it = times_index_is_picked.find(ss_entry_index++); | ||
if (it != times_index_is_picked.end()) { | ||
while (it->second--) | ||
result.emplace_back(str); | ||
} | ||
} | ||
/* Equal elements in the result are always successive. So, it is necessary to shuffle them */ | ||
absl::BitGen gen; | ||
std::shuffle(result.begin(), result.end(), gen); | ||
|
@@ -504,11 +460,10 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE | |
InitSet(vals, &co); | ||
} | ||
|
||
void* inner_obj = co.RObjPtr(); | ||
uint32_t res = 0; | ||
|
||
if (co.Encoding() == kEncodingIntSet) { | ||
intset* is = (intset*)inner_obj; | ||
intset* is = (intset*)co.RObjPtr(); | ||
bool success = true; | ||
|
||
for (auto val : vals_it) { | ||
|
@@ -526,7 +481,6 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE | |
|
||
// frees 'is' on a way. | ||
co.InitRobj(OBJ_SET, kEncodingStrMap2, ss); | ||
inner_obj = co.RObjPtr(); | ||
break; | ||
} | ||
} | ||
|
@@ -536,7 +490,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const NewE | |
} | ||
|
||
if (co.Encoding() != kEncodingIntSet) { | ||
res = AddStrSet(op_args.db_cntx, vals, UINT32_MAX, &co); | ||
res = StringSetWrapper{co, op_args.db_cntx}.Add(vals, UINT32_MAX); | ||
} | ||
|
||
if (journal_update && op_args.shard->journal()) { | ||
|
@@ -563,7 +517,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_ | |
CompactObj& co = add_res.it->second; | ||
|
||
if (add_res.is_new) { | ||
InitStrSet(&co); | ||
StringSetWrapper::Init(&co); | ||
} else { | ||
// for non-overwrite case it must be set. | ||
if (co.ObjType() != OBJ_SET) | ||
|
@@ -582,9 +536,7 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_ | |
CHECK(IsDenseEncoding(co)); | ||
} | ||
|
||
uint32_t res = AddStrSet(op_args.db_cntx, vals, ttl_sec, &co); | ||
|
||
return res; | ||
return StringSetWrapper{co, op_args.db_cntx}.Add(vals, ttl_sec); | ||
} | ||
|
||
OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, facade::ArgRange vals, | ||
|
@@ -982,7 +934,7 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, string_view key, uint64_t* cur | |
} | ||
*cursor = 0; | ||
} else { | ||
*cursor = ScanStrSet(op_args.db_cntx, it->second, *cursor, scan_op, &res); | ||
*cursor = StringSetWrapper{it->second, op_args.db_cntx}.Scan(*cursor, scan_op, &res); | ||
} | ||
|
||
return res; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we use unsigned an not
uint32_t or uint64_t
? It's less verbose 🤣