-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add db-upgrade V3 #3417
add db-upgrade V3 #3417
Changes from 8 commits
aabff08
3b62a1a
1f51f3b
62f7d98
84a3581
3a0412b
b8c118a
6399d5a
7145f6b
7b92dab
b4c80ad
06db765
fdbf0c4
cd9b0e5
9c2ff71
19782be
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,10 @@ | |
#include "common/fs/FileUtils.h" | ||
#include "common/utils/IndexKeyUtils.h" | ||
#include "common/utils/NebulaKeyUtils.h" | ||
#include "rocksdb/sst_file_writer.h" | ||
#include "tools/db-upgrade/NebulaKeyUtilsV1.h" | ||
#include "tools/db-upgrade/NebulaKeyUtilsV2.h" | ||
#include "tools/db-upgrade/NebulaKeyUtilsV3.h" | ||
|
||
DEFINE_string(src_db_path, | ||
"", | ||
|
@@ -22,10 +24,11 @@ DEFINE_string(dst_db_path, | |
"multi paths should be split by comma"); | ||
DEFINE_string(upgrade_meta_server, "127.0.0.1:45500", "Meta servers' address."); | ||
DEFINE_uint32(write_batch_num, 100, "The size of the batch written to rocksdb"); | ||
DEFINE_uint32(upgrade_version, | ||
0, | ||
"When the value is 1, upgrade the data from 1.x to 2.0 GA. " | ||
"When the value is 2, upgrade the data from 2.0 RC to 2.0 GA."); | ||
DEFINE_string(upgrade_version, | ||
"", | ||
"When the value is 1:2, upgrade the data from 1.x to 2.0 GA. " | ||
"When the value is 2RC:2, upgrade the data from 2.0 RC to 2.0 GA." | ||
"When the value is 2:3, upgrade the data from 2.0 GA to 3.0 ."); | ||
DEFINE_bool(compactions, | ||
true, | ||
"When the upgrade of the space is completed, " | ||
|
@@ -83,7 +86,7 @@ Status UpgraderSpace::initSpace(const std::string& sId) { | |
|
||
// Use readonly rocksdb | ||
readEngine_.reset(new nebula::kvstore::RocksEngine( | ||
spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, true)); | ||
spaceId_, spaceVidLen_, srcPath_, "", nullptr, nullptr, false)); | ||
writeEngine_.reset(new nebula::kvstore::RocksEngine(spaceId_, spaceVidLen_, dstPath_)); | ||
|
||
parts_.clear(); | ||
|
@@ -882,6 +885,108 @@ std::string UpgraderSpace::encodeRowVal(const RowReader* reader, | |
return std::move(rowWrite).moveEncodedStr(); | ||
} | ||
|
||
void UpgraderSpace::runPartV3() { | ||
std::chrono::milliseconds take_dura{10}; | ||
if (auto pId = partQueue_.try_take_for(take_dura)) { | ||
PartitionID partId = *pId; | ||
// Handle vertex and edge, if there is an index, generate index data | ||
LOG(INFO) << "Start to handle vertex/edge/index data in space id " << spaceId_ << " part id " | ||
<< partId; | ||
auto prefix = NebulaKeyUtilsV3::partTagPrefix(partId); | ||
std::unique_ptr<kvstore::KVIterator> iter; | ||
auto retCode = readEngine_->prefix(prefix, &iter); | ||
if (retCode != nebula::cpp2::ErrorCode::SUCCEEDED) { | ||
LOG(ERROR) << "Space id " << spaceId_ << " part " << partId << " no found!"; | ||
LOG(ERROR) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " | ||
<< partId << " failed"; | ||
|
||
auto unFinishedPart = --unFinishedPart_; | ||
if (unFinishedPart == 0) { | ||
// all parts has finished | ||
LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " | ||
<< spaceId_ << " finished"; | ||
} else { | ||
pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); | ||
} | ||
return; | ||
} | ||
auto write_sst = [&, this](const std::vector<kvstore::KV>& data) { | ||
::rocksdb::Options option; | ||
option.create_if_missing = true; | ||
option.compression = ::rocksdb::CompressionType::kNoCompression; | ||
::rocksdb::SstFileWriter sst_file_writer(::rocksdb::EnvOptions(), option); | ||
std::string file = ::fmt::format( | ||
".nebula_upgrade.space-{}.part-{}.{}.sst", spaceId_, partId, std::time(nullptr)); | ||
::rocksdb::Status s = sst_file_writer.Open(file); | ||
if (!s.ok()) { | ||
LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" | ||
<< s.code(); | ||
} | ||
for (auto item : data) { | ||
sst_file_writer.Put(item.first, item.second); | ||
SuperYoko marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
s = sst_file_writer.Finish(); | ||
if (!s.ok()) { | ||
LOG(FATAL) << "Faild upgrade V3 of space " << spaceId_ << ", part " << partId << ":" | ||
<< s.code(); | ||
} | ||
std::lock_guard<std::mutex> lck(this->ingest_sst_file_mut_); | ||
ingest_sst_file_.push_back(file); | ||
}; | ||
std::vector<kvstore::KV> data; | ||
std::string lastVertexKey = ""; | ||
while (iter && iter->valid()) { | ||
auto vertex = NebulaKeyUtilsV3::getVertexKey(iter->key()); | ||
if (vertex == lastVertexKey) { | ||
continue; | ||
SuperYoko marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
data.emplace_back(vertex, ""); | ||
lastVertexKey = vertex; | ||
if (data.size() >= 100000) { | ||
write_sst(data); | ||
data.clear(); | ||
} | ||
} | ||
if (!data.empty()) { | ||
write_sst(data); | ||
data.clear(); | ||
} | ||
LOG(INFO) << "Handle vertex/edge/index data in space id " << spaceId_ << " part id " << partId | ||
<< " succeed"; | ||
|
||
auto unFinishedPart = --unFinishedPart_; | ||
if (unFinishedPart == 0) { | ||
// all parts has finished | ||
LOG(INFO) << "Handle last part: " << partId << " vertex/edge/index data in space id " | ||
<< spaceId_ << " finished."; | ||
} else { | ||
pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); | ||
} | ||
} else { | ||
LOG(INFO) << "Handle vertex/edge/index of parts data in space id " << spaceId_ << " finished"; | ||
} | ||
} | ||
void UpgraderSpace::doProcessV3() { | ||
LOG(INFO) << "Start to handle data in space id " << spaceId_; | ||
// Parallel process part | ||
auto partConcurrency = std::min(static_cast<size_t>(FLAGS_max_concurrent_parts), parts_.size()); | ||
LOG(INFO) << "Max concurrent parts: " << partConcurrency; | ||
unFinishedPart_ = parts_.size(); | ||
|
||
LOG(INFO) << "Start to handle vertex/edge/index of parts data in space id " << spaceId_; | ||
for (size_t i = 0; i < partConcurrency; ++i) { | ||
pool_->add(std::bind(&UpgraderSpace::runPartV3, this)); | ||
} | ||
|
||
while (unFinishedPart_ != 0) { | ||
sleep(10); | ||
} | ||
auto code = readEngine_->ingest(ingest_sst_file_, true); | ||
if (code != ::nebula::cpp2::ErrorCode::SUCCEEDED) { | ||
LOG(FATAL) << "Faild upgrade 2:3 when ingest sst file:" << static_cast<int>(code); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and if ingest failed, upgrader can do nothing except crash. |
||
} | ||
readEngine_->put(NebulaKeyUtilsV3::dataVersionKey(), NebulaKeyUtilsV3::dataVersionValue()); | ||
} | ||
std::vector<std::string> UpgraderSpace::indexVertexKeys( | ||
PartitionID partId, | ||
VertexID& vId, | ||
|
@@ -1094,10 +1199,14 @@ void DbUpgrader::doSpace() { | |
LOG(INFO) << "Upgrade from path " << upgraderSpaceIter->srcPath_ << " space id " | ||
<< upgraderSpaceIter->entry_ << " to path " << upgraderSpaceIter->dstPath_ | ||
<< " begin"; | ||
if (FLAGS_upgrade_version == 1) { | ||
if (FLAGS_upgrade_version == "1:2") { | ||
upgraderSpaceIter->doProcessV1(); | ||
} else { | ||
} else if (FLAGS_upgrade_version == "2RC:2") { | ||
upgraderSpaceIter->doProcessV2(); | ||
} else if (FLAGS_upgrade_version == "2:3") { | ||
upgraderSpaceIter->doProcessV3(); | ||
} else { | ||
LOG(FATAL) << "error upgrade version " << FLAGS_upgrade_version; | ||
} | ||
|
||
auto ret = upgraderSpaceIter->copyWal(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
|
||
#include "tools/db-upgrade/NebulaKeyUtilsV3.h" | ||
|
||
namespace nebula { | ||
std::string NebulaKeyUtilsV3::partTagPrefix(PartitionID partId) { | ||
PartitionID item = (partId << kPartitionOffset) | static_cast<uint32_t>(kTag_); | ||
std::string key; | ||
key.reserve(sizeof(PartitionID)); | ||
key.append(reinterpret_cast<const char*>(&item), sizeof(PartitionID)); | ||
return key; | ||
} | ||
std::string NebulaKeyUtilsV3::getVertexKey(folly::StringPiece tagKey) { | ||
std::string key = tagKey.toString(); | ||
key[3] = static_cast<uint32_t>(kVertex); | ||
key.resize(key.size() - sizeof(TagID)); | ||
return key; | ||
} | ||
std::string NebulaKeyUtilsV3::dataVersionKey() { return "\xFF\xFF\xFF\xFF"; } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You could move it to |
||
std::string NebulaKeyUtilsV3::dataVersionValue() { return "3.0"; } | ||
|
||
} // namespace nebula |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
#pragma once | ||
#include "common/utils/Types.h" | ||
namespace nebula { | ||
class NebulaKeyUtilsV3 { | ||
public: | ||
static std::string partTagPrefix(PartitionID partId); | ||
static std::string getVertexKey(folly::StringPiece tagKey); | ||
static std::string dataVersionKey(); | ||
static std::string dataVersionValue(); | ||
|
||
private: | ||
enum NebulaKeyTypeV3 : uint32_t { kTag_ = 0x00000001, kVertex = 0x00000007 }; | ||
}; | ||
|
||
} // namespace nebula |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This rocksdb is originally a read-only
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because upgrade to 3.0 just need append new data by ingest sst file and write a
dataVersionKey
to identity the data encode version. So there is no need to write a new rocksdb, just upgrade in placeThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Confused about
readEngine_.reset
a writeable oneThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
read
means source andwrite
means destnation