Skip to content
This repository has been archived by the owner on Dec 1, 2022. It is now read-only.

unify reset/cleanup usage in Listener/Part #484

Merged
merged 2 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions src/kvstore/Listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,13 @@ bool Listener::preProcessLog(LogID logId,

bool Listener::commitLogs(std::unique_ptr<LogIterator> iter) {
LogID lastId = -1;
TermID lastTerm = -1;
while (iter->valid()) {
lastId = iter->logId();
lastTerm = iter->logTerm();
++(*iter);
}
if (lastId > 0) {
lastId_ = lastId;
lastTerm_ = lastTerm;
leaderCommitId_ = lastId;
}
lastCommitTime_ = time::WallClock::fastNowInMilliSec();
return true;
}

Expand Down Expand Up @@ -214,7 +210,7 @@ void Listener::doApply() {
if (apply(data)) {
std::lock_guard<std::mutex> guard(raftLock_);
lastApplyLogId_ = lastApplyId;
persist(lastId_, lastTerm_, lastApplyLogId_);
persist(committedLogId_, term_, lastApplyLogId_);
VLOG(1) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
Expand Down Expand Up @@ -242,10 +238,9 @@ std::pair<int64_t, int64_t> Listener::commitSnapshot(const std::vector<std::stri
}
if (finished) {
CHECK(!raftLock_.try_lock());
lastId_ = committedLogId;
leaderCommitId_ = committedLogId;
lastApplyLogId_ = committedLogId;
lastTerm_ = committedLogTerm;
persist(committedLogId, lastTerm_, lastApplyLogId_);
persist(committedLogId, committedLogTerm, lastApplyLogId_);
LOG(INFO) << idStr_ << "Listener succeeded apply log to " << lastApplyLogId_;
lastApplyTime_ = time::WallClock::fastNowInMilliSec();
}
Expand Down
23 changes: 11 additions & 12 deletions src/kvstore/Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ derived class.
// persist last commit log id/term and lastApplyId
bool persist(LogID, TermID, LogID)

// extra cleanup work, will be invoked when listener is about to be removed
// extra cleanup work, will be invoked when listener is about to be removed, or raft is reseted
virtual void cleanup() = 0
*/

Expand All @@ -106,17 +106,19 @@ class Listener : public raftex::RaftPart {
// Stop listener
void stop() override;

int64_t logGapInMs() {
return lastCommitTime_ - lastApplyTime_;
}

LogID getApplyId() {
return lastApplyLogId_;
}

void reset() {
LOG(INFO) << idStr_ << "Clean up all wals";
wal_->reset();
void cleanup() override {
leaderCommitId_ = 0;
lastApplyLogId_ = 0;
persist(0, 0, lastApplyLogId_);
}

void resetListener() {
std::lock_guard<std::mutex> g(raftLock_);
reset();
}

protected:
Expand Down Expand Up @@ -172,11 +174,8 @@ class Listener : public raftex::RaftPart {
void doApply();

protected:
// lastId_ and lastTerm_ is same as committedLogId_ and term_
LogID lastId_ = -1;
TermID lastTerm_ = -1;
LogID leaderCommitId_ = 0;
LogID lastApplyLogId_ = 0;
int64_t lastCommitTime_ = 0;
int64_t lastApplyTime_ = 0;
std::set<HostAddr> peers_;
meta::SchemaManager* schemaMan_{nullptr};
Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ void NebulaStore::removePart(GraphSpaceID spaceId, PartitionID partId) {
CHECK_NOTNULL(e);
raftService_->removePartition(partIt->second);
diskMan_->removePartFromPath(spaceId, partId, e->getDataRoot());
partIt->second->reset();
partIt->second->resetPart();
spaceIt->second->parts_.erase(partId);
e->removePart(partId);
}
Expand Down Expand Up @@ -499,7 +499,7 @@ void NebulaStore::removeListener(GraphSpaceID spaceId,
auto listener = partIt->second.find(type);
if (listener != partIt->second.end()) {
raftService_->removePartition(listener->second);
listener->second->reset();
listener->second->resetListener();
partIt->second.erase(type);
LOG(INFO) << "Listener of type " << apache::thrift::util::enumNameSafe(type)
<< " of [Space: " << spaceId << ", Part: " << partId << "] is removed";
Expand Down
25 changes: 8 additions & 17 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,9 @@ class Part : public raftex::RaftPart {
}

// clean up all data about this part.
void reset() {
LOG(INFO) << idStr_ << "Clean up all wals";
wal()->reset();
auto res = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_));
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error "
<< static_cast<int32_t>(res);
}
void resetPart() {
std::lock_guard<std::mutex> g(raftLock_);
reset();
}

private:
Expand Down Expand Up @@ -116,15 +111,11 @@ class Part : public raftex::RaftPart {
putCommitMsg(WriteBatch* batch, LogID committedLogId, TermID committedLogTerm);

void cleanup() override {
LOG(INFO) << idStr_ << "Clean up all data, just reset the committedLogId!";
auto batch = engine_->startBatchWrite();
if (nebula::cpp2::ErrorCode::SUCCEEDED != putCommitMsg(batch.get(), 0, 0)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return;
}
if (nebula::cpp2::ErrorCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return;
LOG(INFO) << idStr_ << "Clean rocksdb commit key";
auto res = engine_->remove(NebulaKeyUtils::systemCommitKey(partId_));
if (res != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << idStr_ << "Remove the committedLogId failed, error "
<< static_cast<int32_t>(res);
}
return;
}
Expand Down
8 changes: 4 additions & 4 deletions src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {

std::pair<LogID, TermID> lastLogInfo() const;

// Reset the part, clean up all data and WALs.
void reset();

protected:
// Protected constructor to prevent from instantiating directly
RaftPart(
Expand Down Expand Up @@ -304,12 +307,9 @@ class RaftPart : public std::enable_shared_from_this<RaftPart> {
TermID committedLogTerm,
bool finished) = 0;

// Clean up all data about current part in storage.
// Clean up extra data about the part, usually related to state machine
virtual void cleanup() = 0;

// Reset the part, clean up all data and WALs.
void reset();

void addPeer(const HostAddr& peer);

void removePeer(const HostAddr& peer);
Expand Down