accumulate bug fix for TOSS
liuyu85cn committed Oct 15, 2021
1 parent a432011 commit 203f4c2
Showing 7 changed files with 12 additions and 83 deletions.
4 changes: 0 additions & 4 deletions src/kvstore/NebulaStore.cpp
@@ -374,7 +374,6 @@ std::shared_ptr<Part> NebulaStore::newPart(GraphSpaceID spaceId,
}
}
raftService_->addPartition(part);
LOG(INFO) << "TransactionManager onNewPartAdded_.size()=" << onNewPartAdded_.size();
for (auto& func : onNewPartAdded_) {
func.second(part);
}
@@ -1197,11 +1196,8 @@ void NebulaStore::registerOnNewPartAdded(
const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts) {
LOG(INFO) << "spaces_.size() = " << spaces_.size();
for (auto& item : spaces_) {
LOG(INFO) << "registerOnNewPartAdded() space = " << item.first;
for (auto& partItem : item.second->parts_) {
LOG(INFO) << "registerOnNewPartAdded() part = " << partItem.first;
existParts.emplace_back(std::make_pair(item.first, partItem.first));
func(partItem.second);
}
80 changes: 11 additions & 69 deletions src/kvstore/raftex/RaftPart.cpp
@@ -586,7 +586,6 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
LogType logType,
std::string log,
AtomicOp op) {
std::string debugLog = log;
if (blocking_) {
// No need to block heartbeats and empty log.
if ((logType == LogType::NORMAL && !log.empty()) || logType == LogType::ATOMIC_OP) {
@@ -604,17 +603,6 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
<< "replicatingLogs_ :" << replicatingLogs_;
return AppendLogResult::E_BUFFER_OVERFLOW;
}
LogID firstId = 0;
TermID termId = 0;
AppendLogResult res;
{
std::lock_guard<std::mutex> g(raftLock_);
res = canAppendLogs();
if (res == AppendLogResult::SUCCEEDED) {
firstId = lastLogId_ + 1;
termId = term_;
}
}
{
std::lock_guard<std::mutex> lck(logsLock_);

@@ -647,30 +635,6 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
break;
}

if (!debugLog.empty()) {
switch (debugLog[sizeof(int64_t)]) {
case kvstore::OP_MULTI_PUT: {
auto kvs = kvstore::decodeMultiValues(debugLog);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1])
<< " res = " << static_cast<int>(res);
}
break;
}
case kvstore::OP_BATCH_WRITE: {
auto data = kvstore::decodeBatchValue(debugLog);
for (auto& opp : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(opp.second.first)
<< ", val=" << folly::hexlify(opp.second.second);
}
break;
}
default:
break;
}
}

bool expected = false;
if (replicatingLogs_.compare_exchange_strong(expected, true)) {
// We need to send logs to all followers
@@ -685,39 +649,17 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
}
}

// LogID firstId = 0;
// TermID termId = 0;
// AppendLogResult res;
// {
// std::lock_guard<std::mutex> g(raftLock_);
// res = canAppendLogs();
// if (res == AppendLogResult::SUCCEEDED) {
// firstId = lastLogId_ + 1;
// termId = term_;
// }
// }

// if (!debugLog.empty()) {
// switch (debugLog[sizeof(int64_t)]) {
// case kvstore::OP_MULTI_PUT: {
// auto kvs = kvstore::decodeMultiValues(debugLog);
// for (size_t i = 0; i < kvs.size(); i += 2) {
// VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
// << " res = " << static_cast<int>(res);
// }
// break;
// }
// case kvstore::OP_BATCH_WRITE: {
// auto data = kvstore::decodeBatchValue(debugLog);
// for (auto& opp : data) {
// VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(opp.second.first);
// }
// break;
// }
// default:
// break;
// }
// }
LogID firstId = 0;
TermID termId = 0;
AppendLogResult res;
{
std::lock_guard<std::mutex> g(raftLock_);
res = canAppendLogs();
if (res == AppendLogResult::SUCCEEDED) {
firstId = lastLogId_ + 1;
termId = term_;
}
}

if (!checkAppendLogResult(res)) {
// Most likely failed because the partition is not leader
2 changes: 0 additions & 2 deletions src/storage/mutate/AddEdgesProcessor.cpp
@@ -124,8 +124,6 @@ void AddEdgesProcessor::doProcess(const cpp2::AddEdgesRequest& req) {
code = writeResultTo(wRet, true);
break;
} else {
LOG(INFO) << "doProcess() key=" << folly::hexlify(key)
<< ", val=" << folly::hexlify(retEnc.value());
data.emplace_back(std::move(key), std::move(retEnc.value()));
}
}
2 changes: 0 additions & 2 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
@@ -306,7 +306,6 @@ std::vector<kvstore::KV> ChainAddEdgesProcessorLocal::makeDoublePrime() {
void ChainAddEdgesProcessorLocal::erasePrime() {
auto fn = [&](const cpp2::NewEdge& edge) {
auto key = ConsistUtil::primeKey(spaceVidLen_, localPartId_, edge.get_key());
// VLOG(1) << uuid_ << "prepare to erase prime " << folly::hexlify(key);
return key;
};
for (auto& edge : req_.get_parts().begin()->second) {
@@ -317,7 +316,6 @@ void ChainAddEdgesProcessorLocal::erasePrime() {
void ChainAddEdgesProcessorLocal::eraseDoublePrime() {
auto fn = [&](const cpp2::NewEdge& edge) {
auto key = ConsistUtil::doublePrime(spaceVidLen_, localPartId_, edge.get_key());
// VLOG(1) << uuid_ << "prepare to erase double prime " << folly::hexlify(key);
return key;
};
for (auto& edge : req_.get_parts().begin()->second) {
2 changes: 1 addition & 1 deletion src/storage/transaction/ChainAddEdgesProcessorLocal.h
@@ -133,7 +133,7 @@ class ChainAddEdgesProcessorLocal : public BaseProcessor<cpp2::ExecResponse>,
cpp2::AddEdgesRequest req_;
std::unique_ptr<TransactionManager::LockGuard> lk_{nullptr};
int retryLimit_{10};
//
// need to restrict all phases to the same term.
TermID restrictTerm_{-1};
// set to true when prime insert succeed
// in processLocal(), we check this to determine if need to do abort()
3 changes: 0 additions & 3 deletions src/storage/transaction/TransactionManager.cpp
@@ -27,7 +27,6 @@ TransactionManager::TransactionManager(StorageEnv* env) : env_(env) {
exec_ = std::make_shared<folly::IOThreadPoolExecutor>(10);
iClient_ = env_->interClient_;
resumeThread_ = std::make_unique<thread::GenericWorker>();
// scanAll();
std::vector<std::pair<GraphSpaceID, PartitionID>> existParts;
auto fn = std::bind(&TransactionManager::onNewPartAdded, this, std::placeholders::_1);
static_cast<::nebula::kvstore::NebulaStore*>(env_->kvstore_)
@@ -42,9 +41,7 @@ TransactionManager::LockCore* TransactionManager::getLockCore(GraphSpaceID space
bool checkWhiteList) {
if (checkWhiteList) {
if (whiteListParts_.find(std::make_pair(spaceId, partId)) == whiteListParts_.end()) {
// LOG(INFO) << folly::sformat("space {}, part {} not in white list", spaceId, partId);
return nullptr;
// scanPrimes(spaceId, partId);
}
}
auto it = memLocks_.find(spaceId);
2 changes: 0 additions & 2 deletions src/storage/transaction/TransactionManager.h
@@ -115,8 +115,6 @@ class TransactionManager {
* @brief only parts in this white list are allowed to get the lock
*/
folly::ConcurrentHashMap<std::pair<GraphSpaceID, PartitionID>, int> whiteListParts_;
// std::mutex partWhiteListMu_;
// std::map<std::pair<GraphSpaceID, PartitionID>, int64_t> partWhiteList_;
};

} // namespace storage
