Skip to content

Commit

Permalink
datastore: kvdb schema and DHII entities
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Dec 12, 2024
1 parent fa7a2e0 commit 93b359e
Show file tree
Hide file tree
Showing 35 changed files with 602 additions and 132 deletions.
4 changes: 2 additions & 2 deletions cmd/capi/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ int execute_blocks(SilkwormHandle handle, ExecuteBlocksSettings settings, const

// Execute blocks
if (settings.use_internal_txn) {
return execute_with_internal_txn(handle, settings, data_store.chaindata_rw());
return execute_with_internal_txn(handle, settings, data_store.chaindata().access_rw());
}
return execute_with_external_txn(handle, settings, data_store.chaindata_rw().start_rw_tx());
return execute_with_external_txn(handle, settings, data_store.chaindata().access_rw().start_rw_tx());
}

int build_indexes(SilkwormHandle handle, const BuildIndexesSettings& settings, const DataDirectory& data_dir) {
Expand Down
2 changes: 1 addition & 1 deletion cmd/dev/check_changes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ int main(int argc, char* argv[]) {
data_dir.snapshots().path(),
};

auto txn = data_store.chaindata_rw().start_rw_tx();
auto txn = data_store.chaindata().access_rw().start_rw_tx();
auto chain_config{db::read_chain_config(txn)};
if (!chain_config) {
throw std::runtime_error("Unable to retrieve chain config");
Expand Down
4 changes: 2 additions & 2 deletions cmd/dev/db_toolbox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1460,10 +1460,10 @@ void do_freeze(EnvConfig& config, const DataDirectory& data_dir, bool keep_block
config,
data_dir.snapshots().path(),
};
StageSchedulerAdapter stage_scheduler{data_store.chaindata_rw()};
StageSchedulerAdapter stage_scheduler{data_store.chaindata().access_rw()};

Freezer freezer{
data_store.chaindata(),
data_store.chaindata().access_ro(),
data_store.ref().blocks_repository,
stage_scheduler,
data_dir.temp().path(),
Expand Down
52 changes: 22 additions & 30 deletions cmd/dev/staged_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,18 +281,15 @@ void debug_unwind(datastore::kvdb::EnvConfig& config, BlockNum height, uint32_t
Environment::set_start_at_stage(start_at_stage);
Environment::set_stop_before_stage(stop_before_stage);

auto env = silkworm::datastore::kvdb::open_env(config);
auto data_directory = std::make_unique<DataDirectory>(datadir_path);
db::DataStore data_store{config, data_directory->snapshots().path()};

datastore::kvdb::ROTxnManaged ro_txn{env};
datastore::kvdb::ROTxnManaged ro_txn = data_store.chaindata().access_ro().start_ro_tx();
const auto chain_config = db::read_chain_config(ro_txn);
ensure(chain_config.has_value(), "Uninitialized Silkworm db or unknown/custom chain");
ro_txn.abort();

auto data_directory = std::make_unique<DataDirectory>(datadir_path);
auto blocks_repository = db::blocks::make_blocks_repository(data_directory->snapshots().path(), /*open=*/false);
auto state_repository = db::state::make_state_repository(data_directory->snapshots().path(), /*open=*/false);
db::DataStoreRef data_store{datastore::kvdb::RWAccess{env}, blocks_repository, state_repository};
db::DataModelFactory data_model_factory{data_store};
db::DataModelFactory data_model_factory{data_store.ref()};

// We need full snapshot sync to take place to have database tables properly updated
snapshots::SnapshotSettings snapshot_settings{
Expand All @@ -307,7 +304,7 @@ void debug_unwind(datastore::kvdb::EnvConfig& config, BlockNum height, uint32_t
db::SnapshotSync snapshot_sync{
std::move(snapshot_settings),
chain_config->chain_id,
data_store,
data_store.ref(),
std::filesystem::path{},
empty_scheduler};

Expand All @@ -324,7 +321,7 @@ void debug_unwind(datastore::kvdb::EnvConfig& config, BlockNum height, uint32_t
snap_sync_future.get();

// Commit is enabled by default in RWTxn(Managed), so we need to check here
RWTxnManaged txn{env};
RWTxnManaged txn = data_store.chaindata().access_rw().start_rw_tx();
if (dry) {
txn.disable_commit();
} else {
Expand Down Expand Up @@ -373,6 +370,7 @@ void debug_unwind(datastore::kvdb::EnvConfig& config, BlockNum height, uint32_t

// Unwind has just set progress for pre-Execution stages back to unwind_point even if it is within the snapshots
// We need to reset progress for such stages to the max block in snapshots to avoid database update on next start
auto& blocks_repository = data_store.blocks_repository();
db::stages::write_stage_progress(txn, db::stages::kHeadersKey, blocks_repository.max_block_available());
db::stages::write_stage_progress(txn, db::stages::kBlockBodiesKey, blocks_repository.max_block_available());
db::stages::write_stage_progress(txn, db::stages::kBlockHashesKey, blocks_repository.max_block_available());
Expand All @@ -386,8 +384,10 @@ void unwind(datastore::kvdb::EnvConfig& config, BlockNum unwind_point, const boo

config.readonly = false;

auto env{silkworm::datastore::kvdb::open_env(config)};
RWTxnManaged txn{env};
auto data_directory = std::make_unique<DataDirectory>();
db::DataStore data_store{config, data_directory->snapshots().path()};

RWTxnManaged txn = data_store.chaindata().access_rw().start_rw_tx();

// Commit is enabled by default in RWTxn(Managed), so we need to check here
if (dry) {
Expand All @@ -402,11 +402,7 @@ void unwind(datastore::kvdb::EnvConfig& config, BlockNum unwind_point, const boo
const auto chain_config = db::read_chain_config(txn);
ensure(chain_config.has_value(), "Not an initialized Silkworm db or unknown/custom chain");

auto data_directory = std::make_unique<DataDirectory>();
auto blocks_repository = db::blocks::make_blocks_repository(data_directory->snapshots().path(), /*open=*/true);
auto state_repository = db::state::make_state_repository(data_directory->path(), /*open=*/true);
db::DataStoreRef data_store{datastore::kvdb::RWAccess{env}, blocks_repository, state_repository};
db::DataModelFactory data_model_factory{data_store};
db::DataModelFactory data_model_factory{data_store.ref()};

boost::asio::io_context io_context;

Expand Down Expand Up @@ -491,8 +487,10 @@ void forward(datastore::kvdb::EnvConfig& config, BlockNum forward_point, const b
Environment::set_start_at_stage(start_at_stage);
Environment::set_stop_before_stage(stop_before_stage);

auto env = silkworm::datastore::kvdb::open_env(config);
RWTxnManaged txn{env};
auto data_directory = std::make_unique<DataDirectory>();
db::DataStore data_store{config, data_directory->snapshots().path()};

RWTxnManaged txn = data_store.chaindata().access_rw().start_rw_tx();

// Commit is enabled by default in RWTxn(Managed), so we need to check here
if (dry) {
Expand All @@ -510,11 +508,7 @@ void forward(datastore::kvdb::EnvConfig& config, BlockNum forward_point, const b
const auto datadir_path = std::filesystem::path{config.path}.parent_path();
SILK_INFO << "Forward: datadir=" << datadir_path.string();

auto data_directory = std::make_unique<DataDirectory>();
auto blocks_repository = db::blocks::make_blocks_repository(data_directory->snapshots().path(), /*open=*/true);
auto state_repository = db::state::make_state_repository(data_directory->path(), /*open=*/true);
db::DataStoreRef data_store{datastore::kvdb::RWAccess{env}, blocks_repository, state_repository};
db::DataModelFactory data_model_factory{data_store};
db::DataModelFactory data_model_factory{data_store.ref()};

boost::asio::io_context io_context;

Expand Down Expand Up @@ -561,8 +555,10 @@ void bisect_pipeline(datastore::kvdb::EnvConfig& config, BlockNum start, BlockNu
Environment::set_start_at_stage(start_at_stage);
Environment::set_stop_before_stage(stop_before_stage);

auto env = silkworm::datastore::kvdb::open_env(config);
RWTxnManaged txn{env};
auto data_directory = std::make_unique<DataDirectory>();
db::DataStore data_store{config, data_directory->snapshots().path()};

RWTxnManaged txn = data_store.chaindata().access_rw().start_rw_tx();

// Commit is enabled by default in RWTxn(Managed), so we need to check here
if (dry) {
Expand All @@ -580,11 +576,7 @@ void bisect_pipeline(datastore::kvdb::EnvConfig& config, BlockNum start, BlockNu
const auto datadir_path = std::filesystem::path{config.path}.parent_path();
SILK_INFO << "Bisect: datadir=" << datadir_path.string();

auto data_directory = std::make_unique<DataDirectory>();
auto blocks_repository = db::blocks::make_blocks_repository(data_directory->snapshots().path(), /*open=*/true);
auto state_repository = db::state::make_state_repository(data_directory->path(), /*open=*/true);
db::DataStoreRef data_store{datastore::kvdb::RWAccess{env}, blocks_repository, state_repository};
db::DataModelFactory data_model_factory{data_store};
db::DataModelFactory data_model_factory{data_store.ref()};

boost::asio::io_context io_context;

Expand Down
10 changes: 6 additions & 4 deletions silkworm/capi/fork_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ SILKWORM_EXPORT int silkworm_start_fork_validator(SilkwormHandle handle, MDBX_en
SILK_INFO << "Starting fork validator";
set_node_settings(handle, *settings, mdbx_env);

silkworm::datastore::kvdb::EnvUnmanaged unmanaged_env{mdbx_env};
silkworm::datastore::kvdb::RWAccess rw_access{unmanaged_env};
handle->chaindata = std::make_unique<silkworm::datastore::kvdb::DatabaseUnmanaged>(
silkworm::db::DataStore::make_chaindata_database(silkworm::datastore::kvdb::EnvUnmanaged{mdbx_env}));
auto& chaindata = *handle->chaindata;

silkworm::db::DataStoreRef data_store{
rw_access,
chaindata.ref(),
*handle->blocks_repository,
*handle->state_repository,
};
Expand All @@ -139,7 +141,7 @@ SILKWORM_EXPORT int silkworm_start_fork_validator(SilkwormHandle handle, MDBX_en
data_model_factory,
/* log_timer_factory = */ std::nullopt,
make_stages_factory(handle->node_settings, data_model_factory),
rw_access);
chaindata.access_rw());

SILK_DEBUG << "Execution engine created";

Expand Down
5 changes: 5 additions & 0 deletions silkworm/capi/instance.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
#include <silkworm/node/stagedsync/execution_engine.hpp>
#include <silkworm/rpc/daemon.hpp>

namespace silkworm::datastore::kvdb {
class DatabaseUnmanaged;
} // namespace silkworm::datastore::kvdb

struct SilkwormInstance {
silkworm::log::Settings log_settings;
silkworm::concurrency::ContextPoolSettings context_pool_settings;
std::filesystem::path data_dir_path;
silkworm::NodeSettings node_settings;
std::unique_ptr<silkworm::datastore::kvdb::DatabaseUnmanaged> chaindata;
std::unique_ptr<silkworm::snapshots::SnapshotRepository> blocks_repository;
std::unique_ptr<silkworm::snapshots::SnapshotRepository> state_repository;
std::unique_ptr<silkworm::rpc::Daemon> rpcdaemon;
Expand Down
5 changes: 4 additions & 1 deletion silkworm/capi/rpcdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,11 @@ SILKWORM_EXPORT int silkworm_start_rpcdaemon(SilkwormHandle handle, MDBX_env* en
}

auto daemon_settings = make_daemon_settings(handle, *settings);
handle->chaindata = std::make_unique<datastore::kvdb::DatabaseUnmanaged>(
db::DataStore::make_chaindata_database(datastore::kvdb::EnvUnmanaged{env}));

db::DataStoreRef data_store{
datastore::kvdb::RWAccess{datastore::kvdb::EnvUnmanaged{env}},
handle->chaindata->ref(),
*handle->blocks_repository,
*handle->state_repository,
};
Expand Down
12 changes: 8 additions & 4 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,13 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,
try {
// Wrap MDBX env into an internal *unmanaged* env, i.e. MDBX env is only used but its lifecycle is untouched
datastore::kvdb::EnvUnmanaged unmanaged_env{mdbx_env};
datastore::kvdb::RWAccess rw_access{unmanaged_env};
auto txn = rw_access.start_rw_tx();
const auto env_path = unmanaged_env.get_path();
handle->chaindata = std::make_unique<datastore::kvdb::DatabaseUnmanaged>(
db::DataStore::make_chaindata_database(std::move(unmanaged_env)));
auto& chaindata = *handle->chaindata;

datastore::kvdb::RWAccess rw_access = chaindata.access_rw();
auto txn = rw_access.start_rw_tx();

db::Buffer state_buffer{txn, std::make_unique<db::BufferFullDataModel>(db::DataModel{txn, *handle->blocks_repository})};
state_buffer.set_memory_limit(batch_size);
Expand All @@ -627,15 +631,15 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,
[[maybe_unused]] auto _ = gsl::finally([&block_buffer] { block_buffer.terminate_and_release_all(); });

db::DataStoreRef data_store{
rw_access,
chaindata.ref(),
*handle->blocks_repository,
*handle->state_repository,
};
db::DataModelFactory data_model_factory{std::move(data_store)};

BlockProvider block_provider{
&block_buffer,
datastore::kvdb::ROAccess{unmanaged_env},
chaindata.access_ro(),
std::move(data_model_factory),
start_block,
max_block,
Expand Down
29 changes: 29 additions & 0 deletions silkworm/db/data_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,44 @@
namespace silkworm::db {

datastore::Schema DataStore::make_schema() {
datastore::kvdb::Schema kvdb;
kvdb.default_database() = make_chaindata_database_schema();

snapshots::Schema snapshots;
snapshots.repository(blocks::kBlocksRepositoryName) = blocks::make_blocks_repository_schema();
snapshots.repository(state::kStateRepositoryName) = state::make_state_repository_schema();

return {
std::move(kvdb),
std::move(snapshots),
};
}

datastore::kvdb::Schema::DatabaseDef DataStore::make_chaindata_database_schema() {
return state::make_state_database_schema();
}

datastore::kvdb::Database DataStore::make_chaindata_database(mdbx::env_managed chaindata_env) {
return {
std::move(chaindata_env),
make_chaindata_database_schema(),
};
}

datastore::kvdb::DatabaseUnmanaged DataStore::make_chaindata_database(datastore::kvdb::EnvUnmanaged chaindata_env) {
return {
std::move(chaindata_env),
make_chaindata_database_schema(),
};
}

std::map<datastore::EntityName, std::unique_ptr<datastore::kvdb::Database>> DataStore::make_databases_map(
datastore::kvdb::Database chaindata_database) {
std::map<datastore::EntityName, std::unique_ptr<datastore::kvdb::Database>> databases;
databases.emplace(datastore::kvdb::Schema::kDefaultEntityName, std::make_unique<datastore::kvdb::Database>(std::move(chaindata_database)));
return databases;
}

std::map<datastore::EntityName, std::unique_ptr<snapshots::SnapshotRepository>> DataStore::make_repositories_map(
snapshots::SnapshotRepository blocks_repository,
snapshots::SnapshotRepository state_repository) {
Expand Down
48 changes: 33 additions & 15 deletions silkworm/db/data_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,69 @@
namespace silkworm::db {

struct DataStoreRef {
datastore::kvdb::RWAccess chaindata;
datastore::kvdb::DatabaseRef chaindata;
state::StateDatabaseRef state_db() const { return {chaindata}; }
snapshots::SnapshotRepository& blocks_repository;
snapshots::SnapshotRepository& state_repository;
};

class DataStore {
public:
explicit DataStore(datastore::DataStore store) : store_{std::move(store)} {}
DataStore(
mdbx::env_managed chaindata_env,
datastore::kvdb::Database chaindata_database,
snapshots::SnapshotRepository blocks_repository,
snapshots::SnapshotRepository state_repository)
: store_{
make_schema(),
std::move(chaindata_env),
make_databases_map(std::move(chaindata_database)),
make_repositories_map(std::move(blocks_repository), std::move(state_repository)),
} {}

public:
explicit DataStore(datastore::DataStore store) : store_{std::move(store)} {}

DataStore(
const datastore::kvdb::EnvConfig& chaindata_env_config,
mdbx::env_managed chaindata_env,
const std::filesystem::path& repository_path)
: DataStore{
datastore::kvdb::open_env(chaindata_env_config),
make_chaindata_database(std::move(chaindata_env)),
blocks::make_blocks_repository(repository_path),
state::make_state_repository(repository_path),
} {}

void close() {
store_.close();
}
DataStore(
const datastore::kvdb::EnvConfig& chaindata_env_config,
const std::filesystem::path& repository_path)
: DataStore{
datastore::kvdb::open_env(chaindata_env_config),
repository_path,
} {}

DataStoreRef ref() const {
return {
store_.chaindata_rw(),
store_.repository(blocks::kBlocksRepositoryName),
store_.repository(state::kStateRepositoryName),
chaindata().ref(),
blocks_repository(),
state_repository(),
};
}

datastore::kvdb::ROAccess chaindata() const { return store_.chaindata(); }
datastore::kvdb::RWAccess chaindata_rw() const { return store_.chaindata_rw(); }
datastore::kvdb::Database& chaindata() const { return store_.default_database(); }

snapshots::SnapshotRepository& blocks_repository() const {
return store_.repository(blocks::kBlocksRepositoryName);
}
snapshots::SnapshotRepository& state_repository() const {
return store_.repository(state::kStateRepositoryName);
}

static datastore::kvdb::Schema::DatabaseDef make_chaindata_database_schema();
static datastore::kvdb::Database make_chaindata_database(mdbx::env_managed chaindata_env);
static datastore::kvdb::DatabaseUnmanaged make_chaindata_database(datastore::kvdb::EnvUnmanaged chaindata_env);

private:
static datastore::Schema make_schema();

static std::map<datastore::EntityName, std::unique_ptr<datastore::kvdb::Database>> make_databases_map(
datastore::kvdb::Database chaindata_database);
static std::map<datastore::EntityName, std::unique_ptr<snapshots::SnapshotRepository>> make_repositories_map(
snapshots::SnapshotRepository blocks_repository,
snapshots::SnapshotRepository state_repository);
Expand Down
Loading

0 comments on commit 93b359e

Please sign in to comment.