-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathnet_plugin.cpp
4759 lines (4205 loc) · 207 KB
/
net_plugin.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include <eosio/net_plugin/net_plugin.hpp>
#include <eosio/net_plugin/protocol.hpp>
#include <eosio/net_plugin/net_utils.hpp>
#include <eosio/net_plugin/auto_bp_peering.hpp>
#include <eosio/chain/types.hpp>
#include <eosio/chain/controller.hpp>
#include <eosio/chain/exceptions.hpp>
#include <eosio/chain/block.hpp>
#include <eosio/chain/plugin_interface.hpp>
#include <eosio/chain/thread_utils.hpp>
#include <eosio/producer_plugin/producer_plugin.hpp>
#include <eosio/chain/contract_types.hpp>
#include <fc/bitutil.hpp>
#include <fc/network/message_buffer.hpp>
#include <fc/io/json.hpp>
#include <fc/io/raw.hpp>
#include <fc/reflect/variant.hpp>
#include <fc/crypto/rand.hpp>
#include <fc/exception/exception.hpp>
#include <fc/time.hpp>
#include <fc/mutex.hpp>
#include <fc/network/listener.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/multi_index/key.hpp>
#include <atomic>
#include <cmath>
#include <memory>
#include <new>
#include <regex>
// should be defined for c++17, but clang++16 still has not implemented it
#ifdef __cpp_lib_hardware_interference_size
using std::hardware_constructive_interference_size;
using std::hardware_destructive_interference_size;
#else
// 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
[[maybe_unused]] constexpr std::size_t hardware_constructive_interference_size = 64;
[[maybe_unused]] constexpr std::size_t hardware_destructive_interference_size = 64;
#endif
using namespace eosio::chain::plugin_interface;
using namespace std::chrono_literals;
namespace boost
{
   /// @brief Overload for boost::lexical_cast to convert vector of strings to string
   ///
   /// Used by boost::program_options to print the default value of an std::vector<std::string> option
   ///
   /// @param v the vector to convert
   /// @return the contents of the vector as a comma-separated string
   /// @note full specialization must live in namespace boost so program_options picks it up
   template<>
   inline std::string lexical_cast<std::string>(const std::vector<std::string>& v)
   {
      return boost::join(v, ",");
   }
}
namespace eosio {
static auto _net_plugin = application::register_plugin<net_plugin>();
using std::vector;
using boost::asio::ip::tcp;
using boost::asio::ip::address_v4;
using boost::asio::ip::host_name;
using boost::multi_index_container;
using namespace boost::multi_index;
using fc::time_point;
using fc::time_point_sec;
using eosio::chain::transaction_id_type;
using eosio::chain::sha256_less;
class connection;
using connection_ptr = std::shared_ptr<connection>;
using connection_wptr = std::weak_ptr<connection>;
// nanoseconds per block production interval, derived from the configured block interval (ms)
static constexpr int64_t block_interval_ns =
      std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::milliseconds(config::block_interval_ms)).count();

const std::string logger_name("net_plugin_impl"); // id used to configure this plugin's logger
fc::logger logger;                                // file-wide logger, used by fc_*log and the peer_*log macros below
std::string peer_log_format;                      // format string prepended to peer log messages (see peer_*log macros)
/// Sanity check that the current thread is running inside the expected strand.
/// On violation, logs the calling function/line and shuts the application down.
/// @param strand the asio strand expected to be running in this thread
/// @param func   name of the calling function, for the log message
/// @param line   source line of the caller, for the log message
template <typename Strand>
void verify_strand_in_this_thread(const Strand& strand, const char* func, int line) {
   if( strand.running_in_this_thread() )
      return; // common case: we are on the right strand
   fc_elog( logger, "wrong strand: ${f} : line ${n}, exiting", ("f", func)("n", line) );
   app().quit();
}
/// Record of a transaction id known to a specific peer connection; used to
/// avoid re-sending a transaction back to a peer that already has it.
struct node_transaction_state {
   transaction_id_type id;
   time_point_sec expires;      /// time after which this may be purged.
   uint32_t connection_id = 0;  // id of the connection this entry is associated with
};
struct by_expiry;

// Multi-index over node_transaction_state:
//  - by_id:     unique on (transaction id, connection id) — one entry per trx per connection
//  - by_expiry: non-unique on expiration time, enabling bulk purge of stale entries
typedef multi_index_container<
   node_transaction_state,
   indexed_by<
      ordered_unique<
         tag<by_id>,
         composite_key< node_transaction_state,
            member<node_transaction_state, transaction_id_type, &node_transaction_state::id>,
            member<node_transaction_state, uint32_t, &node_transaction_state::connection_id>
         >,
         composite_key_compare< sha256_less, std::less<> >
      >,
      ordered_non_unique<
         tag< by_expiry >,
         member< node_transaction_state, fc::time_point_sec, &node_transaction_state::expires > >
   >
>
node_transaction_index;
/// Record of a block id known to a specific peer connection; used to avoid
/// re-sending a block back to a peer that already has it.
struct peer_block_state {
   block_id_type id;
   uint32_t connection_id = 0; // id of the connection this entry is associated with

   // block number is embedded in the block id
   uint32_t block_num() const { return block_header::num_from_id(id); }
};
struct by_connection_id;

// Multi-index over peer_block_state, unique on (block num, block id, connection id);
// leading block_num key allows range scans/purges by block number.
typedef multi_index_container<
   eosio::peer_block_state,
   indexed_by<
      ordered_unique< tag<by_connection_id>,
            composite_key< peer_block_state,
                  const_mem_fun<peer_block_state, uint32_t , &eosio::peer_block_state::block_num>,
                  member<peer_block_state, block_id_type, &eosio::peer_block_state::id>,
                  member<peer_block_state, uint32_t, &eosio::peer_block_state::connection_id>
            >,
            composite_key_compare< std::less<>, sha256_less, std::less<> >
      >
   >
> peer_block_state_index;
/// A received block whose predecessor is not (yet) known locally; cached in
/// unlinkable_block_state_cache until the previous block arrives.
struct unlinkable_block_state {
   block_id_type id;
   signed_block_ptr block;

   // block number is embedded in the block id
   uint32_t block_num() const { return block_header::num_from_id(id); }
   const block_id_type& prev() const { return block->previous; }
   const block_timestamp_type& timestamp() const { return block->timestamp; }
};
class unlinkable_block_state_cache {
private:
struct by_timestamp;
struct by_block_num_id;
struct by_prev;
using unlinkable_block_state_index = multi_index_container<
eosio::unlinkable_block_state,
indexed_by<
ordered_unique<tag<by_block_num_id>,
composite_key<unlinkable_block_state,
const_mem_fun<unlinkable_block_state, uint32_t, &eosio::unlinkable_block_state::block_num>,
member<unlinkable_block_state, block_id_type, &eosio::unlinkable_block_state::id>
>,
composite_key_compare<std::less<>, sha256_less>
>,
ordered_non_unique<tag<by_timestamp>,
const_mem_fun<unlinkable_block_state, const block_timestamp_type&, &unlinkable_block_state::timestamp>
>,
ordered_non_unique<tag<by_prev>,
const_mem_fun<unlinkable_block_state, const block_id_type&, &unlinkable_block_state::prev>
>
>
>;
alignas(hardware_destructive_interference_size)
mutable fc::mutex unlinkable_blk_state_mtx;
unlinkable_block_state_index unlinkable_blk_state GUARDED_BY(unlinkable_blk_state_mtx);
// 30 should be plenty large enough as any unlinkable block that will be usable is likely to be usable
// almost immediately (blocks came in from multiple peers out of order). 30 allows for one block per
// producer round until lib. When queue larger than max, remove by block timestamp farthest in the past.
static constexpr size_t max_unlinkable_cache_size = 30;
public:
// returns block id of any block removed because of a full cache
std::optional<block_id_type> add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) {
fc::lock_guard g(unlinkable_blk_state_mtx);
unlinkable_blk_state.insert( {id, std::move(b)} ); // does not insert if already there
if (unlinkable_blk_state.size() > max_unlinkable_cache_size) {
auto& index = unlinkable_blk_state.get<by_timestamp>();
auto begin = index.begin();
block_id_type rm_block_id = begin->id;
index.erase( begin );
return rm_block_id;
}
return {};
}
unlinkable_block_state pop_possible_linkable_block(const block_id_type& blkid) {
fc::lock_guard g(unlinkable_blk_state_mtx);
auto& index = unlinkable_blk_state.get<by_prev>();
auto blk_itr = index.find( blkid );
if (blk_itr != index.end()) {
unlinkable_block_state result = *blk_itr;
index.erase(blk_itr);
return result;
}
return {};
}
void expire_blocks( uint32_t lib_num ) {
fc::lock_guard g(unlinkable_blk_state_mtx);
auto& stale_blk = unlinkable_blk_state.get<by_block_num_id>();
stale_blk.erase( stale_blk.lower_bound( 1 ), stale_blk.upper_bound( lib_num ) );
}
};
/// Drives block syncing from peers: tracks the current sync stage, the peer
/// being synced from, and the block range requested from it.
class sync_manager {
private:
   enum stages {
      lib_catchup,   // syncing toward the highest lib known from connected peers
      head_catchup,
      in_sync        // initial state (see sync_state below)
   };

   alignas(hardware_destructive_interference_size)
   fc::mutex sync_mtx;
   uint32_t sync_known_lib_num GUARDED_BY(sync_mtx) {0};       // highest known lib num from currently connected peers
   uint32_t sync_last_requested_num GUARDED_BY(sync_mtx) {0};  // end block number of the last requested range, inclusive
   uint32_t sync_next_expected_num GUARDED_BY(sync_mtx) {0};   // the next block number we need from peer
   connection_ptr sync_source GUARDED_BY(sync_mtx);            // connection we are currently syncing from

   const uint32_t sync_req_span {0};    // block span per sync request chunk (see request_next_chunk)
   const uint32_t sync_peer_limit {0};  // limit on peers considered when selecting a sync source

   alignas(hardware_destructive_interference_size)
   std::atomic<stages> sync_state{in_sync};
   std::atomic<uint32_t> sync_ordinal{0};

   // Instant finality makes it likely peers think their lib and head are
   // not in sync but in reality they are only within small difference.
   // To avoid unnecessary catchups, a margin of min_blocks_distance
   // between lib and head must be reached before catchup starts.
   const uint32_t min_blocks_distance{0};

private:
   constexpr static auto stage_str( stages s );
   bool set_state( stages newstate );
   bool is_sync_required( uint32_t fork_head_block_num ); // call with locked mutex
   void request_next_chunk( const connection_ptr& conn = connection_ptr() ) REQUIRES(sync_mtx);
   connection_ptr find_next_sync_node(); // call with locked mutex
   void start_sync( const connection_ptr& c, uint32_t target ); // locks mutex
   bool verify_catchup( const connection_ptr& c, uint32_t num, const block_id_type& id ); // locks mutex

public:
   explicit sync_manager( uint32_t span, uint32_t sync_peer_limit, uint32_t min_blocks_distance );
   static void send_handshakes();
   bool syncing_from_peer() const { return sync_state == lib_catchup; } // thread safe, atomic
   bool is_in_sync() const { return sync_state == in_sync; }            // thread safe, atomic
   void sync_reset_lib_num( const connection_ptr& conn, bool closing );
   void sync_reassign_fetch( const connection_ptr& c, go_away_reason reason );
   void rejected_block( const connection_ptr& c, uint32_t blk_num );
   void sync_recv_block( const connection_ptr& c, const block_id_type& blk_id, uint32_t blk_num, bool blk_applied );
   void recv_handshake( const connection_ptr& c, const handshake_message& msg, uint32_t nblk_combined_latency );
   void sync_recv_notice( const connection_ptr& c, const notice_message& msg );
};
/// Routes blocks and transactions between peers: remembers which connection
/// already has which block/transaction (to avoid echoing them back) and owns
/// the unlinkable-block cache.
class dispatch_manager {
   alignas(hardware_destructive_interference_size)
   mutable fc::mutex blk_state_mtx;
   peer_block_state_index blk_state GUARDED_BY(blk_state_mtx);     // blocks known, per connection
   alignas(hardware_destructive_interference_size)
   mutable fc::mutex local_txns_mtx;
   node_transaction_index local_txns GUARDED_BY(local_txns_mtx);   // transactions known, per connection, with expiry

   unlinkable_block_state_cache unlinkable_block_cache;

public:
   boost::asio::io_context::strand strand;

   explicit dispatch_manager(boost::asio::io_context& io_context)
      : strand( io_context ) {}

   void bcast_transaction(const packed_transaction_ptr& trx);
   void rejected_transaction(const packed_transaction_ptr& trx);
   void bcast_block( const signed_block_ptr& b, const block_id_type& id );
   void rejected_block(const block_id_type& id);

   void recv_block(const connection_ptr& c, const block_id_type& id, uint32_t bnum);
   void expire_blocks( uint32_t lib_num );
   void recv_notice(const connection_ptr& conn, const notice_message& msg, bool generated);

   void retry_fetch(const connection_ptr& conn);

   bool add_peer_block( const block_id_type& blkid, uint32_t connection_id );
   bool peer_has_block(const block_id_type& blkid, uint32_t connection_id) const;
   bool have_block(const block_id_type& blkid) const;
   void rm_block(const block_id_type& blkid);

   bool add_peer_txn( const transaction_id_type& id, const time_point_sec& trx_expires, uint32_t connection_id,
                      const time_point_sec& now = time_point_sec(time_point::now()) );
   bool have_txn( const transaction_id_type& tid ) const;
   void expire_txns();

   // cache an unlinkable block; if the cache evicted an entry to make room,
   // also stop tracking the evicted block so peers may re-send it later
   void add_unlinkable_block( signed_block_ptr b, const block_id_type& id ) {
      std::optional<block_id_type> rm_blk_id = unlinkable_block_cache.add_unlinkable_block(std::move(b), id);
      if (rm_blk_id) {
         // rm_block since we are no longer tracking this not applied block, allowing it to flow back in if needed
         rm_block(*rm_blk_id);
      }
   }

   unlinkable_block_state pop_possible_linkable_block( const block_id_type& blkid ) {
      return unlinkable_block_cache.pop_possible_linkable_block(blkid);
   }
};
/**
 *  default value initializers
 */
constexpr auto def_send_buffer_size_mb = 4;
constexpr auto def_send_buffer_size = 1024*1024*def_send_buffer_size_mb;
constexpr auto def_max_write_queue_size = def_send_buffer_size*10; // queued_buffer reports overflow at 2x this (see add_write_queue)
constexpr auto def_max_trx_in_progress_size = 100*1024*1024; // 100 MB
constexpr auto def_max_consecutive_immediate_connection_close = 9; // back off if client keeps closing
constexpr auto def_max_clients = 25; // 0 for unlimited clients
constexpr auto def_max_nodes_per_host = 1;
constexpr auto def_conn_retry_wait = 30;
constexpr auto def_txn_expire_wait = std::chrono::seconds(3);
constexpr auto def_resp_expected_wait = std::chrono::seconds(5);
constexpr auto def_sync_fetch_span = 1000;   // default block span per sync request
constexpr auto def_keepalive_interval = 10000; // milliseconds (see keepalive_interval / heartbeat_timeout)

constexpr auto message_header_size = sizeof(uint32_t); // length prefix preceding each serialized net_message

constexpr uint32_t signed_block_which = fc::get_index<net_message, signed_block>();             // see protocol net_message
constexpr uint32_t packed_transaction_which = fc::get_index<net_message, packed_transaction>(); // see protocol net_message
/// Owns all peer connections (inbound and outbound), the configured
/// supplied-peer list, and the periodic connection-check / statistics timers.
class connections_manager {
public:
   struct connection_detail {
      std::string host;                 // peer address string as configured/supplied
      connection_ptr c;
      tcp::endpoint active_ip;          // endpoint currently in use for this host (see update_connection_endpoint)
      tcp::resolver::results_type ips;  // resolved endpoints for host
   };

   using connection_details_index = multi_index_container<
      connection_detail,
      indexed_by<
         ordered_non_unique<
            tag<struct by_host>,
            key<&connection_detail::host>
         >,
         ordered_unique<
            tag<struct by_connection>,
            key<&connection_detail::c>
         >
      >
   >;
   enum class timer_type { check, stats };

private:
   alignas(hardware_destructive_interference_size)
   mutable std::shared_mutex connections_mtx; // protects connections (and presumably supplied_peers — confirm)
   connection_details_index connections;
   chain::flat_set<string> supplied_peers;

   alignas(hardware_destructive_interference_size)
   fc::mutex connector_check_timer_mtx;
   unique_ptr<boost::asio::steady_timer> connector_check_timer GUARDED_BY(connector_check_timer_mtx);
   fc::mutex connection_stats_timer_mtx;
   unique_ptr<boost::asio::steady_timer> connection_stats_timer GUARDED_BY(connection_stats_timer_mtx);

   /// thread safe, only modified on startup
   std::chrono::milliseconds heartbeat_timeout{def_keepalive_interval*2};
   fc::microseconds max_cleanup_time;
   boost::asio::steady_timer::duration connector_period{0};
   uint32_t max_client_count{def_max_clients};

   std::function<void(net_plugin::p2p_connections_metrics)> update_p2p_connection_metrics;

private: // must call with held mutex
   connection_ptr find_connection_i(const string& host) const;

   void connection_monitor(const std::weak_ptr<connection>& from_connection);
   void connection_statistics_monitor(const std::weak_ptr<connection>& from_connection);

public:
   size_t number_connections() const;
   void add_supplied_peers(const vector<string>& peers );

   // not thread safe, only call on startup
   void init(std::chrono::milliseconds heartbeat_timeout_ms,
             fc::microseconds conn_max_cleanup_time,
             boost::asio::steady_timer::duration conn_period,
             uint32_t maximum_client_count);

   uint32_t get_max_client_count() const { return max_client_count; }

   fc::microseconds get_connector_period() const;

   void register_update_p2p_connection_metrics(std::function<void(net_plugin::p2p_connections_metrics)>&& fun);

   void connect_supplied_peers(const string& p2p_address);

   void start_conn_timers();
   void start_conn_timer(boost::asio::steady_timer::duration du,
                         std::weak_ptr<connection> from_connection,
                         timer_type which);
   void stop_conn_timers();

   void add(connection_ptr c);
   string connect(const string& host, const string& p2p_address);
   string resolve_and_connect(const string& host, const string& p2p_address);
   void update_connection_endpoint(connection_ptr c, const tcp::endpoint& endpoint);
   void connect(const connection_ptr& c);
   string disconnect(const string& host);
   void close_all();

   std::optional<connection_status> status(const string& host) const;
   vector<connection_status> connection_statuses() const;

   template <typename Function>
   bool any_of_supplied_peers(Function&& f) const;

   template <typename Function>
   void for_each_connection(Function&& f) const;

   template <typename Function>
   void for_each_block_connection(Function&& f) const;

   template <typename UnaryPredicate>
   bool any_of_connections(UnaryPredicate&& p) const;

   template <typename UnaryPredicate>
   bool any_of_block_connections(UnaryPredicate&& p) const;
}; // connections_manager
/// Top-level plugin implementation: owns the net thread pool, the sync and
/// dispatch managers, the connections manager, the expiry/keepalive timers,
/// and peer-authentication state.
class net_plugin_impl : public std::enable_shared_from_this<net_plugin_impl>,
                        public auto_bp_peering::bp_connection_manager<net_plugin_impl, connection> {
public:
   uint16_t thread_pool_size = 4;
   eosio::chain::named_thread_pool<struct net> thread_pool;

   std::atomic<uint32_t> current_connection_id{0}; // source of unique per-connection ids

   unique_ptr< sync_manager > sync_master;
   dispatch_manager dispatcher {thread_pool.get_executor()};
   connections_manager connections;

   /**
    * Thread safe, only updated in plugin initialize
    *  @{
    */
   vector<string> p2p_addresses;
   vector<string> p2p_server_addresses;
   const string& get_first_p2p_address() const;

   vector<chain::public_key_type> allowed_peers; ///< peer keys allowed to connect
   std::map<chain::public_key_type,
            chain::private_key_type> private_keys; ///< overlapping with producer keys, also authenticating non-producing nodes

   // bit flags controlling which peers may connect (see authenticate_peer)
   enum possible_connections : char {
      None = 0,
      Producers = 1 << 0,
      Specified = 1 << 1,
      Any = 1 << 2
   };
   possible_connections allowed_connections{None};

   boost::asio::steady_timer::duration txn_exp_period{0};
   boost::asio::steady_timer::duration resp_expected_period{0};
   std::chrono::milliseconds keepalive_interval{std::chrono::milliseconds{def_keepalive_interval}};

   uint32_t max_nodes_per_host = 1;
   bool p2p_accept_transactions = true;
   fc::microseconds p2p_dedup_cache_expire_time_us{};

   chain_id_type chain_id;
   fc::sha256 node_id;
   string user_agent_name;

   chain_plugin* chain_plug = nullptr;
   producer_plugin* producer_plug = nullptr;
   bool use_socket_read_watermark = false;
   /** @} */

   alignas(hardware_destructive_interference_size)
   fc::mutex expire_timer_mtx;
   boost::asio::steady_timer expire_timer GUARDED_BY(expire_timer_mtx) {thread_pool.get_executor()};

   alignas(hardware_destructive_interference_size)
   fc::mutex keepalive_timer_mtx;
   boost::asio::steady_timer keepalive_timer GUARDED_BY(keepalive_timer_mtx) {thread_pool.get_executor()};

   alignas(hardware_destructive_interference_size)
   std::atomic<bool> in_shutdown{false};

   alignas(hardware_destructive_interference_size)
   compat::channels::transaction_ack::channel_type::handle incoming_transaction_ack_subscription;

   boost::asio::deadline_timer accept_error_timer{thread_pool.get_executor()};

   // snapshot of chain state (lib/head numbers and ids), refreshed by update_chain_info()
   struct chain_info_t {
      uint32_t lib_num = 0;
      block_id_type lib_id;
      uint32_t head_num = 0;
      block_id_type head_id;
   };

   std::function<void()> increment_failed_p2p_connections;
   std::function<void()> increment_dropped_trxs;

private:
   alignas(hardware_destructive_interference_size)
   mutable fc::mutex chain_info_mtx; // protects chain_info_t
   chain_info_t chain_info GUARDED_BY(chain_info_mtx);

public:
   void update_chain_info();
   chain_info_t get_chain_info() const;
   uint32_t get_chain_lib_num() const;
   uint32_t get_chain_head_num() const;

   void on_accepted_block_header( const signed_block_ptr& block, const block_id_type& id );
   void on_accepted_block();
   void transaction_ack(const std::pair<fc::exception_ptr, packed_transaction_ptr>&);
   void on_irreversible_block( const block_id_type& id, uint32_t block_num );

   void start_expire_timer();
   void start_monitors();

   // we currently pause on snapshot generation
   // busy-waits (10ms sleeps) while the controller is writing a snapshot
   void wait_if_paused() const {
      controller& cc = chain_plug->chain();
      while (cc.is_writing_snapshot()) {
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
      }
   }

   void expire();
   /** \name Peer Timestamps
    *  Time message handling
    *  @{
    */
   /** \brief Peer heartbeat ticker.
    */
   void ticker();
   /** @} */
   /** \brief Determine if a peer is allowed to connect.
    *
    * Checks current connection mode and key authentication.
    *
    * \return False if the peer should not connect, true otherwise.
    */
   bool authenticate_peer(const handshake_message& msg) const;
   /** \brief Retrieve public key used to authenticate with peers.
    *
    * Finds a key to use for authentication.  If this node is a producer, use
    * the front of the producer key map.  If the node is not a producer but has
    * a configured private key, use it.  If the node is neither a producer nor has
    * a private key, returns an empty key.
    *
    * \note On a node with multiple private keys configured, the key with the first
    *       numerically smaller byte will always be used.
    */
   chain::public_key_type get_authentication_key() const;
   /** \brief Returns a signature of the digest using the corresponding private key of the signer.
    *
    * If there are no configured private keys, returns an empty signature.
    */
   chain::signature_type sign_compact(const chain::public_key_type& signer, const fc::sha256& digest) const;

   constexpr static uint16_t to_protocol_version(uint16_t v);

   void plugin_initialize(const variables_map& options);
   void plugin_startup();
   void plugin_shutdown();
   bool in_sync() const;
   fc::logger& get_logger() { return logger; }

   void create_session(tcp::socket&& socket, const string listen_address, size_t limit);

   std::string empty{};
}; //net_plugin_impl
// peer_[x]log must be called from thread in connection strand
// Each macro first checks the logger level (so arguments are not evaluated when
// disabled), verifies the caller is on PEER's strand, and logs peer_log_format
// + FORMAT expanded with PEER->get_logger_variant().
#define peer_dlog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::debug ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( debug, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END

#define peer_ilog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::info ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( info, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END

#define peer_wlog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::warn ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( warn, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END

#define peer_elog( PEER, FORMAT, ... ) \
  FC_MULTILINE_MACRO_BEGIN \
   if( logger.is_enabled( fc::log_level::error ) ) { \
      verify_strand_in_this_thread( PEER->strand, __func__, __LINE__ ); \
      logger.log( FC_LOG_MESSAGE( error, peer_log_format + FORMAT, __VA_ARGS__ (PEER->get_logger_variant()) ) ); \
   } \
  FC_MULTILINE_MACRO_END
/// Bitwise-or-assign for enum (flag) types, e.g. net_plugin_impl::possible_connections.
/// Participates in overload resolution only for enum types.
/// @param lhs flags accumulated so far (modified in place)
/// @param rhs flag(s) to set
/// @return reference to lhs after or-ing in rhs
template<class enum_type, class = std::enable_if_t<std::is_enum_v<enum_type>>>
inline enum_type& operator|=(enum_type& lhs, enum_type rhs)
{
   using T = std::underlying_type_t<enum_type>;
   return lhs = static_cast<enum_type>(static_cast<T>(lhs) | static_cast<T>(rhs));
}
static net_plugin_impl *my_impl;
/**
 * For a while, network version was a 16 bit value equal to the second set of 16 bits
 * of the current build's git commit id. We are now replacing that with an integer protocol
 * identifier. Based on historical analysis of all git commit identifiers, the largest gap
 * between adjacent commit id values is shown below.
 * these numbers were found with the following commands on the master branch:
 *
 * git log | grep "^commit" | awk '{print substr($2,5,4)}' | sort -u > sorted.txt
 * rm -f gap.txt; prev=0; for a in $(cat sorted.txt); do echo $prev $((0x$a - 0x$prev)) $a >> gap.txt; prev=$a; done; sort -k2 -n gap.txt | tail
 *
 * DO NOT EDIT net_version_base OR net_version_range!
 */
constexpr uint16_t net_version_base = 0x04b5;
constexpr uint16_t net_version_range = 106;

/**
 * If there is a change to network protocol or behavior, increment net version to identify
 * the need for compatibility hooks
 */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-variable"
constexpr uint16_t proto_base = 0;
constexpr uint16_t proto_explicit_sync = 1;       // version at time of eosio 1.0
constexpr uint16_t proto_block_id_notify = 2;     // reserved. feature was removed. next net_version should be 3
constexpr uint16_t proto_pruned_types = 3;        // eosio 2.1: supports new signed_block & packed_transaction types
constexpr uint16_t proto_heartbeat_interval = 4;  // eosio 2.1: supports configurable heartbeat interval
constexpr uint16_t proto_dup_goaway_resolution = 5; // eosio 2.1: support peer address based duplicate connection resolution
constexpr uint16_t proto_dup_node_id_goaway = 6;  // eosio 2.1: support peer node_id based duplicate connection resolution
constexpr uint16_t proto_leap_initial = 7;        // leap client, needed because none of the 2.1 versions are supported
constexpr uint16_t proto_block_range = 8;         // include block range in notice_message
#pragma GCC diagnostic pop

// NOTE(review): proto_block_range (8) is defined above but net_version_max remains
// proto_leap_initial (7) — confirm this is intentional before relying on it
constexpr uint16_t net_version_max = proto_leap_initial;
/**
 * Index by start_block_num
 */
/// Tracks one block-range sync exchange, either a request this node made
/// or one it received.
struct peer_sync_state {
   explicit peer_sync_state(uint32_t start = 0, uint32_t end = 0, uint32_t last_acted = 0)
      :start_block( start ), end_block( end ), last( last_acted ),
       start_time(time_point::now())
   {}
   uint32_t start_block;   // first block number of the range
   uint32_t end_block;     // last block number of the range
   uint32_t last;          ///< last sent or received
   time_point start_time;  ///< time request made or received
};
// thread safe
// Buffers outgoing messages for one connection. Producers append to
// _write_queue (or _sync_write_queue for sync-priority traffic);
// fill_out_buffer() moves queued buffers into _out_queue for a single
// async_write, and out_callback() delivers the write result to every
// callback of the messages that were in flight.
class queued_buffer : boost::noncopyable {
public:
   // drop all queued-but-not-yet-written messages and reset the byte counter
   void clear_write_queue() {
      fc::lock_guard g( _mtx );
      _write_queue.clear();
      _sync_write_queue.clear();
      _write_queue_size = 0;
   }

   // drop messages already handed off to async_write
   void clear_out_queue() {
      fc::lock_guard g( _mtx );
      while ( !_out_queue.empty() ) {
         _out_queue.pop_front();
      }
   }

   // total bytes currently queued across _write_queue and _sync_write_queue
   uint32_t write_queue_size() const {
      fc::lock_guard g( _mtx );
      return _write_queue_size;
   }

   bool is_out_queue_empty() const {
      fc::lock_guard g( _mtx );
      return _out_queue.empty();
   }

   // true when there is something to send and no async_write in flight
   bool ready_to_send() const {
      fc::lock_guard g( _mtx );
      // if out_queue is not empty then async_write is in progress
      return ((!_sync_write_queue.empty() || !_write_queue.empty()) && _out_queue.empty());
   }

   // @param callback must not callback into queued_buffer
   // @return false once more than 2x def_max_write_queue_size bytes are queued
   //         (the message is still queued; the caller decides how to react)
   bool add_write_queue( const std::shared_ptr<vector<char>>& buff,
                         std::function<void( boost::system::error_code, std::size_t )> callback,
                         bool to_sync_queue ) {
      fc::lock_guard g( _mtx );
      if( to_sync_queue ) {
         _sync_write_queue.push_back( {buff, std::move(callback)} );
      } else {
         _write_queue.push_back( {buff, std::move(callback)} );
      }
      _write_queue_size += buff->size();
      if( _write_queue_size > 2 * def_max_write_queue_size ) {
         return false;
      }
      return true;
   }

   // move pending buffers into bufs (scatter-gather async_write), draining the
   // sync queue first; drained messages are kept in _out_queue until out_callback
   void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs ) {
      fc::lock_guard g( _mtx );
      if( !_sync_write_queue.empty() ) { // always send msgs from sync_write_queue first
         fill_out_buffer( bufs, _sync_write_queue );
      } else { // postpone real_time write_queue if sync queue is not empty
         fill_out_buffer( bufs, _write_queue );
         EOS_ASSERT( _write_queue_size == 0, plugin_exception, "write queue size expected to be zero" );
      }
   }

   // invoke every in-flight message's callback with the async_write result
   void out_callback( boost::system::error_code ec, std::size_t w ) {
      fc::lock_guard g( _mtx );
      for( auto& m : _out_queue ) {
         m.callback( ec, w );
      }
   }

private:
   struct queued_write;
   // drain w_queue into bufs, transferring each message to _out_queue and
   // deducting its size from _write_queue_size
   void fill_out_buffer( std::vector<boost::asio::const_buffer>& bufs,
                         deque<queued_write>& w_queue ) REQUIRES(_mtx) {
      while ( !w_queue.empty() ) {
         auto& m = w_queue.front();
         bufs.emplace_back( m.buff->data(), m.buff->size() );
         _write_queue_size -= m.buff->size();
         _out_queue.emplace_back( m );
         w_queue.pop_front();
      }
   }

private:
   struct queued_write {
      std::shared_ptr<vector<char>> buff;
      std::function<void( boost::system::error_code, std::size_t )> callback;
   };

   alignas(hardware_destructive_interference_size)
   mutable fc::mutex _mtx;
   uint32_t _write_queue_size GUARDED_BY(_mtx) {0};        // bytes queued, not yet handed to async_write
   deque<queued_write> _write_queue GUARDED_BY(_mtx);      // regular-priority messages
   deque<queued_write> _sync_write_queue GUARDED_BY(_mtx); // sync_write_queue will be sent first
   deque<queued_write> _out_queue GUARDED_BY(_mtx);        // messages currently being written
}; // queued_buffer
/// monitors the status of blocks as to whether a block is accepted (sync'd) or
/// rejected. It groups consecutive rejected blocks in a (configurable) time
/// window (rbw) and maintains a metric of the number of consecutive rejected block
/// time windows (rbws).
class block_status_monitor {
private:
   bool in_accepted_state_ {true};          ///< indicates of accepted(true) or rejected(false) state
   fc::microseconds window_size_{2*1000};   ///< rbw time interval (2ms)
   fc::time_point window_start_;            ///< The start of the recent rbw (0 implies not started)
   uint32_t events_{0};                     ///< The number of consecutive rbws
   const uint32_t max_consecutive_rejected_windows_{13};

public:
   /// ctor
   ///
   /// @param[in] window_size          The time, in microseconds, of the rejected block window
   /// @param[in] max_rejected_windows The max consecutive number of rejected block windows
   /// @note Copy ctor is not allowed
   explicit block_status_monitor(fc::microseconds window_size = fc::microseconds(2*1000),
         uint32_t max_rejected_windows = 13) :
      window_size_(window_size),
      // bug fix: max_rejected_windows was previously accepted but ignored,
      // leaving the threshold pinned at the default of 13
      max_consecutive_rejected_windows_(max_rejected_windows) {}
   block_status_monitor( const block_status_monitor& ) = delete;
   block_status_monitor( block_status_monitor&& ) = delete;
   ~block_status_monitor() = default;
   /// reset to initial state
   void reset();
   /// called when a block is accepted (sync_recv_block)
   void accepted() { reset(); }
   /// called when a block is rejected
   void rejected();
   /// returns number of consecutive rbws
   auto events() const { return events_; }
   /// indicates if the max number of consecutive rbws has been reached or exceeded
   bool max_events_violated() const { return events_ >= max_consecutive_rejected_windows_; }
   /// assignment not allowed
   block_status_monitor& operator=( const block_status_monitor& ) = delete;
   block_status_monitor& operator=( block_status_monitor&& ) = delete;
}; // block_status_monitor
class connection : public std::enable_shared_from_this<connection> {
public:
enum class connection_state { connecting, connected, closing, closed };
explicit connection( const string& endpoint, const string& listen_address );
/// @brief ctor
/// @param socket created by boost::asio in fc::listener
/// @param address identifier of listen socket which accepted this new connection
explicit connection( tcp::socket&& socket, const string& listen_address, size_t block_sync_rate_limit );
~connection() = default;
connection( const connection& ) = delete;
connection( connection&& ) = delete;
connection& operator=( const connection& ) = delete;
connection& operator=( connection&& ) = delete;
bool start_session();
bool socket_is_open() const { return socket_open.load(); } // thread safe, atomic
connection_state state() const { return conn_state.load(); } // thread safe atomic
void set_state(connection_state s);
static std::string state_str(connection_state s);
const string& peer_address() const { return peer_addr; } // thread safe, const
void set_connection_type( const string& peer_addr );
bool is_transactions_only_connection()const { return connection_type == transactions_only; } // thread safe, atomic
bool is_blocks_only_connection()const { return connection_type == blocks_only; }
bool is_transactions_connection() const { return connection_type != blocks_only; } // thread safe, atomic
bool is_blocks_connection() const { return connection_type != transactions_only; } // thread safe, atomic
uint32_t get_peer_start_block_num() const { return peer_start_block_num.load(); }
uint32_t get_peer_head_block_num() const { return peer_head_block_num.load(); }
uint32_t get_last_received_block_num() const { return last_received_block_num.load(); }
uint32_t get_unique_blocks_rcvd_count() const { return unique_blocks_rcvd_count.load(); }
size_t get_bytes_received() const { return bytes_received.load(); }
std::chrono::nanoseconds get_last_bytes_received() const { return last_bytes_received.load(); }
size_t get_bytes_sent() const { return bytes_sent.load(); }
std::chrono::nanoseconds get_last_bytes_sent() const { return last_bytes_sent.load(); }
size_t get_block_sync_bytes_received() const { return block_sync_bytes_received.load(); }
size_t get_block_sync_bytes_sent() const { return block_sync_total_bytes_sent.load(); }
bool get_block_sync_throttling() const { return block_sync_throttling.load(); }
boost::asio::ip::port_type get_remote_endpoint_port() const { return remote_endpoint_port.load(); }
// Set the keepalive/heartbeat timeout interval for this connection.
// NOTE(review): hb_timeout is read elsewhere in the class; no synchronization
// is visible here — presumably set before use or accessed from one strand; verify.
void set_heartbeat_timeout(std::chrono::milliseconds msec) {
hb_timeout = msec;
}
uint64_t get_peer_ping_time_ns() const { return peer_ping_time_ns; }
private:
static const string unknown;
std::atomic<uint64_t> peer_ping_time_ns = std::numeric_limits<uint64_t>::max();
std::optional<peer_sync_state> peer_requested; // this peer is requesting info from us
alignas(hardware_destructive_interference_size)
std::atomic<bool> socket_open{false};
std::atomic<connection_state> conn_state{connection_state::connecting};
const string peer_addr;
enum connection_types : char {
both,
transactions_only,
blocks_only
};
size_t block_sync_rate_limit{0}; // bytes/second, default unlimited
std::atomic<connection_types> connection_type{both};
std::atomic<uint32_t> peer_start_block_num{0};
std::atomic<uint32_t> peer_head_block_num{0};
std::atomic<uint32_t> last_received_block_num{0};
std::atomic<uint32_t> unique_blocks_rcvd_count{0};
std::atomic<size_t> bytes_received{0};
std::atomic<std::chrono::nanoseconds> last_bytes_received{0ns};
std::atomic<size_t> bytes_sent{0};
std::atomic<size_t> block_sync_bytes_received{0};
std::atomic<size_t> block_sync_total_bytes_sent{0};
std::chrono::nanoseconds block_sync_send_start{0ns}; // start of enqueue blocks
size_t block_sync_frame_bytes_sent{0}; // bytes sent in this set of enqueue blocks
std::atomic<bool> block_sync_throttling{false};
std::atomic<std::chrono::nanoseconds> last_bytes_sent{0ns};
std::atomic<boost::asio::ip::port_type> remote_endpoint_port{0};
public:
boost::asio::io_context::strand strand;
std::shared_ptr<tcp::socket> socket; // only accessed through strand after construction
fc::message_buffer<1024*1024> pending_message_buffer;
std::size_t outstanding_read_bytes{0}; // accessed only from strand threads
queued_buffer buffer_queue;
fc::sha256 conn_node_id;
string short_conn_node_id;
string listen_address; // address sent to peer in handshake
string log_p2p_address;
string log_remote_endpoint_ip;
string log_remote_endpoint_port;
string local_endpoint_ip;
string local_endpoint_port;
// kept in sync with last_handshake_recv.last_irreversible_block_num, only accessed from connection strand
uint32_t peer_lib_num = 0;
std::atomic<uint32_t> sync_ordinal{0};
// when syncing from a peer, the last block expected of the current range
uint32_t sync_last_requested_block{0};
alignas(hardware_destructive_interference_size)
std::atomic<uint32_t> trx_in_progress_size{0};
fc::time_point last_dropped_trx_msg_time;
const uint32_t connection_id;
int16_t sent_handshake_count = 0;
alignas(hardware_destructive_interference_size)
std::atomic<bool> peer_syncing_from_us{false};
std::atomic<uint16_t> protocol_version = 0;
uint16_t net_version = net_version_max;
std::atomic<uint16_t> consecutive_immediate_connection_close = 0;
std::atomic<bool> is_bp_connection = false;
block_status_monitor block_status_monitor_;
alignas(hardware_destructive_interference_size)
fc::mutex response_expected_timer_mtx;
boost::asio::steady_timer response_expected_timer GUARDED_BY(response_expected_timer_mtx);
alignas(hardware_destructive_interference_size)
std::atomic<go_away_reason> no_retry{no_reason};
alignas(hardware_destructive_interference_size)
mutable fc::mutex conn_mtx; //< mtx for last_req .. remote_endpoint_ip
std::optional<request_message> last_req GUARDED_BY(conn_mtx);
handshake_message last_handshake_recv GUARDED_BY(conn_mtx);
handshake_message last_handshake_sent GUARDED_BY(conn_mtx);
block_id_type fork_head GUARDED_BY(conn_mtx);
uint32_t fork_head_num GUARDED_BY(conn_mtx) {0};
fc::time_point last_close GUARDED_BY(conn_mtx);
std::string p2p_address GUARDED_BY(conn_mtx);
std::string unique_conn_node_id GUARDED_BY(conn_mtx);
std::string remote_endpoint_ip GUARDED_BY(conn_mtx);
boost::asio::ip::address_v6::bytes_type remote_endpoint_ip_array GUARDED_BY(conn_mtx);
std::chrono::nanoseconds connection_start_time{0};
connection_status get_status()const;
/** \name Peer Timestamps
* Time message handling
* @{
*/
// See NTP protocol. https://datatracker.ietf.org/doc/rfc5905/
std::chrono::nanoseconds org{0}; //!< origin timestamp. Time at the client when the request departed for the server.
// std::chrono::nanoseconds (not used) rec{0}; //!< receive timestamp. Time at the server when the request arrived from the client.
std::chrono::nanoseconds xmt{0}; //!< transmit timestamp, Time at the server when the response left for the client.
// std::chrono::nanoseconds (not used) dst{0}; //!< destination timestamp, Time at the client when the reply arrived from the server.
/** @} */
// timestamp for the latest message
std::chrono::system_clock::time_point latest_msg_time{std::chrono::system_clock::time_point::min()};
std::chrono::milliseconds hb_timeout{std::chrono::milliseconds{def_keepalive_interval}};
std::chrono::system_clock::time_point latest_blk_time{std::chrono::system_clock::time_point::min()};
bool connected() const;
bool closed() const; // socket is not open or is closed or closing, thread safe
bool current() const;
bool should_sync_from(uint32_t sync_next_expected_num, uint32_t sync_known_lib_num) const;
/// @param reconnect true if we should try and reconnect immediately after close
/// @param shutdown true only if plugin is shutting down
void close( bool reconnect = true, bool shutdown = false );
private:
void _close( bool reconnect, bool shutdown ); // for easy capture
bool process_next_block_message(uint32_t message_length);
bool process_next_trx_message(uint32_t message_length);
void update_endpoints(const tcp::endpoint& endpoint = tcp::endpoint());
public:
bool populate_handshake( handshake_message& hello ) const;