Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: WrapSds from family_utils.h #3818

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ endif()

add_library(dragonfly_lib bloom_family.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc engine_shard.cc
engine_shard_set.cc
engine_shard_set.cc family_utils.cc
generic_family.cc hset_family.cc http_api.cc json_family.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
protocol_client.cc
Expand Down
31 changes: 0 additions & 31 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -423,37 +423,6 @@ std::ostream& operator<<(std::ostream& os, const GlobalState& state) {
return os << GlobalStateName(state);
}

NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
CHECK_GT(max_range, RandomPick(0));
}

RandomPick NonUniquePicksGenerator::Generate() {
return absl::Uniform(bitgen_, 0u, max_range_);
}

UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range)
: remaining_picks_count_(picks_count), picked_indexes_(picks_count) {
CHECK_GE(max_range, picks_count);
current_random_limit_ = max_range - picks_count;
}

RandomPick UniquePicksGenerator::Generate() {
DCHECK_GT(remaining_picks_count_, 0u);

remaining_picks_count_--;

const RandomPick max_index = current_random_limit_++;
const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u);

const bool random_index_is_picked = picked_indexes_.emplace(random_index).second;
if (random_index_is_picked) {
return random_index;
}

picked_indexes_.insert(max_index);
return max_index;
}

ThreadLocalMutex::ThreadLocalMutex() {
shard_ = EngineShard::tlocal();
}
Expand Down
44 changes: 0 additions & 44 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

#pragma once

#include <absl/random/random.h>
#include <absl/strings/ascii.h>
#include <absl/strings/str_cat.h>
#include <absl/types/span.h>
Expand Down Expand Up @@ -318,49 +317,6 @@ struct MemoryBytesFlag {
bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err);
std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag);

using RandomPick = std::uint32_t;

class PicksGenerator {
public:
virtual RandomPick Generate() = 0;
virtual ~PicksGenerator() = default;
};

class NonUniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
NonUniquePicksGenerator(RandomPick max_range);

RandomPick Generate() override;

private:
const RandomPick max_range_;
absl::BitGen bitgen_{};
};

/*
* Generates unique index in O(1).
*
* picks_count specifies the number of random indexes to be generated.
* In other words, this is the number of times the Generate() function is called.
*
* The class uses Robert Floyd's sampling algorithm
* https://dl.acm.org/doi/pdf/10.1145/30401.315746
* */
class UniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range);

RandomPick Generate() override;

private:
RandomPick current_random_limit_;
std::uint32_t remaining_picks_count_;
std::unordered_set<RandomPick> picked_indexes_;
absl::BitGen bitgen_{};
};

// Helper class used to guarantee atomicity between serialization of buckets
class ABSL_LOCKABLE ThreadLocalMutex {
public:
Expand Down
6 changes: 0 additions & 6 deletions src/server/engine_shard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -376,17 +376,11 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
txq_([](const Transaction* t) { return t->txid(); }),
mi_resource_(heap),
shard_id_(pb->GetPoolIndex()) {
tmp_str1 = sdsempty();

defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
}

EngineShard::~EngineShard() {
sdsfree(tmp_str1);
}

void EngineShard::Shutdown() {
DVLOG(1) << "EngineShard::Shutdown";

Expand Down
6 changes: 0 additions & 6 deletions src/server/engine_shard.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ class EngineShard {
Stats& operator+=(const Stats&);
};

// EngineShard() is private down below.
~EngineShard();

// Sets up a new EngineShard in the thread.
// If update_db_time is true, initializes periodic time update for its db_slice.
static void InitThreadLocal(util::ProactorBase* pb);
Expand Down Expand Up @@ -122,9 +119,6 @@ class EngineShard {
return shard_search_indices_.get();
}

// for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1;

// Moving average counters.
enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };

Expand Down
43 changes: 43 additions & 0 deletions src/server/family_utils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include "server/family_utils.h"

#include "base/logging.h"

namespace dfly {

sds WrapSds(std::string_view s) {
static __thread sds tmp_sds = sdsempry();
kostasrim marked this conversation as resolved.
Show resolved Hide resolved
return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length());
}

NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
CHECK_GT(max_range, RandomPick(0));
}

RandomPick NonUniquePicksGenerator::Generate() {
return absl::Uniform(bitgen_, 0u, max_range_);
}

UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range)
: remaining_picks_count_(picks_count), picked_indexes_(picks_count) {
CHECK_GE(max_range, picks_count);
current_random_limit_ = max_range - picks_count;
}

RandomPick UniquePicksGenerator::Generate() {
DCHECK_GT(remaining_picks_count_, 0u);

remaining_picks_count_--;

const RandomPick max_index = current_random_limit_++;
const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u);

const bool random_index_is_picked = picked_indexes_.emplace(random_index).second;
if (random_index_is_picked) {
return random_index;
}

picked_indexes_.insert(max_index);
return max_index;
}

} // namespace dfly
63 changes: 63 additions & 0 deletions src/server/family_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//

#pragma once

#include <absl/container/flat_hash_set.h>
#include <absl/random/random.h>

#include <cstdint>

extern "C" {
#include "redis/sds.h"
}
namespace dfly {

// Copy str to thread local sds instance. Valid until next WrapSds call on thread
sds WrapSds(std::string_view str);

using RandomPick = uint32_t;

class PicksGenerator {
public:
virtual RandomPick Generate() = 0;
virtual ~PicksGenerator() = default;
};

class NonUniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
NonUniquePicksGenerator(RandomPick max_range);

RandomPick Generate() override;

private:
const RandomPick max_range_;
absl::BitGen bitgen_{};
};

/*
* Generates unique index in O(1).
*
* picks_count specifies the number of random indexes to be generated.
* In other words, this is the number of times the Generate() function is called.
*
* The class uses Robert Floyd's sampling algorithm
* https://dl.acm.org/doi/pdf/10.1145/30401.315746
* */
class UniquePicksGenerator : public PicksGenerator {
public:
/* The generated value will be within the closed-open interval [0, max_range) */
UniquePicksGenerator(uint32_t picks_count, RandomPick max_range);

RandomPick Generate() override;

private:
RandomPick current_random_limit_;
uint32_t remaining_picks_count_;
absl::flat_hash_set<RandomPick> picked_indexes_;
absl::BitGen bitgen_{};
};

} // namespace dfly
6 changes: 3 additions & 3 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/server_state.h"
#include "server/family_utils.h"
#include "server/transaction.h"

/**
Expand Down Expand Up @@ -283,8 +283,8 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;

for (string_view v : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size());
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
auto vsds = WrapSds(v);
quicklistPush(ql, vsds, sdslen(vsds), pos);
}

if (res.is_new) {
Expand Down
2 changes: 2 additions & 0 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

#include "server/set_family.h"

#include "server/family_utils.h"

extern "C" {
#include "redis/intset.h"
#include "redis/redis_aux.h"
Expand Down
Loading
Loading