Skip to content

Commit

Permalink
GH-452 Encapsulate unlinkable_block_state_index.
Browse files Browse the repository at this point in the history
  • Loading branch information
heifner committed Apr 18, 2023
1 parent 022b6f3 commit fd55469
Showing 1 changed file with 94 additions and 70 deletions.
164 changes: 94 additions & 70 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,27 +131,48 @@ namespace eosio {
const block_timestamp_type& timestamp() const { return block->timestamp; }
};

struct by_timestamp;
struct by_block_num_id;
struct by_prev;
typedef multi_index_container<
eosio::unlinkable_block_state,
indexed_by<
ordered_unique< tag<by_block_num_id>,
composite_key< unlinkable_block_state,
const_mem_fun<unlinkable_block_state, uint32_t , &eosio::unlinkable_block_state::block_num>,
member<unlinkable_block_state, block_id_type, &eosio::unlinkable_block_state::id>
>,
composite_key_compare< std::less<>, sha256_less >
>,
ordered_non_unique< tag<by_timestamp>,
const_mem_fun< unlinkable_block_state, const block_timestamp_type&, &unlinkable_block_state::timestamp >
>,
ordered_non_unique< tag<by_prev>,
const_mem_fun< unlinkable_block_state, const block_id_type&, &unlinkable_block_state::prev >
>
>
> unlinkable_block_state_index;
class dispatch_manager;
class unlinkable_block_state_cache {
private:
struct by_timestamp;
struct by_block_num_id;
struct by_prev;
using unlinkable_block_state_index = multi_index_container<
eosio::unlinkable_block_state,
indexed_by<
ordered_unique<tag<by_block_num_id>,
composite_key<unlinkable_block_state,
const_mem_fun<unlinkable_block_state, uint32_t, &eosio::unlinkable_block_state::block_num>,
member<unlinkable_block_state, block_id_type, &eosio::unlinkable_block_state::id>
>,
composite_key_compare<std::less<>, sha256_less>
>,
ordered_non_unique<tag<by_timestamp>,
const_mem_fun<unlinkable_block_state, const block_timestamp_type&, &unlinkable_block_state::timestamp>
>,
ordered_non_unique<tag<by_prev>,
const_mem_fun<unlinkable_block_state, const block_id_type&, &unlinkable_block_state::prev>
>
>
>;

alignas(hardware_destructive_interference_size)
mutable std::mutex unlinkable_blk_state_mtx;
unlinkable_block_state_index unlinkable_blk_state;
// 30 should be plenty large enough as any unlinkable block that will be usable is likely to be usable
// almost immediately (blocks came in from multiple peers out of order). 30 allows for one block per
// producer round until lib. When queue larger than max, remove by block timestamp farthest in the past.
static constexpr size_t max_unlinkable_cache_size = 30;
dispatch_manager& dispatch;

public:
explicit unlinkable_block_state_cache(dispatch_manager& dispatch)
: dispatch(dispatch) {};

void add_unlinkable_block( signed_block_ptr b, const block_id_type& id );
unlinkable_block_state pop_possible_linkable_block( const block_id_type& blkid );
void expire_blocks( uint32_t lib_num );
};

class sync_manager {
private:
Expand Down Expand Up @@ -206,23 +227,18 @@ namespace eosio {
mutable std::mutex blk_state_mtx;
peer_block_state_index blk_state;

alignas(hardware_destructive_interference_size)
mutable std::mutex unlinkable_blk_state_mtx;
unlinkable_block_state_index unlinkable_blk_state;
// 30 should be plenty large enough as any unlinkable block that will be usable is likely to be usable
// almost immediately (blocks came in from multiple peers out of order). 30 allows for one block per
// producer round until lib. When queue larger than max, remove by block timestamp farthest in the past.
static constexpr size_t max_unlinkable_cache_size = 30;

alignas(hardware_destructive_interference_size)
mutable std::mutex local_txns_mtx;
node_transaction_index local_txns;

unlinkable_block_state_cache unlinkable_block_cache;

public:
boost::asio::io_context::strand strand;

explicit dispatch_manager(boost::asio::io_context& io_context)
: strand( io_context ) {}
: unlinkable_block_cache(*this),
strand( io_context ) {}

void bcast_transaction(const packed_transaction_ptr& trx);
void rejected_transaction(const packed_transaction_ptr& trx);
Expand All @@ -240,13 +256,17 @@ namespace eosio {
bool have_block(const block_id_type& blkid) const;
void rm_block(const block_id_type& blkid);

void add_unlinkable_block( signed_block_ptr b, const block_id_type& id );
unlinkable_block_state pop_possible_linkable_block(const block_id_type& blkid);

bool add_peer_txn( const transaction_id_type& id, const time_point_sec& trx_expires, uint32_t connection_id,
const time_point_sec& now = time_point::now() );
bool have_txn( const transaction_id_type& tid ) const;
void expire_txns();

void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) {
unlinkable_block_cache.add_unlinkable_block(std::move(b), id);
}
unlinkable_block_state pop_possible_linkable_block( const block_id_type& blkid ) {
return unlinkable_block_cache.pop_possible_linkable_block(blkid);
}
};

/**
Expand Down Expand Up @@ -2117,8 +2137,43 @@ namespace eosio {
}

//------------------------------------------------------------------------
// thread safe

void unlinkable_block_state_cache::add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) {
std::unique_lock g(unlinkable_blk_state_mtx);
unlinkable_blk_state.insert( {id, std::move(b)} ); // does not insert if already there
if (unlinkable_blk_state.size() > max_unlinkable_cache_size) {
auto& index = unlinkable_blk_state.get<by_timestamp>();
auto begin = index.begin();
block_id_type rm_block_id = begin->id;
index.erase( begin );
g.unlock();
// rm_block since we are no longer tracking this not applied block, allowing it to flow back in if needed
dispatch.rm_block(rm_block_id);

This comment has been minimized.

Copy link
@greg7mdp

greg7mdp Apr 18, 2023

Contributor

Thanks a lot for making the change, Kevin. This line is the only location where dispatch is used. Instead of creating a dependency, why not have this function returning std::optional<block_id_type> as in my original suggestion? That way unlinkable_block_state_cache doeasn't know about dispatch_manager.

Also, why not have the functions inline in the class? It is all in the .cpp file anyways.

This comment has been minimized.

Copy link
@heifner

heifner Apr 18, 2023

Author Member

Sorry, I missed that. Sounds good to me. I'll make the change.

}
}

unlinkable_block_state unlinkable_block_state_cache::pop_possible_linkable_block(const block_id_type& blkid) {
std::lock_guard g(unlinkable_blk_state_mtx);
auto& index = unlinkable_blk_state.get<by_prev>();
auto blk_itr = index.find( blkid );
if (blk_itr != index.end()) {
unlinkable_block_state result = *blk_itr;
index.erase(blk_itr);
return result;
}
return {};
}

void unlinkable_block_state_cache::expire_blocks( uint32_t lib_num ) {
std::lock_guard<std::mutex> g(unlinkable_blk_state_mtx);
auto& stale_blk = unlinkable_blk_state.get<by_block_num_id>();
stale_blk.erase( stale_blk.lower_bound( 1 ), stale_blk.upper_bound( lib_num ) );
}

//------------------------------------------------------------------------
// thread safe

bool dispatch_manager::add_peer_block( const block_id_type& blkid, uint32_t connection_id) {
uint32_t block_num = block_header::num_from_id(blkid);
std::lock_guard<std::mutex> g( blk_state_mtx );
Expand Down Expand Up @@ -2154,32 +2209,6 @@ namespace eosio {
index.erase(p.first, p.second);
}

void dispatch_manager::add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) {
std::unique_lock g(unlinkable_blk_state_mtx);
unlinkable_blk_state.insert( {id, std::move(b)} ); // does not insert if already there
if (unlinkable_blk_state.size() > max_unlinkable_cache_size) {
auto& index = unlinkable_blk_state.get<by_timestamp>();
auto begin = index.begin();
block_id_type rm_block_id = begin->id;
index.erase( begin );
g.unlock();
// rm_block since we are no longer tracking this not applied block, allowing it to flow back in if needed
rm_block(rm_block_id);
}
}

unlinkable_block_state dispatch_manager::pop_possible_linkable_block(const block_id_type& blkid) {
std::lock_guard g(unlinkable_blk_state_mtx);
auto& index = unlinkable_blk_state.get<by_prev>();
auto blk_itr = index.find( blkid );
if (blk_itr != index.end()) {
unlinkable_block_state result = *blk_itr;
index.erase(blk_itr);
return result;
}
return {};
}

bool dispatch_manager::add_peer_txn( const transaction_id_type& id, const time_point_sec& trx_expires,
uint32_t connection_id, const time_point_sec& now ) {
std::lock_guard<std::mutex> g( local_txns_mtx );
Expand Down Expand Up @@ -2218,16 +2247,11 @@ namespace eosio {
}

void dispatch_manager::expire_blocks( uint32_t lib_num ) {
{
std::lock_guard<std::mutex> g( blk_state_mtx );
auto& stale_blk = blk_state.get<by_connection_id>();
stale_blk.erase( stale_blk.lower_bound( 1 ), stale_blk.upper_bound( lib_num ) );
}
{
std::lock_guard<std::mutex> g( unlinkable_blk_state_mtx );
auto& stale_blk = unlinkable_blk_state.get<by_block_num_id>();
stale_blk.erase( stale_blk.lower_bound( 1 ), stale_blk.upper_bound( lib_num ) );
}
unlinkable_block_cache.expire_blocks( lib_num );

std::lock_guard<std::mutex> g( blk_state_mtx );
auto& stale_blk = blk_state.get<by_connection_id>();
stale_blk.erase( stale_blk.lower_bound( 1 ), stale_blk.upper_bound( lib_num ) );
}

// thread safe
Expand Down Expand Up @@ -3409,7 +3433,7 @@ namespace eosio {
dispatcher->add_peer_block( blk_id, c->connection_id );

while (true) { // attempt previously unlinkable blocks where prev_unlinkable->block->previous == blk_id
unlinkable_block_state prev_unlinkable = my_impl->dispatcher->pop_possible_linkable_block(blk_id);
unlinkable_block_state prev_unlinkable = dispatcher->pop_possible_linkable_block(blk_id);
if (!prev_unlinkable.block)
break;
fc_dlog( logger, "retrying previous unlinkable block #${n} ${id}...",
Expand Down

0 comments on commit fd55469

Please sign in to comment.