Skip to content

Commit

Permalink
feat : change kv from map to skipList
Browse files Browse the repository at this point in the history
  • Loading branch information
578223592 committed Jan 5, 2024
1 parent dd9dc1f commit e02f388
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 58 deletions.
6 changes: 3 additions & 3 deletions bin/test.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
node0ip=127.0.1.1
node0port=26951
node0port=26603
node1ip=127.0.1.1
node1port=26952
node1port=26604
node2ip=127.0.1.1
node2port=26953
node2port=26605
Binary file modified lib/libskip_list_on_raft.a
Binary file not shown.
9 changes: 8 additions & 1 deletion src/raftCore/include/kvServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class KvServer : raftKVRpcProctoc::kvServerRpc {
int m_maxRaftState; // snapshot if log grows this big

// Your definitions here.
std::string m_serializedKVData; // todo : 序列化后的kv数据,理论上可以不用,但是目前没有找到特别好的替代方法
SkipList<std::string, std::string> m_skipList;
std::unordered_map<std::string, std::string> m_kvDB;

Expand Down Expand Up @@ -110,21 +111,27 @@ class KvServer : raftKVRpcProctoc::kvServerRpc {
template<class Archive>
void serialize(Archive &ar, const unsigned int version) //这里面写需要序列话和反序列化的字段
{
ar & m_kvDB;
ar & m_serializedKVData;

// ar & m_kvDB;
ar & m_lastRequestId;
}

std::string getSnapshotData() {
m_serializedKVData = m_skipList.dump_file();
std::stringstream ss;
boost::archive::text_oarchive oa(ss);
oa << *this;
m_serializedKVData.clear();
return ss.str();
}

void parseFromString(const std::string &str) {
std::stringstream ss(str);
boost::archive::text_iarchive ia(ss);
ia >> *this;
m_skipList.load_file(m_serializedKVData);
m_serializedKVData.clear();
}

/////////////////serialiazation end ///////////////////////////////
Expand Down
40 changes: 26 additions & 14 deletions src/raftCore/kvServer.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "kvServer.h" //todo : 这里为什么只能用相对路径
#include "kvServer.h"

#include <rpcprovider.h>

Expand All @@ -10,22 +10,27 @@ void KvServer::DprintfKVDB() {
}
std::lock_guard<std::mutex> lg(m_mtx);
Defer ec1([this]() -> void {
for (const auto &item: m_kvDB) {
DPrintf("[DBInfo ----]Key : %s, Value : %s", &item.first, &item.second);
}
// for (const auto &item: m_kvDB) {
// DPrintf("[DBInfo ----]Key : %s, Value : %s", &item.first, &item.second);
// }
m_skipList.display_list();
});

}

void KvServer::ExecuteAppendOpOnKVDB(Op op) {
//if op.IfDuplicate { //get请求是可重复执行的,因此可以不用判复
// return
//}
m_mtx.lock();
if (m_kvDB.find(op.Key) != m_kvDB.end()) {
m_kvDB[op.Key] = m_kvDB[op.Key] + op.Value;
} else {
m_kvDB.insert(std::make_pair(op.Key, op.Value));
}

m_skipList.insert_set_element(op.Key,op.Value);

// if (m_kvDB.find(op.Key) != m_kvDB.end()) {
// m_kvDB[op.Key] = m_kvDB[op.Key] + op.Value;
// } else {
// m_kvDB.insert(std::make_pair(op.Key, op.Value));
// }
m_lastRequestId[op.ClientId] = op.RequestId;
m_mtx.unlock();

Expand All @@ -38,10 +43,14 @@ void KvServer::ExecuteGetOpOnKVDB(Op op, std::string *value, bool *exist) {
m_mtx.lock();
*value = "";
*exist = false;
if (m_kvDB.find(op.Key) != m_kvDB.end()) {
if(m_skipList.search_element(op.Key, *value)) {
*exist = true;
*value = m_kvDB[op.Key];
// *value = m_skipList.se //value已经完成赋值了
}
// if (m_kvDB.find(op.Key) != m_kvDB.end()) {
// *exist = true;
// *value = m_kvDB[op.Key];
// }
m_lastRequestId[op.ClientId] = op.RequestId;
m_mtx.unlock();

Expand All @@ -56,7 +65,8 @@ void KvServer::ExecuteGetOpOnKVDB(Op op, std::string *value, bool *exist) {

void KvServer::ExecutePutOpOnKVDB(Op op) {
m_mtx.lock();
m_kvDB[op.Key] = op.Value;
m_skipList.insert_set_element(op.Key,op.Value);
// m_kvDB[op.Key] = op.Value;
m_lastRequestId[op.ClientId] = op.RequestId;
m_mtx.unlock();

Expand Down Expand Up @@ -357,7 +367,8 @@ void KvServer::Get(google::protobuf::RpcController *controller, const ::raftKVRp
done->Run();
}

KvServer::KvServer(int me, int maxraftstate, std::string nodeInforFileName, short port) {
KvServer::KvServer(int me, int maxraftstate, std::string nodeInforFileName, short port):
m_skipList(6){
std::shared_ptr<Persister> persister = std::make_shared<Persister>(me);

m_me = me;
Expand Down Expand Up @@ -418,7 +429,8 @@ KvServer::KvServer(int me, int maxraftstate, std::string nodeInforFileName, shor
//////////////////////////////////

// You may need initialization code here.
m_kvDB; //kvdb初始化
// m_kvDB; //kvdb初始化
m_skipList;
waitApplyCh;
m_lastRequestId;
m_lastSnapShotRaftLogIndex = 0; //todo:感覺這個函數沒什麼用,不如直接調用raft節點中的snapshot值???
Expand Down
135 changes: 95 additions & 40 deletions src/skipList/include/skipList.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

#define STORE_FILE "store/dumpFile"

std::mutex mtx; // mutex for critical section
std::string delimiter = ":";

static std::string delimiter = ":";

//Class template to implement node
template<typename K, typename V>
Expand Down Expand Up @@ -83,7 +83,22 @@ template<typename K, typename V>
void Node<K, V>::set_value(V value) {
this->value=value;
};
//Class template to implement node
template<typename K, typename V>
class SkipListDump {
public:
friend class boost::serialization::access;

template<class Archive>
void serialize(Archive &ar, const unsigned int version) {
ar & keyDumpVt_;
ar & valDumpVt_;
}
std::vector<K> keyDumpVt_;
std::vector<V> valDumpVt_;
public:
void insert(const Node<K, V> &node);
};
// Class template for Skip list
template <typename K, typename V>
class SkipList {
Expand All @@ -95,10 +110,11 @@ class SkipList {
Node<K, V>* create_node(K, V, int);
int insert_element(K, V);
void display_list();
bool search_element(K);
bool search_element(K, V &value);
void delete_element(K);
void dump_file();
void load_file();
void insert_set_element(K&,V&);
std::string dump_file();
void load_file(const std::string &dumpStr);
//递归删除节点
void clear(Node<K,V>*);
int size();
Expand All @@ -123,6 +139,8 @@ class SkipList {

// skiplist current element count
int _element_count;

std::mutex _mtx; // mutex for critical section
};

// create new node
Expand Down Expand Up @@ -158,7 +176,7 @@ level 0 1 4 9 10 30 40 | 50 | 60 70 100
template<typename K, typename V>
int SkipList<K, V>::insert_element(const K key, const V value) {

mtx.lock();
_mtx.lock();
Node<K, V> *current = this->_header;

// create update array and initialize it
Expand All @@ -180,7 +198,7 @@ int SkipList<K, V>::insert_element(const K key, const V value) {
// if current node have key equal to searched key, we get it
if (current != NULL && current->get_key() == key) {
std::cout << "key: " << key << ", exists" << std::endl;
mtx.unlock();
_mtx.unlock();
return 1;
}

Expand Down Expand Up @@ -210,7 +228,7 @@ int SkipList<K, V>::insert_element(const K key, const V value) {
std::cout << "Successfully inserted key:" << key << ", value:" << value << std::endl;
_element_count ++;
}
mtx.unlock();
_mtx.unlock();
return 0;
}

Expand All @@ -230,46 +248,62 @@ void SkipList<K, V>::display_list() {
}
}

// todo 对dump 和 load 后面可能要考虑加锁的问题
// Dump data in memory to file
template<typename K, typename V>
void SkipList<K, V>::dump_file() {
std::string SkipList<K, V>::dump_file() {

std::cout << "dump_file-----------------" << std::endl;
_file_writer.open(STORE_FILE);
// std::cout << "dump_file-----------------" << std::endl;
//
//
// _file_writer.open(STORE_FILE);
Node<K, V> *node = this->_header->forward[0];

while (node != NULL) {
_file_writer << node->get_key() << ":" << node->get_value() << "\n";
std::cout << node->get_key() << ":" << node->get_value() << ";\n";
SkipListDump<K, V> dumper;
while (node != nullptr) {
dumper.insert(*node);
// _file_writer << node->get_key() << ":" << node->get_value() << "\n";
// std::cout << node->get_key() << ":" << node->get_value() << ";\n";
node = node->forward[0];
}

_file_writer.flush();
_file_writer.close();
return ;
std::stringstream ss;
boost::archive::text_oarchive oa(ss);
oa<< dumper;
return ss.str();
// _file_writer.flush();
// _file_writer.close();
}

// Load data from disk
template<typename K, typename V>
void SkipList<K, V>::load_file() {

_file_reader.open(STORE_FILE);
std::cout << "load_file-----------------" << std::endl;
std::string line;
std::string* key = new std::string();
std::string* value = new std::string();
while (getline(_file_reader, line)) {
get_key_value_from_string(line, key, value);
if (key->empty() || value->empty()) {
continue;
}
// Define key as int type
insert_element(stoi(*key), *value);
std::cout << "key:" << *key << "value:" << *value << std::endl;
void SkipList<K, V>::load_file(const std::string &dumpStr) {
// _file_reader.open(STORE_FILE);
// std::cout << "load_file-----------------" << std::endl;
// std::string line;
// std::string* key = new std::string();
// std::string* value = new std::string();
// while (getline(_file_reader, line)) {
// get_key_value_from_string(line, key, value);
// if (key->empty() || value->empty()) {
// continue;
// }
// // Define key as int type
// insert_element(stoi(*key), *value);
// std::cout << "key:" << *key << "value:" << *value << std::endl;
// }
// delete key;
// delete value;
// _file_reader.close();

if(dumpStr.empty()) {
return ;
}
SkipListDump<K, V> dumper;
std::stringstream iss(dumpStr);
boost::archive::text_iarchive ia(iss);
ia >> dumper;
for(int i = 0;i<dumper.keyDumpVt_.size();++i) {
insert_element(dumper.keyDumpVt_[i], dumper.keyDumpVt_[i]);
}
delete key;
delete value;
_file_reader.close();
}

// Get current SkipList size
Expand Down Expand Up @@ -304,7 +338,7 @@ bool SkipList<K, V>::is_valid_string(const std::string& str) {
template<typename K, typename V>
void SkipList<K, V>::delete_element(K key) {

mtx.lock();
_mtx.lock();
Node<K, V> *current = this->_header;
Node<K, V> *update[_max_level+1];
memset(update, 0, sizeof(Node<K, V>*)*(_max_level+1));
Expand Down Expand Up @@ -339,10 +373,24 @@ void SkipList<K, V>::delete_element(K key) {
delete current;
_element_count --;
}
mtx.unlock();
_mtx.unlock();
return;
}

/**
* \brief 作用与insert_element相同类似,
* insert_element是插入新元素,
* insert_set_element是插入元素,如果元素存在则改变其值
*/
template<typename K, typename V>
void SkipList<K, V>::insert_set_element(K &key, V &value) {
V oldValue;
if(search_element(key,oldValue)) {
delete_element(key);
}
insert_element(key,value);
}

// Search for element in skip list
/*
+------------+
Expand All @@ -363,7 +411,7 @@ level 1 1 4 10 30 50| 70 100
level 0 1 4 9 10 30 40 50+-->60 70 100
*/
template<typename K, typename V>
bool SkipList<K, V>::search_element(K key) {
bool SkipList<K, V>::search_element(K key,V &value) {

std::cout << "search_element-----------------" << std::endl;
Node<K, V> *current = _header;
Expand All @@ -380,6 +428,7 @@ bool SkipList<K, V>::search_element(K key) {

// if current node have key equal to searched key, we get it
if (current and current->get_key() == key) {
value = current->get_value();
std::cout << "Found key: " << key << ", value: " << current->get_value() << std::endl;
return true;
}
Expand All @@ -388,6 +437,12 @@ bool SkipList<K, V>::search_element(K key) {
return false;
}

template<typename K, typename V>
void SkipListDump<K, V>::insert(const Node<K, V> &node) {
keyDumpVt_.emplace_back(node.get_key());
valDumpVt_.emplace_back(node.get_value());
}

// construct skip list
template<typename K, typename V>
SkipList<K, V>::SkipList(int max_level) {
Expand Down
Loading

0 comments on commit e02f388

Please sign in to comment.