Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulate toss bug fix during test. #3091

Merged
merged 4 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(1) << "leader host: " << leader;
VLOG(2) << "leader host: " << leader;

cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,5 +1192,18 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> NebulaStore::getProperty(
return folly::toJson(obj);
}

void NebulaStore::registerOnNewPartAdded(
const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts) {
for (auto& item : spaces_) {
for (auto& partItem : item.second->parts_) {
existParts.emplace_back(std::make_pair(item.first, partItem.first));
func(partItem.second);
}
}
onNewPartAdded_.insert(std::make_pair(funcName, func));
}

} // namespace kvstore
} // namespace nebula
5 changes: 2 additions & 3 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,8 @@ class NebulaStore : public KVStore, public Handler {
ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(GraphSpaceID spaceId,
const std::string& property) override;
void registerOnNewPartAdded(const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func) {
onNewPartAdded_.insert(std::make_pair(funcName, func));
}
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts);

void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); }

Expand Down
21 changes: 19 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,18 @@ void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {

void Part::setBlocking(bool sign) { blocking_ = sign; }

void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; }
// Logs the loss of leadership for `term` and then notifies every callback
// stored in `leaderLostCB_`, passing this part's space id, part id and term.
//
// NOTE(review): the options carry the member `term_`, not the `term`
// argument that is logged -- confirm this is the intended value.
void Part::onLostLeadership(TermID term) {
  VLOG(1) << "Lost the leadership for the term " << term;

  CallbackOptions options;
  options.term = term_;
  options.partId = partId_;
  options.spaceId = spaceId_;

  for (auto& callback : leaderLostCB_) {
    callback(options);
  }
}

void Part::onElected(TermID term) {
VLOG(1) << "Being elected as the leader for the term: " << term;
Expand All @@ -191,7 +202,9 @@ void Part::onLeaderReady(TermID term) {
}
}

void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }
// Appends `cb` to the list of leader-ready callbacks (`leaderReadyCB_`).
void Part::registerOnLeaderReady(LeaderChagneCB cb) {
  leaderReadyCB_.push_back(std::move(cb));
}

// Appends `cb` to the list of leader-lost callbacks (`leaderLostCB_`).
void Part::registerOnLeaderLost(LeaderChagneCB cb) {
  leaderLostCB_.push_back(std::move(cb));
}

void Part::onDiscoverNewLeader(HostAddr nLeader) {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
Expand Down Expand Up @@ -231,6 +244,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
// Make sure the number of values is even
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
Expand Down Expand Up @@ -272,6 +287,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
code = batch->put(op.second.first, op.second.second);
Expand Down
9 changes: 6 additions & 3 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ class Part : public raftex::RaftPart {
TermID term;
};

using LeaderReadyCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderReadyCB cb);
using LeaderChagneCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderChagneCB cb);

void registerOnLeaderLost(LeaderChagneCB cb);

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
std::string walPath_;
NewLeaderCallback newLeaderCb_ = nullptr;
std::vector<LeaderReadyCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderLostCB_;

private:
KVEngine* engine_ = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/thrift/ThriftClientManager.h"
#include "common/time/WallClock.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
#include "kvstore/raftex/LogStrListIterator.h"
#include "kvstore/wal/FileBasedWal.h"
Expand Down Expand Up @@ -1335,6 +1336,9 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
<< " i did not commit when i was leader, rollback to " << lastLogId_;
wal_->rollbackToLog(lastLogId_);
}
if (role_ == Role::LEADER) {
bgWorkers_->addTask([self = shared_from_this(), term] { self->onLostLeadership(term); });
}
role_ = Role::FOLLOWER;
votedAddr_ = candidate;
proposedTerm_ = req.get_term();
Expand Down
67 changes: 53 additions & 14 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,26 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::prepareLocal() {

auto [pro, fut] = folly::makePromiseContract<Code>();
auto primes = makePrime();
std::vector<kvstore::KV> debugPrimes;
if (FLAGS_trace_toss) {
for (auto& kv : primes) {
VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first);
}
debugPrimes = primes;
}

erasePrime();
env_->kvstore_->asyncMultiPut(
spaceId_, localPartId_, std::move(primes), [p = std::move(pro), this](auto rc) mutable {
spaceId_,
localPartId_,
std::move(primes),
[p = std::move(pro), debugPrimes, this](auto rc) mutable {
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
primeInserted_ = true;
if (FLAGS_trace_toss) {
for (auto& kv : debugPrimes) {
VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first);
}
}
} else {
LOG(WARNING) << "kvstore err: " << apache::thrift::util::enumNameSafe(rc);
LOG(WARNING) << uuid_ << "kvstore err: " << apache::thrift::util::enumNameSafe(rc);
}

p.setValue(rc);
Expand All @@ -85,10 +92,14 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processLocal(Code code) {
VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code);
}

bool remoteFailed{true};

if (code == Code::SUCCEEDED) {
// do nothing
remoteFailed = false;
} else if (code == Code::E_RPC_FAILURE) {
code_ = Code::SUCCEEDED;
remoteFailed = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we set this to false here?

} else {
code_ = code;
}
Expand All @@ -106,7 +117,7 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processLocal(Code code) {
if (code_ == Code::SUCCEEDED) {
return forwardToDelegateProcessor();
} else {
if (primeInserted_) {
if (primeInserted_ && remoteFailed) {
return abort();
}
}
Expand Down Expand Up @@ -142,7 +153,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
pushResultCode(nebula::error(part), localPartId_);
return false;
}
localTerm_ = (nebula::value(part))->termId();
restrictTerm_ = (nebula::value(part))->termId();

auto vidLen = env_->schemaMan_->getSpaceVidLen(spaceId_);
if (!vidLen.ok()) {
Expand All @@ -164,7 +175,13 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::forwardToDelegateProcessor(
auto [pro, fut] = folly::makePromiseContract<Code>();
std::move(futProc).thenValue([&, p = std::move(pro)](auto&& resp) mutable {
auto rc = extractRpcError(resp);
if (rc != Code::SUCCEEDED) {
if (rc == Code::SUCCEEDED) {
if (FLAGS_trace_toss) {
for (auto& k : kvErased_) {
VLOG(1) << uuid_ << " erase prime " << folly::hexlify(k);
}
}
} else {
VLOG(1) << uuid_
<< " forwardToDelegateProcessor(), code = " << apache::thrift::util::enumNameSafe(rc);
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
Expand Down Expand Up @@ -194,7 +211,7 @@ void ChainAddEdgesProcessorLocal::doRpc(folly::Promise<Code>&& promise,
auto* iClient = env_->txnMan_->getInternalClient();
folly::Promise<Code> p;
auto f = p.getFuture();
iClient->chainAddEdges(req, localTerm_, edgeVer_, std::move(p));
iClient->chainAddEdges(req, restrictTerm_, edgeVer_, std::move(p));

std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable {
auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE;
Expand Down Expand Up @@ -229,14 +246,26 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::abort() {
if (kvErased_.empty()) {
return Code::SUCCEEDED;
}

std::vector<std::string> debugErased;
if (FLAGS_trace_toss) {
debugErased = kvErased_;
}

auto [pro, fut] = folly::makePromiseContract<Code>();
env_->kvstore_->asyncMultiRemove(
req_.get_space_id(),
localPartId_,
std::move(kvErased_),
[p = std::move(pro), this](auto rc) mutable {
[p = std::move(pro), debugErased, this](auto rc) mutable {
VLOG(1) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc);
if (rc != Code::SUCCEEDED) {
if (rc == Code::SUCCEEDED) {
if (FLAGS_trace_toss) {
for (auto& k : debugErased) {
VLOG(1) << uuid_ << "erase prime " << folly::hexlify(k);
}
}
} else {
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
}
p.setValue(rc);
Expand Down Expand Up @@ -313,9 +342,19 @@ bool ChainAddEdgesProcessorLocal::lockEdges(const cpp2::AddEdgesRequest& req) {
bool ChainAddEdgesProcessorLocal::checkTerm(const cpp2::AddEdgesRequest& req) {
auto space = req.get_space_id();
auto partId = req.get_parts().begin()->first;
auto ret = env_->txnMan_->checkTerm(space, partId, localTerm_);
LOG_IF(WARNING, !ret) << "check term failed, localTerm_ = " << localTerm_;
return ret;

auto part = env_->kvstore_->part(space, partId);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
return false;
}
auto curTerm = (nebula::value(part))->termId();
if (restrictTerm_ != curTerm) {
VLOG(1) << folly::sformat(
"check term failed, restrictTerm_={}, currTerm={}", restrictTerm_, curTerm);
return false;
}
return true;
}

// check if current edge is not newer than the one trying to resume.
Expand Down
3 changes: 2 additions & 1 deletion src/storage/transaction/ChainAddEdgesProcessorLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class ChainAddEdgesProcessorLocal : public BaseProcessor<cpp2::ExecResponse>,
cpp2::AddEdgesRequest req_;
std::unique_ptr<TransactionManager::LockGuard> lk_{nullptr};
int retryLimit_{10};
TermID localTerm_{-1};
// all phases need to be restricted to the same term.
TermID restrictTerm_{-1};
// set to true when the prime insert succeeds
// in processLocal(), we check this to determine if need to do abort()
bool primeInserted_{false};
Expand Down
20 changes: 16 additions & 4 deletions src/storage/transaction/ChainAddEdgesProcessorRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ namespace nebula {
namespace storage {

void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req) {
VLOG(1) << this << ConsistUtil::dumpParts(req.get_parts());
if (FLAGS_trace_toss) {
uuid_ = ConsistUtil::strUUID();
}
VLOG(1) << uuid_ << ConsistUtil::dumpParts(req.get_parts());
auto partId = req.get_parts().begin()->first;
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
if (!checkTerm(req)) {
LOG(WARNING) << "invalid term, incoming part " << partId << ", term = " << req.get_term();
LOG(WARNING) << uuid_ << " invalid term, incoming part " << partId
<< ", term = " << req.get_term();
code = nebula::cpp2::ErrorCode::E_OUTDATED_TERM;
break;
}
Expand All @@ -35,6 +39,13 @@ void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req
} while (0);

if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (FLAGS_trace_toss) {
// need to do this after set spaceVidLen_
auto keys = getStrEdgeKeys(req);
for (auto& key : keys) {
LOG(INFO) << uuid_ << ", key = " << folly::hexlify(key);
}
}
forwardRequest(req);
} else {
pushResultCode(code, partId);
Expand All @@ -53,13 +64,14 @@ void ChainAddEdgesProcessorRemote::forwardRequest(const cpp2::ChainAddEdgesReque
proc->getFuture().thenValue([=](auto&& resp) {
Code rc = Code::SUCCEEDED;
for (auto& part : resp.get_result().get_failed_parts()) {
rc = part.code;
handleErrorCode(part.code, spaceId, part.get_part_id());
}
VLOG(1) << this << " " << apache::thrift::util::enumNameSafe(rc);
VLOG(1) << uuid_ << " " << apache::thrift::util::enumNameSafe(rc);
this->result_ = resp.get_result();
this->onFinished();
});
proc->process(ConsistUtil::makeDirectAddReq(req));
proc->process(ConsistUtil::toAddEdgesRequest(req));
}

bool ChainAddEdgesProcessorRemote::checkVersion(const cpp2::ChainAddEdgesRequest& req) {
Expand Down
3 changes: 3 additions & 0 deletions src/storage/transaction/ChainAddEdgesProcessorRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class ChainAddEdgesProcessorRemote : public BaseProcessor<cpp2::ExecResponse> {
void forwardRequest(const cpp2::ChainAddEdgesRequest& req);

std::vector<std::string> getStrEdgeKeys(const cpp2::ChainAddEdgesRequest& req);

private:
std::string uuid_; // for debug purpose
};

} // namespace storage
Expand Down
13 changes: 9 additions & 4 deletions src/storage/transaction/ChainResumeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@ void ChainResumeProcessor::process() {
auto edgeKey = std::string(it->first.c_str() + sizeof(GraphSpaceID),
it->first.size() - sizeof(GraphSpaceID));
auto partId = NebulaKeyUtils::getPart(edgeKey);
VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId
<< ", hex=" << folly::hexlify(edgeKey);
auto prefix = (it->second == ResumeType::RESUME_CHAIN) ? ConsistUtil::primeTable()
: ConsistUtil::doublePrimeTable();
auto key = prefix + edgeKey;
std::string val;
auto rc = env_->kvstore_->get(spaceId, partId, key, &val);
VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId
<< ", hex = " << folly::hexlify(edgeKey)
<< ", rc = " << apache::thrift::util::enumNameSafe(rc);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
// do nothing
} else if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
// not leader any more, stop trying resume
env_->txnMan_->delPrime(spaceId, edgeKey);
VLOG(1) << "kvstore->get() leader changed";
auto getPart = env_->kvstore_->part(spaceId, partId);
if (nebula::ok(getPart) && !nebula::value(getPart)->isLeader()) {
// not leader any more, stop trying resume
env_->txnMan_->delPrime(spaceId, edgeKey);
}
continue;
} else {
LOG(WARNING) << "kvstore->get() failed, " << apache::thrift::util::enumNameSafe(rc);
Expand Down
7 changes: 6 additions & 1 deletion src/storage/transaction/ConsistUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ int64_t ConsistUtil::getTimestamp(const std::string& val) noexcept {
return *reinterpret_cast<const int64_t*>(val.data() + (val.size() - sizeof(int64_t)));
}

cpp2::AddEdgesRequest ConsistUtil::makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req) {
cpp2::AddEdgesRequest ConsistUtil::toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req) {
cpp2::AddEdgesRequest ret;
ret.set_space_id(req.get_space_id());
ret.set_parts(req.get_parts());
Expand Down Expand Up @@ -177,6 +177,11 @@ std::pair<int64_t, nebula::cpp2::ErrorCode> ConsistUtil::versionOfUpdateReq(

std::string ConsistUtil::dumpAddEdgeReq(const cpp2::AddEdgesRequest& req) {
std::stringstream oss;
oss << "prop_names.size() = " << req.get_prop_names().size() << " ";
for (auto& name : req.get_prop_names()) {
oss << name << " ";
}
oss << " ";
for (auto& part : req.get_parts()) {
// oss << dumpParts(part.second);
for (auto& edge : part.second) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/transaction/ConsistUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ConsistUtil final {

static int64_t getTimestamp(const std::string& val) noexcept;

static cpp2::AddEdgesRequest makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req);
static cpp2::AddEdgesRequest toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req);

static cpp2::EdgeKey reverseEdgeKey(const cpp2::EdgeKey& edgeKey);

Expand Down
Loading