Skip to content

Commit

Permalink
Reconnect on-demand clients from MessagesSource::reconnect and Messag…
Browse files Browse the repository at this point in the history
…esTarget::reconnect (#1927)

* reconnect on-demand clients from MessagesSource::reconnect and MessagesTarget::reconnect

* add issue reference

* fmt
  • Loading branch information
svyatonik authored Mar 2, 2023
1 parent 4161b51 commit d86c3ce
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 2 deletions.
16 changes: 15 additions & 1 deletion relays/lib-substrate-relay/src/messages_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,22 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesSource<P> {
type Error = SubstrateError;

async fn reconnect(&mut self) -> Result<(), SubstrateError> {
// since the client calls RPC methods on both sides, we need to reconnect both
self.source_client.reconnect().await?;
self.target_client.reconnect().await
self.target_client.reconnect().await?;

// call reconnect on on-demand headers relay, because we may use different chains there
// and the error that has lead to reconnect may have came from those other chains
// (see `require_target_header_on_source`)
//
// this may lead to multiple reconnects to the same node during the same call and it
// needs to be addressed in the future
// TODO: https://github.com/paritytech/parity-bridges-common/issues/1928
if let Some(ref mut target_to_source_headers_relay) = self.target_to_source_headers_relay {
target_to_source_headers_relay.reconnect().await?;
}

Ok(())
}
}

Expand Down
16 changes: 15 additions & 1 deletion relays/lib-substrate-relay/src/messages_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,22 @@ impl<P: SubstrateMessageLane> RelayClient for SubstrateMessagesTarget<P> {
type Error = SubstrateError;

async fn reconnect(&mut self) -> Result<(), SubstrateError> {
// since the client calls RPC methods on both sides, we need to reconnect both
self.target_client.reconnect().await?;
self.source_client.reconnect().await
self.source_client.reconnect().await?;

// call reconnect on on-demand headers relay, because we may use different chains there
// and the error that has lead to reconnect may have came from those other chains
// (see `require_source_header_on_target`)
//
// this may lead to multiple reconnects to the same node during the same call and it
// needs to be addressed in the future
// TODO: https://github.com/paritytech/parity-bridges-common/issues/1928
if let Some(ref mut source_to_target_headers_relay) = self.source_to_target_headers_relay {
source_to_target_headers_relay.reconnect().await?;
}

Ok(())
}
}

Expand Down
10 changes: 10 additions & 0 deletions relays/lib-substrate-relay/src/on_demand/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub struct OnDemandHeadersRelay<P: SubstrateFinalitySyncPipeline> {
required_header_number: RequiredHeaderNumberRef<P::SourceChain>,
/// Client of the source chain.
source_client: Client<P::SourceChain>,
/// Client of the target chain.
target_client: Client<P::TargetChain>,
}

impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
Expand All @@ -83,6 +85,7 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
relay_task_name: on_demand_headers_relay_name::<P::SourceChain, P::TargetChain>(),
required_header_number: required_header_number.clone(),
source_client: source_client.clone(),
target_client: target_client.clone(),
};
async_std::task::spawn(async move {
background_task::<P>(
Expand All @@ -104,6 +107,13 @@ impl<P: SubstrateFinalitySyncPipeline> OnDemandHeadersRelay<P> {
impl<P: SubstrateFinalitySyncPipeline> OnDemandRelay<P::SourceChain, P::TargetChain>
for OnDemandHeadersRelay<P>
{
async fn reconnect(&self) -> Result<(), SubstrateError> {
// using clone is fine here (to avoid mut requirement), because clone on Client clones
// internal references
self.source_client.clone().reconnect().await?;
self.target_client.clone().reconnect().await
}

async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceChain>) {
let mut required_header_number = self.required_header_number.lock().await;
if required_header > *required_header_number {
Expand Down
3 changes: 3 additions & 0 deletions relays/lib-substrate-relay/src/on_demand/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ pub mod parachains;
/// On-demand headers relay that is relaying finalizing headers only when requested.
#[async_trait]
pub trait OnDemandRelay<SourceChain: Chain, TargetChain: Chain>: Send + Sync {
/// Reconnect to source and target nodes.
async fn reconnect(&self) -> Result<(), SubstrateError>;

/// Ask relay to relay source header with given number to the target chain.
///
/// Depending on implementation, on-demand relay may also relay `required_header` ancestors
Expand Down
9 changes: 9 additions & 0 deletions relays/lib-substrate-relay/src/on_demand/parachains.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ impl<P: SubstrateParachainsPipeline> OnDemandRelay<P::SourceParachain, P::Target
where
P::SourceParachain: Chain<Hash = ParaHash>,
{
async fn reconnect(&self) -> Result<(), SubstrateError> {
// using clone is fine here (to avoid mut requirement), because clone on Client clones
// internal references
self.source_relay_client.clone().reconnect().await?;
self.target_client.clone().reconnect().await?;
// we'll probably need to reconnect relay chain relayer clients also
self.on_demand_source_relay_to_target_headers.reconnect().await
}

async fn require_more_headers(&self, required_header: BlockNumberOf<P::SourceParachain>) {
if let Err(e) = self.required_header_number_sender.send(required_header).await {
log::trace!(
Expand Down

0 comments on commit d86c3ce

Please sign in to comment.