[DNM] Raft integration and TMT storage review #32
@@ -66,6 +66,7 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context)
{
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));

// REVIEW: in what case can this happen? rngine crashed?

Reply: Not sure; it is just for idempotence.

if (old_region->getProbableIndex() >= new_region->getProbableIndex())
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
@@ -82,11 +83,13 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context)

if (new_region->isPendingRemove())
{
// REVIEW: here we remove the region in the persister, then below the region is added to the persister again

Reply: It's a kind of ABA problem. I have avoided the wrong process about selecting in

removeRegion(region_id, context);
return;
}
}

// REVIEW: we don't need to persist the region on every change.
region_persister.persist(new_region);

if (tmt_ctx)
@@ -160,6 +163,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
auto [it, ok] = regions.emplace(new_region->id(), new_region);
if (!ok)
{
// REVIEW: do we need to compare the old one and the new one?

Reply: Because the index of the newly split region is definitely the initial state.

// definitely, any region's index is greater than or equal to the initial one; discard it.
continue;
}
@@ -177,11 +181,17 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
region_persister.persist(region);
}

// REVIEW: what if the process crashed here ...

if (tmt_ctx)
tmt_ctx->region_table.splitRegion(curr_region, split_regions);

// REVIEW: does region_table need to updateRegion for the split regions?
}
else
{
// REVIEW: is the persisting order OK?

Reply: updateRegion can only add range info; it can't change

if (tmt_ctx)
tmt_ctx->region_table.updateRegion(curr_region, table_ids);
@@ -272,6 +282,9 @@ void KVStore::removeRegion(RegionID region_id, Context * context)
}

region_persister.drop(region_id);

// REVIEW: if the process crashed here, then when the process starts again, region_table will not find this region. Is that OK?

Reply: In region_table we don't store the table ranges on disk. At any time we should ensure that the range in region_table contains the one in region_persister, to avoid losing data. After a restart, we can recompute the table ranges.

if (context)
context->getTMTContext().region_table.removeRegion(region);
}
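A minimal sketch of the restart path described in that reply, under the assumption (the real interfaces are not shown in this diff) that each persisted region can report which table range it covers; region_table starts empty after a restart and is rebuilt so that it always covers whatever region_persister still holds on disk:

// Illustrative only; the names below are hypothetical, not the actual TMT API.
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

using TableID = int64_t;
using KeyRange = std::pair<std::string, std::string>; // [start, end) in table key space

struct PersistedRegion { TableID table_id; KeyRange range; }; // reloadable from region_persister

// region_table keeps ranges only in memory; after a restart they are recomputed
// from the persisted regions, so the in-memory ranges always cover what is on disk.
std::map<TableID, std::vector<KeyRange>> rebuildRegionTable(const std::vector<PersistedRegion> & persisted)
{
    std::map<TableID, std::vector<KeyRange>> table_ranges;
    for (const auto & region : persisted)
        table_ranges[region.table_id].push_back(region.range);
    return table_ranges;
}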
@@ -58,6 +58,7 @@ class KVStore final : private boost::noncopyable

std::atomic<Timepoint> last_try_persist_time = Clock::now();

// REVIEW: task_mutex looks like it should be one mutex per region rather than one big lock in KVStore

Reply: Because of operations like split and merge, we cannot regard regions as independent.

// onServiceCommand and onSnapshot should not be called concurrently
mutable std::mutex task_mutex;
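A rough sketch of the trade-off behind that reply, using hypothetical names: with one mutex per region, a split or merge would have to acquire the parent's and every child's lock together, in a globally fixed order, to stay deadlock-free; the single task_mutex serializes all such commands without that bookkeeping.

// Illustrative only: the lock-ordering dance a per-region scheme would need.
#include <algorithm>
#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

using RegionID = uint64_t;

struct PerRegionLocks
{
    std::map<RegionID, std::mutex> locks; // hypothetical: one mutex per region

    // Split/merge touches several regions at once, so every involved lock must be
    // taken in a fixed order (here: ascending region id) to avoid deadlocking
    // against another concurrent split/merge.
    void runSplitLocked(RegionID parent, std::vector<RegionID> children)
    {
        children.push_back(parent);
        std::sort(children.begin(), children.end());
        for (RegionID id : children)
            locks[id].lock();
        // ... apply the split to the parent and children here ...
        for (auto it = children.rbegin(); it != children.rend(); ++it)
            locks[*it].unlock();
    }
};
// With the single task_mutex, onServiceCommand / onSnapshot instead just take
// std::lock_guard<std::mutex> lock(task_mutex) and never reason about ordering.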
@@ -42,6 +42,8 @@ RegionMeta RegionMeta::deserialize(ReadBuffer & buf)
return RegionMeta(peer, region, apply_state, applied_term, pending_remove);
}

// REVIEW: lock? be careful, this can easily deadlock.
// or use the member `const RegionID region_id`
RegionID RegionMeta::regionId() const { return region.id(); }

Reply: It should return region_id. I added the member region_id but forgot to change this part.

Reply: Fixed.

UInt64 RegionMeta::peerId() const
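Going by those replies, the fix is just to return the immutable member instead of reading the protobuf field (which is what would otherwise want a lock); a sketch of the corrected accessor:

// region_id is a const member set at construction, so no locking is needed here.
RegionID RegionMeta::regionId() const { return region_id; }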
@@ -101,6 +103,7 @@ void RegionMeta::doSetApplied(UInt64 index, UInt64 term)
applied_term = term;
}

// REVIEW: should we move this notification into doSetApplied?
void RegionMeta::notifyAll() { cv.notify_all(); }

UInt64 RegionMeta::appliedIndex() const
@@ -127,9 +130,11 @@ enginepb::CommandResponse RegionMeta::toCommandResponse() const

RegionMeta::RegionMeta(RegionMeta && rhs) : region_id(rhs.regionId())
{
// REVIEW: lock rhs

Reply: Maybe no need; I just added a lock in case.

peer = std::move(rhs.peer);
region = std::move(rhs.region);
apply_state = std::move(rhs.apply_state);
// REVIEW: set rhs.* to init state

Reply: Even if a region is at the initial state, its data is not empty. Since we've decided to move it, it should not be used any more.

applied_term = rhs.applied_term;
pending_remove = rhs.pending_remove;
}
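A small sketch of the "lock rhs" suggestion, assuming RegionMeta keeps the mutable std::mutex mutex used elsewhere in this diff; the source object's mutex is held while its members are transferred:

// Sketch only: guard the moved-from object in case another thread still touches it.
RegionMeta::RegionMeta(RegionMeta && rhs) : region_id(rhs.regionId())
{
    std::lock_guard<std::mutex> lock(rhs.mutex); // lock rhs, not *this, while stealing its state

    peer = std::move(rhs.peer);
    region = std::move(rhs.region);
    apply_state = std::move(rhs.apply_state);
    applied_term = rhs.applied_term;
    pending_remove = rhs.pending_remove;
}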
@@ -176,6 +181,7 @@ void RegionMeta::doSetPendingRemove() { pending_remove = true; }
void RegionMeta::waitIndex(UInt64 index)
{
std::unique_lock<std::mutex> lock(mutex);
// REVIEW: should we lock inside the closure function?

Reply: std::condition_variable itself will lock.

cv.wait(lock, [this, index] { return pending_remove || apply_state.applied_index() >= index; });
}
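To spell out that reply: the predicate overload of std::condition_variable::wait only evaluates the closure while the passed-in lock is held, so no extra locking inside the closure is needed. A minimal standalone illustration:

#include <condition_variable>
#include <mutex>

std::mutex mtx;
std::condition_variable cond;
bool ready = false;

void waitReady()
{
    std::unique_lock<std::mutex> lock(mtx);
    // cond.wait(lock, [] { return ready; }) is equivalent to this loop: the
    // predicate (`ready`) is only read while `lock` is held, because wait()
    // atomically releases the mutex and re-acquires it before returning.
    while (!ready)
        cond.wait(lock);
}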
@@ -193,6 +199,7 @@ UInt64 RegionMeta::confVer() const

void RegionMeta::reset(RegionMeta && rhs)
{
// REVIEW: lock rhs
std::lock_guard<std::mutex> lock(mutex);

peer = std::move(rhs.peer);
@@ -225,6 +232,7 @@ void RegionMeta::execChangePeer(

switch (change_peer_request.change_type())
{
// REVIEW: throw when we meet `AddNode`?

Reply: Any Raft command is sent to all peers; we should discard those we don't need.

case eraftpb::ConfChangeType::AddNode:
case eraftpb::ConfChangeType::AddLearnerNode:
{
@@ -70,6 +70,7 @@ class RegionMeta
bool isPendingRemove() const;
void setPendingRemove();

// REVIEW: `assign` would be a better name

Reply: Can be considered.

void reset(RegionMeta && other);

friend bool operator==(const RegionMeta & meta1, const RegionMeta & meta2)
@@ -94,6 +95,7 @@ class RegionMeta
void doSetApplied(UInt64 index, UInt64 term);

private:
// REVIEW: we should make sure all these members are deep copies (e.g. region.peers())

Reply: Yes, it is.

metapb::Peer peer;
metapb::Region region;
raft_serverpb::RaftApplyState apply_state;
@@ -110,7 +112,9 @@ class RegionMeta

// When we create a region peer, we should initialize its log term/index > 0,
// so that we can force the follower peer to sync the snapshot first.
// REVIEW: why initialize term = 5?

Reply: Ask menglong; this is a magic number.

static constexpr UInt64 RAFT_INIT_LOG_TERM = 5;
// REVIEW: the raft log's initial position should be 0; after receiving a snapshot, the position should move to 5
static constexpr UInt64 RAFT_INIT_LOG_INDEX = 5;

inline raft_serverpb::RaftApplyState initialApplyState()
@@ -11,6 +11,8 @@ extern const int LOGICAL_ERROR;

void RegionPersister::drop(RegionID region_id)
{
// REVIEW: need mutex?

WriteBatch wb;
wb.delPage(region_id);
page_storage.write(wb);
@@ -21,6 +23,7 @@ void RegionPersister::persist(const RegionPtr & region, enginepb::CommandRespons
// Support only one thread persisting.
std::lock_guard<std::mutex> lock(mutex);

// REVIEW: can we just call region->resetPersistParm(void)?

Reply: I store the value first because it may be changed during

size_t persist_parm = region->persistParm();
doPersist(region, response);
region->markPersisted();
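A sketch of the race the reply is guarding against, under the assumption (not spelled out in this diff) that persistParm counts writes that still need persisting: resetting the counter blindly after doPersist would silently drop writes that arrived while doPersist was running, whereas subtracting the value captured beforehand keeps them marked dirty.

// Hypothetical stand-in for whatever persistParm() really tracks.
#include <atomic>
#include <cstddef>

std::atomic<size_t> unpersisted_writes{0};

void persistSketch()
{
    size_t snapshot = unpersisted_writes.load(); // capture before the slow disk write
    // ... doPersist runs here; concurrent writes may still increment the counter ...
    unpersisted_writes -= snapshot;              // writes that raced in stay counted
    // A plain `unpersisted_writes = 0;` here would lose those racing writes.
}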
@@ -40,6 +43,7 @@ void RegionPersister::doPersist(const RegionPtr & region, enginepb::CommandRespo
}

MemoryWriteBuffer buffer;
// REVIEW: add a warning log here if the region is too large

Reply: TODO

size_t region_size = region->serialize(buffer, response);
if (unlikely(region_size > std::numeric_limits<UInt32>::max()))
throw Exception("Region is too big to persist", ErrorCodes::LOGICAL_ERROR);
@@ -15,12 +15,16 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context)

enginepb::SnapshotRequest request;
auto ok = read(&request);
// REVIEW: use two separate 'throw's here, one for 'not ok' and one for 'no state'

Reply: TODO

if (!ok || !request.has_state())
throw Exception("Failed to read snapshot state", ErrorCodes::LOGICAL_ERROR);

const auto & state = request.state();
pingcap::kv::RegionClientPtr region_client = nullptr;
auto meta = RegionMeta(state.peer(), state.region(), state.apply_state());

Region::RegionClientCreateFunc region_client_create = [&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr {
// context may be null in test cases.
if (context)
{
auto & tmt_ctx = context->getTMTContext();
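What that REVIEW suggestion (still a TODO per the reply) would roughly look like, so each failure mode gets its own error message; the exact messages are illustrative:

// Sketch of the suggested split into two separate throws.
if (!ok)
    throw Exception("Failed to read snapshot request", ErrorCodes::LOGICAL_ERROR);
if (!request.has_state())
    throw Exception("Snapshot request has no state", ErrorCodes::LOGICAL_ERROR);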
@@ -44,6 +48,8 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context)
auto cf_name = data.cf();
auto key = TiKVKey();
auto value = TiKVValue();
// REVIEW: the batch inserting logic is OK, but the calling stack is weird.
// Maybe we should just take the lock on each node insert; that is also fast when lock contention is low.

Reply: For now, we don't need a specific function to deal with batch inserting. I implemented this function only because there was a TODO.

region->batchInsert([&](Region::BatchInsertNode & node) -> bool {
if (it == cf_data.end())
return false;
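For comparison, the per-node alternative hinted at in the REVIEW comment might look roughly like this; a Region::insert that takes the region lock on every call is an assumption, not the interface shown in this diff:

// Hypothetical per-node variant: each insert locks the region internally,
// so the caller is a plain loop instead of a callback-driven batch.
for (; it != cf_data.end(); ++it)
{
    auto [k, v] = *it;                                   // assumed to yield a key/value pair
    region->insert(cf_name, std::move(k), std::move(v)); // assumed to lock per call
}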
Comment: Will there be any upcoming commits to fix these, or are the questions just going to stay in the codebase?

Comment: Because I think messages like this should be put somewhere else, like Jira, instead of here.

Reply: All of these messages will be addressed (PRs/issues will be created) after the online video discussion; before that, nothing needs to be done.