Skip to content

Commit

Permalink
Saved WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
maxtropets committed Jun 21, 2024
1 parent dcb1329 commit ba609ba
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 6 deletions.
2 changes: 1 addition & 1 deletion doc/build_apps/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ Historical Queries

.. doxygenclass:: ccf::historical::AbstractStateCache
:project: CCF
:members: set_default_expiry_duration, get_state_at, get_store_at, get_store_range, drop_cached_states
:members: set_default_expiry_duration, set_soft_cache_limit, get_state_at, get_store_at, get_store_range, drop_cached_states

.. doxygenstruct:: ccf::historical::State
:project: CCF
Expand Down
6 changes: 6 additions & 0 deletions include/ccf/historical_queries_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ namespace ccf::historical

using ExpiryDuration = std::chrono::seconds;

using CacheSize = size_t;

/** Stores the progress of historical query requests.
*
* A request will generally need to be made multiple times (with the same
Expand Down Expand Up @@ -79,6 +81,10 @@ namespace ccf::historical
virtual void set_default_expiry_duration(
ExpiryDuration seconds_until_expiry) = 0;

/** TODO
*/
virtual void set_soft_cache_limit(CacheSize seconds_until_expiry) = 0;

/** EXPERIMENTAL: Set the tracking of deletes on missing keys for historical
* queries.
*
Expand Down
161 changes: 156 additions & 5 deletions src/node/historical_queries.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ namespace ccf::historical
enum class RequestStage
{
Fetching,
Untrusted,
Trusted,
};

Expand Down Expand Up @@ -176,7 +175,7 @@ namespace ccf::historical

using WeakStoreDetailsPtr = std::weak_ptr<StoreDetails>;
using AllRequestedStores = std::map<ccf::SeqNo, WeakStoreDetailsPtr>;

using StoreSizes = std::unordered_map<ccf::SeqNo, size_t>;
struct VersionedSecret
{
ccf::SeqNo valid_from = {};
Expand Down Expand Up @@ -234,11 +233,13 @@ namespace ccf::historical
return {};
}

void adjust_ranges(
std::pair<std::vector<SeqNo>, std::vector<SeqNo>> adjust_ranges(
const SeqNoCollection& new_seqnos,
bool should_include_receipts,
SeqNo earliest_ledger_secret_seqno)
{
std::vector<SeqNo> removed{}, probably_added{};

bool any_diff = false;

// If a seqno is earlier than the earliest known ledger secret, we will
Expand Down Expand Up @@ -266,6 +267,7 @@ namespace ccf::historical
{
// No longer looking for a seqno which was previously requested.
// Remove it from my_stores
removed.push_back(prev_it->first);
prev_it = my_stores.erase(prev_it);
any_diff |= true;
}
Expand All @@ -279,6 +281,7 @@ namespace ccf::historical
{
// If this is too early for known secrets, just record that it
// was requested but don't add it to all_stores yet
probably_added.push_back(*new_it);
prev_it = my_stores.insert_or_assign(prev_it, *new_it, nullptr);
any_too_early = true;
}
Expand All @@ -293,6 +296,7 @@ namespace ccf::historical
details = std::make_shared<StoreDetails>();
all_stores.insert_or_assign(all_it, *new_it, details);
}
probably_added.push_back(*new_it);
prev_it = my_stores.insert_or_assign(prev_it, *new_it, details);
}
any_diff |= true;
Expand All @@ -311,7 +315,7 @@ namespace ccf::historical
if (!any_diff && (should_include_receipts == include_receipts))
{
HISTORICAL_LOG("Identical to previous request");
return;
return {removed, probably_added};
}

include_receipts = should_include_receipts;
Expand All @@ -331,6 +335,7 @@ namespace ccf::historical
populate_receipts(seqno);
}
}
return {removed, probably_added};
}

void populate_receipts(ccf::SeqNo new_seqno)
Expand Down Expand Up @@ -493,6 +498,102 @@ namespace ccf::historical

ExpiryDuration default_expiry_duration = std::chrono::seconds(1800);

// Needs explanation?..
// Also mention we can't use that for comp as ref because it changes when
// deserialisation comes.
StoreSizes raw_sizes{};
std::vector<CompoundHandle> stupid;
std::unordered_map<SeqNo, std::set<CompoundHandle>> parents;
CacheSize remembered_size{0};
CacheSize soft_limit{0};

void add_ref(SeqNo seq, CompoundHandle handle)
{
auto it = parents.find(seq);
if (it == parents.end())
{
parents.insert({seq, {handle}});
auto size = raw_sizes.find(seq);
if (size != raw_sizes.end())
{
remembered_size += size->second;
}
}
else
{
it->second.insert(handle);
}
}

void add_refs(CompoundHandle handle)
{
for (const auto& [seq, _] : requests.at(handle).my_stores)
{
add_ref(seq, handle);
}
}

void remove_ref(SeqNo seq, CompoundHandle handle)
{
auto it = parents.find(seq);
assert(it != parents.end());

it->second.erase(handle);
if (it->second.empty())
{
parents.erase(it);
auto size = raw_sizes.find(seq);
if (size != raw_sizes.end())
{
remembered_size -= size->second;
raw_sizes.erase(size);
}
}
}

void remove_refs(CompoundHandle handle)
{
for (const auto& [seq, _] : requests.at(handle).my_stores)
{
remove_ref(seq, handle);
}
}

void use(CompoundHandle handle)
{
auto it = std::find(stupid.begin(), stupid.end(), handle);
if (it == stupid.end()) // New on top
{
stupid.insert(stupid.begin(), handle);
add_refs(handle);
}
else
{ // Topify
stupid.erase(it);
stupid.insert(stupid.begin(), handle);
}
}

void evict(CompoundHandle handle)
{
auto it = std::find(stupid.begin(), stupid.end(), handle);
if (it != stupid.end())
{
remove_refs(handle);
stupid.erase(it);
}
}

void update_store_raw_size(SeqNo seq, size_t new_size)
{
auto& stored_size = raw_sizes[seq];
assert(!stored_size || stored_size == new_size);

remembered_size -= stored_size;
remembered_size += new_size;
stored_size = new_size;
}

void fetch_entry_at(ccf::SeqNo seqno)
{
fetch_entries_range(seqno, seqno);
Expand Down Expand Up @@ -757,6 +858,7 @@ namespace ccf::historical
{
// This is a new handle - insert a newly created Request for it
it = requests.emplace_hint(it, handle, Request(all_stores));
use(handle);
HISTORICAL_LOG("First time I've seen handle {}", handle);
}

Expand All @@ -772,9 +874,18 @@ namespace ccf::historical
seqnos.size(),
*seqnos.begin(),
include_receipts);
request.adjust_ranges(
auto [removed, probably_added] = request.adjust_ranges(
seqnos, include_receipts, earliest_secret_.valid_from);

for (auto seq : removed)
{
remove_ref(seq, handle);
}
for (auto seq : probably_added)
{
add_ref(seq, handle);
}

// If the earliest target entry cannot be deserialised with the earliest
// known ledger secret, record the target seqno and begin fetching the
// previous historical ledger secret.
Expand Down Expand Up @@ -823,6 +934,7 @@ namespace ccf::historical
{
if (request_it->second.get_store_details(seqno) != nullptr)
{
evict(request_it->first);
request_it = requests.erase(request_it);
}
else
Expand Down Expand Up @@ -977,6 +1089,11 @@ namespace ccf::historical
default_expiry_duration = duration;
}

void set_soft_cache_limit(CacheSize cache_limit)
{
soft_limit = cache_limit;
}

void track_deletes_on_missing_keys(bool track)
{
track_deletes_on_missing_keys_v = track;
Expand All @@ -985,6 +1102,7 @@ namespace ccf::historical
bool drop_cached_states(const CompoundHandle& handle)
{
std::lock_guard<ccf::pal::Mutex> guard(requests_lock);
evict(handle);
const auto erased_count = requests.erase(handle);
HISTORICAL_LOG("Dropping historical request {}", handle);
return erased_count > 0;
Expand Down Expand Up @@ -1094,6 +1212,7 @@ namespace ccf::historical
std::move(claims_digest),
has_commit_evidence);

update_store_raw_size(seqno, size);
return true;
}

Expand Down Expand Up @@ -1245,6 +1364,7 @@ namespace ccf::historical
{
LOG_DEBUG_FMT(
"Dropping expired historical query with handle {}", it->first);
evict(it->first);
it = requests.erase(it);
}
else
Expand All @@ -1255,6 +1375,32 @@ namespace ccf::historical
}
}

size_t avg = raw_sizes.empty() ?
0 :
(std::accumulate(
raw_sizes.begin(),
raw_sizes.end(),
0ll,
[&](size_t current, const auto& item) {
return current + item.second;
}) /
raw_sizes.size());
LOG_INFO_FMT(
"3x3x remembered size {}, raw_sizes cnt {}, avg raw size {}",
remembered_size,
raw_sizes.size(),
avg);

while (soft_limit && remembered_size > soft_limit)
{
assert(false); // Never called in the test
assert(!stupid.empty());
auto handle = stupid.back();
evict(handle);
requests.erase(handle);
stupid.pop_back();
}

{
auto it = all_stores.begin();
std::optional<std::pair<ccf::SeqNo, ccf::SeqNo>> range_to_request =
Expand Down Expand Up @@ -1434,6 +1580,11 @@ namespace ccf::historical
StateCacheImpl::set_default_expiry_duration(duration);
}

void set_soft_cache_limit(CacheSize cache_limit) override
{
StateCacheImpl::set_soft_cache_limit(cache_limit);
}

void track_deletes_on_missing_keys(bool track) override
{
StateCacheImpl::track_deletes_on_missing_keys(track);
Expand Down
2 changes: 2 additions & 0 deletions src/node/rpc/test/node_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ namespace ccf
historical::ExpiryDuration seconds_until_expiry)
{}

void set_soft_cache_limit(historical::CacheSize seconds_until_expiry){};

void track_deletes_on_missing_keys(bool track) {}

kv::ReadOnlyStorePtr get_store_at(
Expand Down

0 comments on commit ba609ba

Please sign in to comment.