Skip to content

Commit

Permalink
Introduce ShardArchiveHandler improvements:
Browse files Browse the repository at this point in the history
* Improve documentation
* Make the ShardArchiveHandler rather than the DatabaseShardImp perform
  LastLedgerHash verification for downloaded shards
* Remove ShardArchiveHandler's singleton implementation and make it an
  Application member
* Have the Application invoke ShardArchiveHandler initialization
  instead of clients
* Add RecoveryHandler as a ShardArchiveHandler derived class
* Improve commenting
  • Loading branch information
undertome committed May 22, 2020
1 parent debb1e3 commit 318d77d
Show file tree
Hide file tree
Showing 23 changed files with 939 additions and 887 deletions.
4 changes: 2 additions & 2 deletions Builds/CMake/RippledCore.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/impl/EncodedBlob.cpp
src/ripple/nodestore/impl/ManagerImp.cpp
src/ripple/nodestore/impl/NodeObject.cpp
src/ripple/nodestore/impl/RetryFinalize.cpp
src/ripple/nodestore/impl/Shard.cpp
src/ripple/nodestore/impl/TaskQueue.cpp
#[===============================[
Expand Down Expand Up @@ -623,6 +622,7 @@ target_sources (rippled PRIVATE
src/ripple/rpc/impl/Role.cpp
src/ripple/rpc/impl/ServerHandlerImp.cpp
src/ripple/rpc/impl/ShardArchiveHandler.cpp
src/ripple/rpc/impl/ShardVerificationScheduler.cpp
src/ripple/rpc/impl/Status.cpp
src/ripple/rpc/impl/TransactionSign.cpp

Expand Down Expand Up @@ -837,7 +837,7 @@ target_sources (rippled PRIVATE
test sources:
subdir: net
#]===============================]
src/test/net/SSLHTTPDownloader_test.cpp
src/test/net/DatabaseDownloader_test.cpp
#[===============================[
test sources:
subdir: nodestore
Expand Down
87 changes: 66 additions & 21 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
detail::AppFamily family_;
std::unique_ptr<NodeStore::DatabaseShard> shardStore_;
std::unique_ptr<detail::AppFamily> shardFamily_;
std::unique_ptr<RPC::ShardArchiveHandler> shardArchiveHandler_;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr<PathRequests> m_pathRequests;
Expand Down Expand Up @@ -786,6 +787,64 @@ class ApplicationImp : public Application, public RootStoppable, public BasicApp
return shardStore_.get();
}

RPC::ShardArchiveHandler*
getShardArchiveHandler(bool tryRecovery) override
{
static std::mutex handlerMutex;
std::lock_guard lock(handlerMutex);

// After constructing the handler, try to
// initialize it. Log on error; set the
// member variable on success.
auto initAndSet =
[this](std::unique_ptr<RPC::ShardArchiveHandler>&& handler) {
if (!handler)
return false;

if (!handler->init())
{
JLOG(m_journal.error())
<< "Failed to initialize ShardArchiveHandler.";

return false;
}

shardArchiveHandler_ = std::move(handler);
return true;
};

// Need to resume based on state from a previous
// run.
if (tryRecovery)
{
if (shardArchiveHandler_ != nullptr)
{
JLOG(m_journal.error())
<< "ShardArchiveHandler already created at startup.";

return nullptr;
}

auto handler = RPC::ShardArchiveHandler::tryMakeRecoveryHandler(
*this, *m_jobQueue);

if (!initAndSet(std::move(handler)))
return nullptr;
}

// Construct the ShardArchiveHandler
if (shardArchiveHandler_ == nullptr)
{
auto handler = RPC::ShardArchiveHandler::makeShardArchiveHandler(
*this, *m_jobQueue);

if (!initAndSet(std::move(handler)))
return nullptr;
}

return shardArchiveHandler_.get();
}

Application::MutexType&
getMasterMutex() override
{
Expand Down Expand Up @@ -1714,30 +1773,16 @@ ApplicationImp::setup()

if (shardStore_)
{
using namespace boost::filesystem;

auto stateDb(
RPC::ShardArchiveHandler::getDownloadDirectory(*config_) /
stateDBName);

try
{
if (exists(stateDb) && is_regular_file(stateDb) &&
!RPC::ShardArchiveHandler::hasInstance())
{
auto handler = RPC::ShardArchiveHandler::recoverInstance(
*this, *m_jobQueue);

assert(handler);

if (!handler->initFromDB())
{
JLOG(m_journal.fatal())
<< "Failed to initialize ShardArchiveHandler.";

return false;
}
// Create a ShardArchiveHandler if recovery
// is needed (there's a state database left
// over from a previous run).
auto handler = getShardArchiveHandler(true);

// Recovery is needed.
if (handler)
{
if (!handler->start())
{
JLOG(m_journal.fatal())
Expand Down
5 changes: 5 additions & 0 deletions src/ripple/app/main/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class DatabaseShard;
namespace perf {
class PerfLog;
}
namespace RPC {
class ShardArchiveHandler;
}

// VFALCO TODO Fix forward declares required for header dependency loops
class AmendmentTable;
Expand Down Expand Up @@ -185,6 +188,8 @@ class Application : public beast::PropertyStream::Source
getNodeStore() = 0;
virtual NodeStore::DatabaseShard*
getShardStore() = 0;
virtual RPC::ShardArchiveHandler*
getShardArchiveHandler(bool tryRecovery = false) = 0;
virtual InboundLedgers&
getInboundLedgers() = 0;
virtual InboundTransactions&
Expand Down
24 changes: 15 additions & 9 deletions src/ripple/net/DatabaseBody.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@

namespace ripple {

// DatabaseBody needs to meet requirements
// from asio which is why some conventions
// used elsewhere in this code base are not
// followed.
struct DatabaseBody
{
// Algorithm for storing buffers when parsing.
Expand All @@ -53,15 +57,15 @@ class DatabaseBody::value_type
friend struct DatabaseBody;

// The cached file size
std::uint64_t file_size_ = 0;
std::uint64_t fileSize_ = 0;
boost::filesystem::path path_;
std::unique_ptr<DatabaseCon> conn_;
std::string batch_;
std::shared_ptr<boost::asio::io_service::strand> strand_;
std::mutex m_;
std::condition_variable c_;
uint64_t handler_count_ = 0;
uint64_t part_ = 0;
std::uint64_t handlerCount_ = 0;
std::uint64_t part_ = 0;
bool closing_ = false;

public:
Expand All @@ -75,14 +79,14 @@ class DatabaseBody::value_type
bool
is_open() const
{
return bool{conn_};
return static_cast<bool>(conn_);
}

/// Returns the size of the file if open
std::uint64_t
size() const
{
return file_size_;
return fileSize_;
}

/// Close the file if open
Expand All @@ -93,7 +97,9 @@ class DatabaseBody::value_type
@param path The utf-8 encoded path to the file
@param mode The file mode to use
@param config The configuration settings
@param io_service The asio context for running a strand.
@param ec Set to the error, if any occurred
*/
Expand All @@ -114,9 +120,9 @@ class DatabaseBody::reader
{
value_type& body_; // The body we are writing to

static const uint32_t FLUSH_SIZE = 50000000;
static const uint8_t MAX_HANDLERS = 3;
static const uint16_t MAX_ROW_SIZE_PAD = 500;
static constexpr std::uint32_t FLUSH_SIZE = 50000000;
static constexpr std::uint8_t MAX_HANDLERS = 3;
static constexpr std::uint16_t MAX_ROW_SIZE_PAD = 500;

public:
// Constructor.
Expand Down
5 changes: 3 additions & 2 deletions src/ripple/net/DatabaseDownloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class DatabaseDownloader : public SSLHTTPDownloader
Config const& config);

private:
static const uint8_t MAX_PATH_LEN = std::numeric_limits<uint8_t>::max();
static const std::uint8_t MAX_PATH_LEN =
std::numeric_limits<std::uint8_t>::max();

std::shared_ptr<parser>
getParser(
Expand All @@ -48,7 +49,7 @@ class DatabaseDownloader : public SSLHTTPDownloader
void
closeBody(std::shared_ptr<parser> p) override;

uint64_t
std::uint64_t
size(std::shared_ptr<parser> p) override;

Config const& config_;
Expand Down
15 changes: 8 additions & 7 deletions src/ripple/net/SSLHTTPDownloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,15 @@ namespace ripple {

/** Provides an asynchronous HTTPS file downloader
*/
class SSLHTTPDownloader : public std::enable_shared_from_this<SSLHTTPDownloader>
class SSLHTTPDownloader
{
public:
using error_code = boost::system::error_code;

SSLHTTPDownloader(
boost::asio::io_service& io_service,
beast::Journal j,
Config const& config,
bool isPaused = false);
Config const& config);

bool
download(
Expand All @@ -71,23 +70,25 @@ class SSLHTTPDownloader : public std::enable_shared_from_this<SSLHTTPDownloader>

beast::Journal const j_;

bool
void
fail(
boost::filesystem::path dstPath,
std::function<void(boost::filesystem::path)> const& complete,
boost::system::error_code const& ec,
std::string const& errMsg,
std::shared_ptr<parser> parser = nullptr);
std::shared_ptr<parser> parser);

private:
HTTPClientSSLContext ssl_ctx_;
boost::asio::io_service::strand strand_;
boost::optional<boost::asio::ssl::stream<boost::asio::ip::tcp::socket>>
stream_;
boost::beast::flat_buffer read_buf_;
std::atomic<bool> isStopped_;
bool sessionActive_;
std::atomic<bool> cancelDownloads_;

// Used to protect sessionActive_
std::mutex m_;
bool sessionActive_;
std::condition_variable c_;

void
Expand Down
Loading

0 comments on commit 318d77d

Please sign in to comment.