Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
sync_peer option to specify preferred nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
joecaswell committed Apr 28, 2022
1 parent ddc431a commit 08574df
Showing 1 changed file with 41 additions and 7 deletions.
48 changes: 41 additions & 7 deletions src/blockchain_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -879,21 +879,29 @@ reset_ledger_to_snap(Hash, Height, State) ->
snapshot_sync(State1#state{snapshot_info=SnapInfo}).

start_sync(#state{blockchain = Chain, swarm_tid = SwarmTID} = State) ->
case get_random_peer(SwarmTID) of
Peer = get_configured_or_random_peer(SwarmTID),
case Peer of
no_peers ->
%% try again later when there's peers
schedule_sync(State);
RandomPeer ->
{Pid, Ref} = start_block_sync(SwarmTID, Chain, RandomPeer, [], <<>>),
Peer ->
{Pid, Ref} = start_block_sync(SwarmTID, Chain, Peer, [], <<>>),
lager:info("new block sync starting with Pid: ~p, Ref: ~p, Peer: ~p",
[Pid, Ref, RandomPeer]),
[Pid, Ref, Peer]),
State#state{sync_pid = Pid, sync_ref = Ref}
end.

get_configured_or_random_peer(SwarmTID) ->
case get_configured_sync_peer(SwarmTID) of
undefined-> get_random_peer(SwarmTID);
P -> P
end.

-spec get_random_peer(SwarmTID :: ets:tab()) -> no_peers | string().
get_random_peer(SwarmTID) ->
lager:debug("Get random peer"),
Peerbook = libp2p_swarm:peerbook(SwarmTID),
%% limit peers to random connections with public addresses
%% limit peers to connections with a public address
F = fun(Peer) ->
case application:get_env(blockchain, testing, false) of
false ->
Expand All @@ -906,9 +914,35 @@ get_random_peer(SwarmTID) ->
case libp2p_peerbook:random(Peerbook, [], F, 100) of
false -> no_peers;
{Addr, _Peer} ->
"/p2p/" ++ libp2p_crypto:bin_to_b58(Addr)
libp2p_crypto:pubkey_bin_to_p2p(Addr)
end.

-spec get_configured_sync_peer(ets:tab()) -> string() | undefined.
get_configured_sync_peer(SwarmTID) ->
case application:get_env(blockchain, sync_peers, []) of
[] ->
lager:debug("No sync_peers configured"),
undefined;
ConfiguredPeers ->
Peerbook = libp2p_swarm:peerbook(SwarmTID),
{Left, Right} = lists:split(rand:uniform(length(ConfiguredPeers)), ConfiguredPeers),
get_configured_sync_peer(SwarmTID, Peerbook, Right ++ Left)
end.

get_configured_sync_peer(SwarmTID, Peerbook, [ Peer | RestPeers ]) ->
case libp2p_swarm:connect(SwarmTID, Peer) of
{ok, _Session} ->
lager:debug("Connected to configured peer ~p",[Peer]),
Peer;
_Error ->
lager:debug("Failed to connect to configured peer ~p: ~p",[Peer, _Error]),
get_configured_sync_peer(SwarmTID, Peerbook, RestPeers)
end;
get_configured_sync_peer(_SwarmTID, _, []) ->
% unable to connect to any of the provided peers
lager:debug("Failed to connect to any configured peer"),
undefined.

reset_sync_timer(State) ->
lager:info("try again in ~p", [?SYNC_TIME]),
erlang:cancel_timer(State#state.sync_timer),
Expand Down Expand Up @@ -1028,7 +1062,7 @@ grab_snapshot(Height, Hash) ->
Chain = blockchain_worker:blockchain(),
SwarmTID = blockchain_swarm:tid(),

case get_random_peer(SwarmTID) of
case get_configured_or_random_peer(SwarmTID) of
no_peers -> {error, no_peers};
Peer ->
case libp2p_swarm:dial_framed_stream(SwarmTID,
Expand Down

0 comments on commit 08574df

Please sign in to comment.