Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/shard validation #3297

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ target_sources (rippled PRIVATE
src/ripple/rpc/impl/Role.cpp
src/ripple/rpc/impl/ServerHandlerImp.cpp
src/ripple/rpc/impl/ShardArchiveHandler.cpp
src/ripple/rpc/impl/ShardVerificationScheduler.cpp
src/ripple/rpc/impl/Status.cpp
src/ripple/rpc/impl/TransactionSign.cpp

Expand Down Expand Up @@ -836,7 +837,7 @@ target_sources (rippled PRIVATE
test sources:
subdir: net
#]===============================]
src/test/net/SSLHTTPDownloader_test.cpp
src/test/net/DatabaseDownloader_test.cpp
#[===============================[
test sources:
subdir: nodestore
Expand Down
87 changes: 66 additions & 21 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
detail::AppFamily family_;
std::unique_ptr<NodeStore::DatabaseShard> shardStore_;
std::unique_ptr<detail::AppFamily> shardFamily_;
std::unique_ptr<RPC::ShardArchiveHandler> shardArchiveHandler_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr<PathRequests> m_pathRequests;
Expand Down Expand Up @@ -786,6 +787,64 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
return shardStore_.get();
}

RPC::ShardArchiveHandler*
getShardArchiveHandler(bool tryRecovery) override
{
static std::mutex handlerMutex;
std::lock_guard lock(handlerMutex);

// After constructing the handler, try to
// initialize it. Log on error; set the
// member variable on success.
auto initAndSet =
[this](std::unique_ptr<RPC::ShardArchiveHandler>&& handler) {
if (!handler)
return false;

if (!handler->init())
{
JLOG(m_journal.error())
<< "Failed to initialize ShardArchiveHandler.";

return false;
}

shardArchiveHandler_ = std::move(handler);
return true;
};

// Need to resume based on state from a previous
// run.
if (tryRecovery)
{
if (shardArchiveHandler_ != nullptr)
{
JLOG(m_journal.error())
<< "ShardArchiveHandler already created at startup.";

return nullptr;
}

auto handler = RPC::ShardArchiveHandler::tryMakeRecoveryHandler(
*this, *m_jobQueue);

if (!initAndSet(std::move(handler)))
return nullptr;
}

// Construct the ShardArchiveHandler
if (shardArchiveHandler_ == nullptr)
{
auto handler = RPC::ShardArchiveHandler::makeShardArchiveHandler(
*this, *m_jobQueue);

if (!initAndSet(std::move(handler)))
return nullptr;
}

return shardArchiveHandler_.get();
}
undertome marked this conversation as resolved.
Show resolved Hide resolved

Application::MutexType&
getMasterMutex() override
{
Expand Down Expand Up @@ -1714,30 +1773,16 @@ ApplicationImp::setup()

if (shardStore_)
{
using namespace boost::filesystem;

auto stateDb(
RPC::ShardArchiveHandler::getDownloadDirectory(*config_) /
stateDBName);

try
{
if (exists(stateDb) && is_regular_file(stateDb) &&
!RPC::ShardArchiveHandler::hasInstance())
{
auto handler = RPC::ShardArchiveHandler::recoverInstance(
*this, *m_jobQueue);

assert(handler);

if (!handler->initFromDB())
{
JLOG(m_journal.fatal())
<< "Failed to initialize ShardArchiveHandler.";

return false;
}
// Create a ShardArchiveHandler if recovery
// is needed (there's a state database left
// over from a previous run).
auto handler = getShardArchiveHandler(true);
undertome marked this conversation as resolved.
Show resolved Hide resolved

// Recovery is needed.
if (handler)
{
if (!handler->start())
{
JLOG(m_journal.fatal())
Expand Down
5 changes: 5 additions & 0 deletions src/ripple/app/main/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class DatabaseShard;
namespace perf {
class PerfLog;
}
namespace RPC {
class ShardArchiveHandler;
}

// VFALCO TODO Fix forward declares required for header dependency loops
class AmendmentTable;
Expand Down Expand Up @@ -185,6 +188,8 @@ class Application : public beast::PropertyStream::Source
getNodeStore() = 0;
virtual NodeStore::DatabaseShard*
getShardStore() = 0;
virtual RPC::ShardArchiveHandler*
getShardArchiveHandler(bool tryRecovery = false) = 0;
virtual InboundLedgers&
getInboundLedgers() = 0;
virtual InboundTransactions&
Expand Down
24 changes: 15 additions & 9 deletions src/ripple/net/DatabaseBody.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

namespace ripple {

// DatabaseBody needs to meet requirements
// from asio which is why some conventions
// used elsewhere in this code base are not
// followed.
undertome marked this conversation as resolved.
Show resolved Hide resolved
struct DatabaseBody
undertome marked this conversation as resolved.
Show resolved Hide resolved
{
// Algorithm for storing buffers when parsing.
Expand All @@ -53,15 +57,15 @@ class DatabaseBody::value_type
friend struct DatabaseBody;

// The cached file size
std::uint64_t file_size_ = 0;
std::uint64_t fileSize_ = 0;
boost::filesystem::path path_;
std::unique_ptr<DatabaseCon> conn_;
std::string batch_;
std::shared_ptr<boost::asio::io_service::strand> strand_;
std::mutex m_;
undertome marked this conversation as resolved.
Show resolved Hide resolved
std::condition_variable c_;
uint64_t handler_count_ = 0;
uint64_t part_ = 0;
std::uint64_t handlerCount_ = 0;
std::uint64_t part_ = 0;
bool closing_ = false;

public:
Expand All @@ -75,14 +79,14 @@ class DatabaseBody::value_type
bool
is_open() const
{
return bool{conn_};
return static_cast<bool>(conn_);
}

/// Returns the size of the file if open
std::uint64_t
size() const
{
return file_size_;
return fileSize_;
}

/// Close the file if open
Expand All @@ -93,7 +97,9 @@ class DatabaseBody::value_type

@param path The utf-8 encoded path to the file

@param mode The file mode to use
@param config The configuration settings

@param io_service The asio context for running a strand.

@param ec Set to the error, if any occurred
*/
undertome marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -114,9 +120,9 @@ class DatabaseBody::reader
{
value_type& body_; // The body we are writing to

static const uint32_t FLUSH_SIZE = 50000000;
static const uint8_t MAX_HANDLERS = 3;
static const uint16_t MAX_ROW_SIZE_PAD = 500;
static constexpr std::uint32_t FLUSH_SIZE = 50000000;
static constexpr std::uint8_t MAX_HANDLERS = 3;
static constexpr std::uint16_t MAX_ROW_SIZE_PAD = 500;

public:
// Constructor.
Expand Down
5 changes: 3 additions & 2 deletions src/ripple/net/DatabaseDownloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class DatabaseDownloader : public SSLHTTPDownloader
Config const& config);

private:
static const uint8_t MAX_PATH_LEN = std::numeric_limits<uint8_t>::max();
static const std::uint8_t MAX_PATH_LEN =
std::numeric_limits<std::uint8_t>::max();

std::shared_ptr<parser>
getParser(
Expand All @@ -48,7 +49,7 @@ class DatabaseDownloader : public SSLHTTPDownloader
void
closeBody(std::shared_ptr<parser> p) override;

uint64_t
std::uint64_t
size(std::shared_ptr<parser> p) override;

Config const& config_;
Expand Down
15 changes: 8 additions & 7 deletions src/ripple/net/SSLHTTPDownloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ namespace ripple {

/** Provides an asynchronous HTTPS file downloader
*/
class SSLHTTPDownloader : public std::enable_shared_from_this<SSLHTTPDownloader>
class SSLHTTPDownloader
{
public:
using error_code = boost::system::error_code;

SSLHTTPDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config,
bool isPaused = false);
Config const& config);

bool
download(
Expand All @@ -71,23 +70,25 @@ class SSLHTTPDownloader : public std::enable_shared_from_this<SSLHTTPDownloader>

beast::Journal const j_;

bool
void
fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg,
std::shared_ptr<parser> parser = nullptr);
std::shared_ptr<parser> parser);

private:
HTTPClientSSLContext ssl_ctx_;
boost::asio::io_service::strand strand_;
boost::optional<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>
stream_;
boost::beast::flat_buffer read_buf_;
std::atomic<bool> isStopped_;
bool sessionActive_;
std::atomic<bool> cancelDownloads_;

// Used to protect sessionActive_
undertome marked this conversation as resolved.
Show resolved Hide resolved
std::mutex m_;
bool sessionActive_;
std::condition_variable c_;
undertome marked this conversation as resolved.
Show resolved Hide resolved

void
Expand Down
Loading