[DNM] Raft integration and TMT storage review #32

Closed
wants to merge 37 commits
37 commits
a9655e9
RaftService/KVStore review
innerr Apr 2, 2019
2da8045
Reviewing StorageMergeTree
innerr Apr 3, 2019
8484ef5
StorageMergeTree review
innerr Apr 3, 2019
90f9b9b
RegionPersister review. Reviewing Region
innerr Apr 4, 2019
4e773d7
Merge branch 'raft-tmt-review' of github.com:pingcap/tics into raft-t…
innerr Apr 4, 2019
5ab2fec
RegionMeta review
innerr Apr 8, 2019
655103c
Merge master
innerr Apr 9, 2019
2198099
Region reviewing
innerr Apr 9, 2019
5122a68
Region reviewing
innerr Apr 10, 2019
a17ddaa
Resolve conflicts
innerr Apr 10, 2019
4b727c1
Merge branch 'master' into raft-tmt-review
innerr Apr 10, 2019
2eb08cf
Merge branch 'master' of github.com:pingcap/tics into raft-tmt-review
innerr Apr 10, 2019
eb6e215
Merge branch 'raft-tmt-review' of github.com:pingcap/tics into raft-t…
innerr Apr 10, 2019
c00e57c
Region reviewing
innerr Apr 10, 2019
496948f
client-c (aka: PD/Region Client) review
innerr Apr 10, 2019
ed6eddf
Region review
innerr Apr 10, 2019
0282d76
Region review
innerr Apr 11, 2019
fc4f0bc
RegionData review
innerr Apr 11, 2019
b97a48a
Codec/RegionTable reviewing
innerr Apr 11, 2019
ee01c1b
Exceptions reviewing
innerr Apr 11, 2019
f03a03e
Merge branch 'master' of github.com:pingcap/tics into raft-tmt-review
innerr Apr 12, 2019
b8ce52f
RegionTable reviewing
innerr Apr 12, 2019
b941e9c
Merge branch 'master' of github.com:pingcap/tics into raft-tmt-review
innerr Apr 15, 2019
e2b8404
RegionTable review
innerr Apr 15, 2019
de75e71
Reviewing
innerr Apr 15, 2019
7960935
Codec review
innerr Apr 15, 2019
2803f01
HashCheckHelper review
innerr Apr 16, 2019
a9adb5b
TiDB.* and TiKVKeyValue review
innerr Apr 16, 2019
377265c
Update TODO
innerr Apr 16, 2019
0cf3ce3
RegionBlockReader review
innerr Apr 16, 2019
4ac613e
PageStorage reviewing
innerr Apr 17, 2019
4de0525
PageFile reviewing
innerr Apr 18, 2019
5612116
PageStorage reviewing
innerr Apr 21, 2019
a618601
PageStorage review
innerr Apr 21, 2019
68fe31f
DataStreams / Storages review
innerr Apr 22, 2019
eaa4a65
Group review done
innerr Apr 24, 2019
c89e110
Review the reviewing
innerr Apr 24, 2019
5 changes: 5 additions & 0 deletions dbms/src/Raft/RaftService.cpp
@@ -25,6 +25,7 @@ RaftService::RaftService(const std::string & address_, DB::Context & db_context_

RaftService::~RaftService()
{
// REVIEW: can we remove this mutex?
Contributor: Will there be any upcoming commits to fix this, or are these questions just being left in the codebase?

Contributor: I think messages like this should go somewhere else, like Jira, instead of here.

Contributor (author): All of these messages will be addressed (PRs/issues will be created) after the online video discussion; until then nothing needs to be done.

std::lock_guard<std::mutex> lock{mutex};
grpc_server->Shutdown();
grpc_server->Wait();
@@ -42,10 +43,12 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,
{
kvstore->report(rctx);

// REVIEW: what is the persisting interval?
persist_handle = background_pool.addTask([&, this] { return kvstore->tryPersistAndReport(rctx); });
flush_handle = background_pool.addTask([&] { return region_table.tryFlushRegions(); });

enginepb::CommandRequestBatch request;
// REVIEW: should we use an EOS flag?
while (stream->Read(&request))
{
applyCommand(rctx, request);
@@ -56,11 +59,13 @@ grpc::Status RaftService::ApplyCommandBatch(grpc::ServerContext * grpc_context,
tryLogCurrentException(log, "gRPC ApplyCommandBatch on " + address + " error");
}

// REVIEW: could removing these tasks cause a missed persist? For example: write some data, then this connection breaks before flushing.
if (persist_handle)
background_pool.removeTask(persist_handle);
if (flush_handle)
background_pool.removeTask(flush_handle);

// REVIEW: should we use OK?
return grpc::Status::CANCELLED;
}
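
A minimal sketch of the status mapping the REVIEW notes above ask about (EOS handling and the final return value). This only illustrates gRPC server-streaming conventions and is not the project's decision; `stream`, `applyCommand`, and `rctx` are the names used in the surrounding diff.

```cpp
// Sketch only: Read() returning false means the client half-closed the
// stream (EOS), which conventionally maps to grpc::Status::OK, while an
// internal failure maps to an error status instead of an unconditional
// CANCELLED.
grpc::Status status = grpc::Status::OK;
try
{
    enginepb::CommandRequestBatch request;
    while (stream->Read(&request)) // false on client half-close (EOS)
        applyCommand(rctx, request);
}
catch (...)
{
    status = grpc::Status(grpc::StatusCode::INTERNAL, "ApplyCommandBatch failed");
}
return status;
```
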

5 changes: 5 additions & 0 deletions dbms/src/Storages/StorageMergeTree.cpp
@@ -134,6 +134,7 @@ BlockInputStreams StorageMergeTree::read(
auto res = reader.read(column_names, query_info, context, processed_stage, max_block_size, num_streams, 0);
const ASTSelectQuery * select_query = typeid_cast<const ASTSelectQuery *>(query_info.query.get());

// REVIEW: it may be better to move engine-specific processing into SelectExecutor
if (data.merging_params.mode == MergeTreeData::MergingParams::Mutable ||
data.merging_params.mode == MergeTreeData::MergingParams::Txn)
{
@@ -180,6 +181,10 @@ BlockOutputStreamPtr StorageMergeTree::write(const ASTPtr & query, const Setting
const ASTInsertQuery * insert_query = typeid_cast<const ASTInsertQuery *>(&*query);
const ASTDeleteQuery * delete_query = typeid_cast<const ASTDeleteQuery *>(&*query);

// REVIEW: it may be better to move engine-specific processing to a lower layer:
// switch (engine type) -> MergeTreeBlockOutputStream
// -> TxnMergeTreeBlockOutputStream
// -> MutableMergeTreeBlockOutputStream
if (data.merging_params.mode == MergeTreeData::MergingParams::Txn)
{
res = std::make_shared<TxnMergeTreeBlockOutputStream>(*this);
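
A hedged sketch of the dispatch the REVIEW note above proposes, assuming the three output-stream classes named in the comment all take the storage by reference; it illustrates the suggested refactor, not the actual implementation.

```cpp
// Illustrative only: push the engine-specific choice down into one switch.
BlockOutputStreamPtr createOutputStream(StorageMergeTree & storage, const MergeTreeData & data)
{
    switch (data.merging_params.mode)
    {
        case MergeTreeData::MergingParams::Txn:
            return std::make_shared<TxnMergeTreeBlockOutputStream>(storage);
        case MergeTreeData::MergingParams::Mutable:
            return std::make_shared<MutableMergeTreeBlockOutputStream>(storage);
        default:
            return std::make_shared<MergeTreeBlockOutputStream>(storage);
    }
}
```
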
1 change: 1 addition & 0 deletions dbms/src/Storages/StorageMergeTree.h
@@ -88,6 +88,7 @@ friend class TxnMergeTreeBlockOutputStream;

bool checkTableCanBeDropped() const override;

// REVIEW: get/set TableInfo => data
const TableInfo & getTableInfo() const { return data.table_info; }
void setTableInfo(const TableInfo & table_info_) { data.table_info = table_info_; }

13 changes: 13 additions & 0 deletions dbms/src/Storages/Transaction/KVStore.cpp
@@ -66,6 +66,7 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context)
{
LOG_DEBUG(log, "KVStore::onSnapshot: previous " << old_region->toString(true) << " ; new " << new_region->toString(true));

// REVIEW: in what case can this happen? rngine crashed?
Contributor: Not sure; it's just for idempotency.

if (old_region->getProbableIndex() >= new_region->getProbableIndex())
{
LOG_DEBUG(log, "KVStore::onSnapshot: discard new region because of index is outdated");
@@ -82,11 +83,13 @@ void KVStore::onSnapshot(RegionPtr new_region, Context * context)

if (new_region->isPendingRemove())
{
// REVIEW: here we remove the region from the persister, yet below the region is added to the persister again
Contributor: It's a kind of ABA problem. I have avoided the wrong selection behavior in applySnapshot and MergeTreeDataSelectExecutor::read. However, one region may appear in two stores, because rngine may crash while sending a raft log that contains a ChangePeer cmd. That would be a disaster for our learner read process.

removeRegion(region_id, context);
return;
}
}

// REVIEW: we don't need to persist the region on every change.
region_persister.persist(new_region);

if (tmt_ctx)
@@ -160,6 +163,7 @@ void KVStore::onServiceCommand(const enginepb::CommandRequestBatch & cmds, RaftC
auto [it, ok] = regions.emplace(new_region->id(), new_region);
if (!ok)
{
// REVIEW: do we need to compare the old one and the new one?
@solotzg (Contributor, Apr 9, 2019): Because the index of the split one is definitely the initial state.

// definitely, any region's index is greater than or equal to the initial one, so discard it.
continue;
}
@@ -177,11 +181,17 @@
region_persister.persist(region);
}

// REVIEW: what if the process crashes here ...

if (tmt_ctx)
tmt_ctx->region_table.splitRegion(curr_region, split_regions);

// REVIEW: does region_table need to updateRegion for the split regions?
}
else
{
// REVIEW: is the persisting order OK?
Contributor: updateRegion can only add range info; it can't change it.


if (tmt_ctx)
tmt_ctx->region_table.updateRegion(curr_region, table_ids);

@@ -272,6 +282,9 @@ void KVStore::removeRegion(RegionID region_id, Context * context)
}

region_persister.drop(region_id);

// REVIEW: if the process crashes here, then when it starts again region_table will not find this region; is that OK?
Contributor: In region_table, we don't store the table range on disk. At any time, the range in region_table should contain the one in region_persister to avoid losing data. After a restart, we can recompute the table range.


if (context)
context->getTMTContext().region_table.removeRegion(region);
}
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/KVStore.h
@@ -58,6 +58,7 @@ class KVStore final : private boost::noncopyable

std::atomic<Timepoint> last_try_persist_time = Clock::now();

// REVIEW: it looks like task_mutex should be one mutex per region rather than one big lock in KVStore
Contributor: Because of operations like split and merge, we cannot regard regions as independent.

// onServiceCommand and onSnapshot should not be called concurrently
mutable std::mutex task_mutex;

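
For context on the trade-off discussed above, a hedged sketch of what a per-region mutex could look like; it is purely illustrative (RegionID is assumed here to be an integer id), and the reply above explains why the code keeps a single task_mutex instead.

```cpp
#include <cstdint>
#include <mutex>
#include <unordered_map>

using RegionID = std::uint64_t; // assumption for this sketch

// One mutex per region. Split/merge operate on several regions at once,
// which is exactly why the actual code prefers one coarse task_mutex.
class RegionLocks
{
public:
    std::mutex & get(RegionID region_id)
    {
        std::lock_guard<std::mutex> guard(map_mutex);
        return locks[region_id]; // constructed in place on first use
    }

private:
    std::mutex map_mutex;
    std::unordered_map<RegionID, std::mutex> locks;
};
```
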
1 change: 1 addition & 0 deletions dbms/src/Storages/Transaction/Region.cpp
@@ -541,6 +541,7 @@ void Region::setPendingRemove()

size_t Region::dataSize() const { return cf_data_size; }

// REVIEW: reset persist_parm here?
void Region::markPersisted() { last_persist_time = Clock::now(); }

Timepoint Region::lastPersistTime() const { return last_persist_time; }
3 changes: 3 additions & 0 deletions dbms/src/Storages/Transaction/Region.h
@@ -139,6 +139,7 @@ class Region : public std::enable_shared_from_this<Region>
TableID insert(const std::string & cf, const TiKVKey & key, const TiKVValue & value);
TableID remove(const std::string & cf, const TiKVKey & key);

// REVIEW: 'node' does not seem like a good name here
using BatchInsertNode = std::tuple<const TiKVKey *, const TiKVValue *, const String *>;
void batchInsert(std::function<bool(BatchInsertNode &)> && f);

@@ -164,6 +165,8 @@

void markPersisted();
Timepoint lastPersistTime() const;

// REVIEW: use a dirty flag instead?
size_t persistParm() const;
void decPersistParm(size_t x);
void incPersistParm();
8 changes: 8 additions & 0 deletions dbms/src/Storages/Transaction/RegionMeta.cpp
@@ -42,6 +42,8 @@ RegionMeta RegionMeta::deserialize(ReadBuffer & buf)
return RegionMeta(peer, region, apply_state, applied_term, pending_remove);
}

// REVIEW: lock? Be careful, this can easily deadlock.
// or use member `const RegionID region_id`
RegionID RegionMeta::regionId() const { return region.id(); }
Contributor: It should return region_id. I added the member region_id but forgot to change this part.

Contributor: Fixed.


UInt64 RegionMeta::peerId() const
@@ -101,6 +103,7 @@ void RegionMeta::doSetApplied(UInt64 index, UInt64 term)
applied_term = term;
}

// REVIEW: should we move this notification into doSetApplied?
void RegionMeta::notifyAll() { cv.notify_all(); }

UInt64 RegionMeta::appliedIndex() const
@@ -127,9 +130,11 @@ enginepb::CommandResponse RegionMeta::toCommandResponse() const

RegionMeta::RegionMeta(RegionMeta && rhs) : region_id(rhs.regionId())
{
// REVIEW: lock rhs
Contributor: Maybe not needed; I just added a lock in case.

peer = std::move(rhs.peer);
region = std::move(rhs.region);
apply_state = std::move(rhs.apply_state);
// REVIEW: set rhs.* to init state
Contributor: Even if a region is in its initial state, its data is not empty. Since we've decided to move it, it should not be used any more.

applied_term = rhs.applied_term;
pending_remove = rhs.pending_remove;
}
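
A hedged sketch of what locking the moved-from object (as the REVIEW note asks) could look like; the member names mirror the diff above, and taking region_id from the member follows the "fixed" reply on regionId(). This is an illustration, not the actual code.

```cpp
// Illustrative only: hold rhs.mutex while stealing its state so a concurrent
// reader of rhs cannot observe a half-moved RegionMeta.
RegionMeta::RegionMeta(RegionMeta && rhs) : region_id(rhs.region_id)
{
    std::lock_guard<std::mutex> lock(rhs.mutex);
    peer = std::move(rhs.peer);
    region = std::move(rhs.region);
    apply_state = std::move(rhs.apply_state);
    applied_term = rhs.applied_term;
    pending_remove = rhs.pending_remove;
}
```
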
@@ -176,6 +181,7 @@ void RegionMeta::doSetPendingRemove() { pending_remove = true; }
void RegionMeta::waitIndex(UInt64 index)
{
std::unique_lock<std::mutex> lock(mutex);
// REVIEW: should we lock inside the closure function?
Contributor: std::condition_variable itself will lock.

cv.wait(lock, [this, index] { return pending_remove || apply_state.applied_index() >= index; });
}
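
To illustrate the reply above: std::condition_variable::wait(lock, pred) re-acquires the mutex before every predicate evaluation, so the closure needs no locking of its own. A minimal self-contained example (names are illustrative, not the RegionMeta API):

```cpp
#include <condition_variable>
#include <cstdint>
#include <mutex>

struct Waiter
{
    std::mutex mutex;
    std::condition_variable cv;
    std::uint64_t applied_index = 0;
    bool pending_remove = false;

    void waitIndex(std::uint64_t index)
    {
        std::unique_lock<std::mutex> lock(mutex);
        // The predicate runs with `mutex` held; wait() unlocks while blocking
        // and re-locks before each check.
        cv.wait(lock, [this, index] { return pending_remove || applied_index >= index; });
    }

    void setApplied(std::uint64_t index)
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            applied_index = index;
        }
        cv.notify_all();
    }
};
```
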

@@ -193,6 +199,7 @@ UInt64 RegionMeta::confVer() const

void RegionMeta::reset(RegionMeta && rhs)
{
// REVIEW: lock rhs
std::lock_guard<std::mutex> lock(mutex);

peer = std::move(rhs.peer);
@@ -225,6 +232,7 @@ void RegionMeta::execChangePeer(

switch (change_peer_request.change_type())
{
// REVIEW: throw when we meet `AddNode`?
Contributor: Any raft command is sent to all peers; we should discard those we don't need.

case eraftpb::ConfChangeType::AddNode:
case eraftpb::ConfChangeType::AddLearnerNode:
{
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/RegionMeta.h
@@ -70,6 +70,7 @@ class RegionMeta
bool isPendingRemove() const;
void setPendingRemove();

// REVIEW: assign is a better name
Contributor: Can be considered.

void reset(RegionMeta && other);

friend bool operator==(const RegionMeta & meta1, const RegionMeta & meta2)
@@ -94,6 +95,7 @@ class RegionMeta
void doSetApplied(UInt64 index, UInt64 term);

private:
// REVIEW: we should make sure all these members are deep copies (e.g. region.peers())
Contributor: Yes, it is.

metapb::Peer peer;
metapb::Region region;
raft_serverpb::RaftApplyState apply_state;
@@ -110,7 +112,9 @@

// When we create a region peer, we should initialize its log term/index > 0,
// so that we can force the follower peer to sync the snapshot first.
// REVIEW: why initialize term = 5?
Contributor: Ask menglong; this is a magic number.

static constexpr UInt64 RAFT_INIT_LOG_TERM = 5;
// REVIEW: the raft log's initial position should be 0; after receiving a snapshot, the position should move to 5
static constexpr UInt64 RAFT_INIT_LOG_INDEX = 5;

inline raft_serverpb::RaftApplyState initialApplyState()
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/RegionPersister.cpp
@@ -11,6 +11,8 @@ extern const int LOGICAL_ERROR;

void RegionPersister::drop(RegionID region_id)
{
// REVIEW: need mutex?
WriteBatch wb;
wb.delPage(region_id);
page_storage.write(wb);
@@ -21,6 +23,7 @@ void RegionPersister::persist(const RegionPtr & region, enginepb::CommandRespons
// Support only one-thread persist.
std::lock_guard<std::mutex> lock(mutex);

// REVIEW: can we just call region->resetPersistParm()?
Contributor: I store the value first because it may be changed during doPersist.

size_t persist_parm = region->persistParm();
doPersist(region, response);
region->markPersisted();
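
The reply above is the key point: the counter is read before doPersist() so that writes arriving during persistence are not lost. A small self-contained sketch of that capture-then-decrement pattern (names are illustrative, not the actual Region API):

```cpp
#include <atomic>
#include <cstddef>

struct PersistCounter
{
    std::atomic<std::size_t> pending{0};

    void onWrite() { ++pending; }                            // incPersistParm()
    std::size_t snapshot() const { return pending.load(); }  // persistParm()

    // After persisting, subtract only what was captured before the write-out;
    // writes that landed during persistence keep the counter non-zero, so the
    // next round still persists them. A single bool dirty flag could lose them
    // if it were cleared unconditionally.
    void onPersisted(std::size_t persisted) { pending -= persisted; } // decPersistParm(x)
};
```
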
@@ -40,6 +43,7 @@ void RegionPersister::doPersist(const RegionPtr & region, enginepb::CommandRespo
}

MemoryWriteBuffer buffer;
// REVIEW: add a warning log here if the region is too large
Contributor: TODO.

size_t region_size = region->serialize(buffer, response);
if (unlikely(region_size > std::numeric_limits<UInt32>::max()))
throw Exception("Region is too big to persist", ErrorCodes::LOGICAL_ERROR);
6 changes: 6 additions & 0 deletions dbms/src/Storages/Transaction/applySnapshot.cpp
@@ -15,12 +15,16 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context)

enginepb::SnapshotRequest request;
auto ok = read(&request);
// REVIEW: use two separate 'throw's here, one for 'not ok' and one for 'no state'
Contributor: TODO.

if (!ok || !request.has_state())
throw Exception("Failed to read snapshot state", ErrorCodes::LOGICAL_ERROR);

const auto & state = request.state();
pingcap::kv::RegionClientPtr region_client = nullptr;
auto meta = RegionMeta(state.peer(), state.region(), state.apply_state());

Region::RegionClientCreateFunc region_client_create = [&](pingcap::kv::RegionVerID id) -> pingcap::kv::RegionClientPtr {
// context may be null in test cases.
if (context)
{
auto & tmt_ctx = context->getTMTContext();
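
A sketch of the split the REVIEW note above suggests: two separate throws, one for a failed read and one for a missing state. The exception messages are illustrative.

```cpp
if (!ok)
    throw Exception("Failed to read snapshot request", ErrorCodes::LOGICAL_ERROR);
if (!request.has_state())
    throw Exception("Snapshot request has no state", ErrorCodes::LOGICAL_ERROR);
```
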
@@ -44,6 +48,8 @@ void applySnapshot(KVStorePtr kvstore, RequestReader read, Context * context)
auto cf_name = data.cf();
auto key = TiKVKey();
auto value = TiKVValue();
// REVIEW: the batch-inserting logic is OK, but the call stack is weird.
// Maybe we should just lock on each node insert; that is also fast when lock contention is low.
Contributor: For now, we don't need a specific function to deal with batch inserting. I implemented this function only because there was a TODO.

region->batchInsert([&](Region::BatchInsertNode & node) -> bool {
if (it == cf_data.end())
return false;
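
A hedged sketch of the per-insert alternative raised in the REVIEW note above, using Region::insert() from Region.h; the loop over decoded key/value pairs is illustrative, since the snapshot decoding details are not shown here.

```cpp
// Illustrative only: lock per single insert (inside Region::insert or around
// the call) instead of threading a callback through batchInsert(). With low
// lock contention this stays cheap and keeps the call stack flat.
for (const auto & [key, value] : decoded_pairs) // hypothetical container of (TiKVKey, TiKVValue)
    region->insert(cf_name, key, value);
```
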