Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Add Ttl semantics to string_map #813

Merged
merged 1 commit into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions src/core/dense_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool ha
}

if (has_ttl)
it->SetTtl();
it->SetTtl(true);
return ObjectAllocSize(data);
}

Expand All @@ -97,7 +97,7 @@ void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr pt
if (it->IsEmpty()) {
it->SetObject(ptr.GetObject());
if (ptr.HasTtl())
it->SetTtl();
it->SetTtl(true);
if (ptr.IsLink()) {
FreeLink(ptr.AsLink());
}
Expand All @@ -112,7 +112,7 @@ void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr pt
// allocate a new link if needed and copy the pointer to the new link
it->SetLink(NewLink(ptr.Raw(), *it));
if (ptr.HasTtl())
it->SetTtl();
it->SetTtl(true);
DCHECK(!it->AsLink()->next.IsEmpty());
}
}
Expand Down Expand Up @@ -313,7 +313,7 @@ void DenseSet::Grow() {
}
}

void* DenseSet::AddOrFind(void* ptr, bool has_ttl) {
auto DenseSet::AddOrFindDense(void* ptr, bool has_ttl) -> DensePtr* {
uint64_t hc = Hash(ptr, 0);

if (entries_.empty()) {
Expand All @@ -332,7 +332,7 @@ void* DenseSet::AddOrFind(void* ptr, bool has_ttl) {
uint32_t bucket_id = BucketId(hc);
DensePtr* dptr = Find(ptr, bucket_id, 0).second;
if (dptr != nullptr) {
return dptr->GetObject();
return dptr;
}

DCHECK_LT(bucket_id, entries_.size());
Expand Down Expand Up @@ -373,7 +373,7 @@ void* DenseSet::AddOrFind(void* ptr, bool has_ttl) {

DensePtr to_insert(ptr);
if (has_ttl)
to_insert.SetTtl();
to_insert.SetTtl(true);

while (!entries_[bucket_id].IsEmpty() && entries_[bucket_id].IsDisplaced()) {
DensePtr unlinked = PopPtrFront(entries_.begin() + bucket_id);
Expand Down Expand Up @@ -497,6 +497,25 @@ void* DenseSet::PopInternal() {
return ret;
}

void* DenseSet::AddOrReplaceObj(void* obj, bool has_ttl) {
DensePtr* ptr = AddOrFindDense(obj, has_ttl);
if (!ptr)
return nullptr;

if (ptr->IsLink()) {
ptr = ptr->AsLink();
}

void* res = ptr->Raw();
obj_malloc_used_ -= ObjectAllocSize(res);
obj_malloc_used_ += ObjectAllocSize(obj);

ptr->SetObject(obj);
ptr->SetTtl(has_ttl);

return res;
}

/**
* stable scanning api. has the same guarantees as redis scan command.
* we avoid doing bit-reverse by using a different function to derive a bucket id
Expand Down
26 changes: 20 additions & 6 deletions src/core/dense_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,11 @@ class DenseSet {
return (uptr() & kDisplaceDirectionBit) == kDisplaceDirectionBit ? 1 : -1;
}

void SetTtl() {
ptr_ = (void*)(uptr() | kTtlBit);
void SetTtl(bool b) {
if (b)
ptr_ = (void*)(uptr() | kTtlBit);
else
ptr_ = (void*)(uptr() & (~kTtlBit));
}

void Reset() {
Expand Down Expand Up @@ -245,10 +248,6 @@ class DenseSet {
return false;
}

// Returns previous object if the object with such key already exists,
// Returns null if obj was added.
void* AddOrFind(void* obj, bool has_ttl);

void* FindInternal(const void* obj, uint32_t cookie) const;
void* PopInternal();

Expand All @@ -265,6 +264,17 @@ class DenseSet {
obj_malloc_used_ -= delta;
}

// Returns previous if the equivalent object already exists,
// Returns nullptr if obj was added.
void* AddOrFindObj(void* obj, bool has_ttl) {
DensePtr* ptr = AddOrFindDense(obj, has_ttl);
return ptr ? ptr->GetObject() : nullptr;
}

// Returns the previous object if it has been replaced.
// nullptr, if obj was added.
void* AddOrReplaceObj(void* obj, bool has_ttl);

private:
DenseSet(const DenseSet&) = delete;
DenseSet& operator=(DenseSet&) = delete;
Expand Down Expand Up @@ -297,6 +307,10 @@ class DenseSet {
void* PopDataFront(ChainVectorIterator);
DensePtr PopPtrFront(ChainVectorIterator);

// Returns DensePtr if the object with such key already exists,
// Returns null if obj was added.
DensePtr* AddOrFindDense(void* obj, bool has_ttl);

// ============ Pseudo Linked List in DenseSet end ==================

// returns (prev, item) pair. If item is root, then prev is null.
Expand Down
75 changes: 40 additions & 35 deletions src/core/string_map.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@ namespace dfly {

namespace {

constexpr uint64_t kValTtlBit = 1ULL << 63;
constexpr uint64_t kValMask = ~kValTtlBit;

inline sds GetValue(sds key) {
char* valptr = key + sdslen(key) + 1;
return (char*)absl::little_endian::Load64(valptr);
uint64_t val = absl::little_endian::Load64(valptr);
return (sds)(kValMask & val);
}

} // namespace
Expand All @@ -31,57 +35,49 @@ StringMap::~StringMap() {
}

bool StringMap::AddOrUpdate(string_view field, string_view value, uint32_t ttl_sec) {
CHECK_EQ(ttl_sec, UINT32_MAX); // TBD

// 8 additional bytes for a pointer to value.
sds newkey = AllocSdsWithSpace(field.size(), 8);
sds newkey;
size_t meta_offset = field.size() + 1;
sds sdsval = sdsnewlen(value.data(), value.size());
uint64_t sdsval_tag = uint64_t(sdsval);

if (ttl_sec == UINT32_MAX) {
// The layout is:
// key, '\0', 8-byte pointer to value
newkey = AllocSdsWithSpace(field.size(), 8);
} else {
// The layout is:
// key, '\0', 8-byte pointer to value, 4-byte absolute time.
// the value pointer it tagged.
newkey = AllocSdsWithSpace(field.size(), 8 + 4);
uint32_t at = time_now() + ttl_sec;
absl::little_endian::Store32(newkey + meta_offset + 8, at); // skip the value pointer.
sdsval_tag |= kValTtlBit;
}

if (!field.empty()) {
memcpy(newkey, field.data(), field.size());
}

sds val = sdsnewlen(value.data(), value.size());
absl::little_endian::Store64(newkey + field.size() + 1, uint64_t(val));
absl::little_endian::Store64(newkey + meta_offset, sdsval_tag);

bool has_ttl = false;
sds prev_entry = (sds)AddOrFind(newkey, has_ttl);
// Replace the whole entry.
sds prev_entry = (sds)AddOrReplaceObj(newkey, sdsval_tag & kValTtlBit);
if (prev_entry) {
sdsfree(newkey);
char* valptr = prev_entry + sdslen(prev_entry) + 1;
sds prev_val = (sds)absl::little_endian::Load64(valptr);
DecreaseMallocUsed(zmalloc_usable_size(sdsAllocPtr(prev_val)));
sdsfree(prev_val);

absl::little_endian::Store64(valptr, uint64_t(val));
IncreaseMallocUsed(zmalloc_usable_size(sdsAllocPtr(val)));

ObjDelete(prev_entry, false);
return false;
}

return true;
}

bool StringMap::AddOrSkip(std::string_view field, std::string_view value, uint32_t ttl_sec) {
CHECK_EQ(ttl_sec, UINT32_MAX); // TBD

void* obj = FindInternal(&field, 1); // 1 - string_view

if (obj)
return false;

// 8 additional bytes for a pointer to value.
sds newkey = AllocSdsWithSpace(field.size(), 8);
if (!field.empty()) {
memcpy(newkey, field.data(), field.size());
}

sds val = sdsnewlen(value.data(), value.size());
absl::little_endian::Store64(newkey + field.size() + 1, uint64_t(val));

bool has_ttl = false;
sds prev_entry = (sds)AddOrFind(newkey, has_ttl);
DCHECK(!prev_entry);

return true;
return AddOrUpdate(field, value, ttl_sec);
}

bool StringMap::Erase(string_view key) {
Expand Down Expand Up @@ -146,8 +142,17 @@ size_t StringMap::ObjectAllocSize(const void* obj) const {
}

uint32_t StringMap::ObjExpireTime(const void* obj) const {
LOG(FATAL) << "TBD";
return 0;
sds str = (sds)obj;
const char* valptr = str + sdslen(str) + 1;

uint64_t val = absl::little_endian::Load64(valptr);
DCHECK(val & kValTtlBit);
if (val & kValTtlBit) {
return absl::little_endian::Load32(valptr + 8);
}

// Should not reach.
return UINT32_MAX;
}

void StringMap::ObjDelete(void* obj, bool has_ttl) const {
Expand Down
21 changes: 21 additions & 0 deletions src/core/string_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,31 @@ TEST_F(StringMapTest, Basic) {
EXPECT_GT(sm_->ObjMallocUsed(), sz);
it = sm_->begin();
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);

EXPECT_FALSE(sm_->AddOrSkip("foo", "bar2"));
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);
}

TEST_F(StringMapTest, EmptyFind) {
sm_->Find("bar");
}

TEST_F(StringMapTest, Ttl) {
EXPECT_TRUE(sm_->AddOrUpdate("bla", "val1", 1));
EXPECT_FALSE(sm_->AddOrUpdate("bla", "val2", 1));
sm_->set_time(1);
EXPECT_TRUE(sm_->AddOrUpdate("bla", "val2", 1));
EXPECT_EQ(1u, sm_->Size());

EXPECT_FALSE(sm_->AddOrSkip("bla", "val3", 2));

// set ttl to 2, meaning that the key will expire at time 3.
EXPECT_TRUE(sm_->AddOrSkip("bla2", "val3", 2));
EXPECT_TRUE(sm_->Contains("bla2"));

sm_->set_time(3);
auto it = sm_->begin();
EXPECT_TRUE(it == sm_->end());
}

} // namespace dfly
4 changes: 2 additions & 2 deletions src/core/string_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ StringSet::~StringSet() {
}

bool StringSet::AddSds(sds s1) {
return AddOrFind(s1, false) == nullptr;
return AddOrFindObj(s1, false) == nullptr;
}

bool StringSet::Add(string_view src, uint32_t ttl_sec) {
Expand All @@ -60,7 +60,7 @@ bool StringSet::Add(string_view src, uint32_t ttl_sec) {
has_ttl = true;
}

if (AddOrFind(newsds, has_ttl) != nullptr) {
if (AddOrFindObj(newsds, has_ttl) != nullptr) {
sdsfree(newsds);
return false;
}
Expand Down