From e488f529df7ae3f9672b084706775a808c430c62 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 26 Sep 2022 11:23:31 +0000 Subject: [PATCH 1/6] Impl dynamic window size. Keep sessions for unfinalized chain Signed-off-by: Andrei Sandu --- .../src/rolling_session_window.rs | 117 ++++++++++++++++-- 1 file changed, 110 insertions(+), 7 deletions(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index dd9282b50fe5..31c8bcc370bc 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -24,11 +24,13 @@ use polkadot_primitives::v2::{Hash, SessionIndex, SessionInfo}; use futures::channel::oneshot; use polkadot_node_subsystem::{ - errors::RuntimeApiError, - messages::{RuntimeApiMessage, RuntimeApiRequest}, + errors::{ChainApiError, RuntimeApiError}, + messages::{ChainApiMessage, RuntimeApiMessage, RuntimeApiRequest}, overseer, }; +const LOG_TARGET: &str = "parachain::rolling-session-window"; + /// Sessions unavailable in state to cache. #[derive(Debug, Clone, thiserror::Error)] pub enum SessionsUnavailableReason { @@ -38,9 +40,18 @@ pub enum SessionsUnavailableReason { /// The runtime API itself returned an error. #[error(transparent)] RuntimeApi(#[from] RuntimeApiError), + /// The chain API itself returned an error. + #[error(transparent)] + ChainApi(#[from] ChainApiError), /// Missing session info from runtime API for given `SessionIndex`. #[error("Missing session index {0:?}")] Missing(SessionIndex), + /// Missing last finalized block number. + #[error("Missing last finalized block number")] + MissingLastFinalizedBlock, + /// Missing last finalized block hash. + #[error("Missing last finalized block hash")] + MissingLastFinalizedBlockHash, } /// Information about the sessions being fetched. @@ -98,7 +109,8 @@ impl RollingSessionWindow { block_hash: Hash, ) -> Result where - Sender: overseer::SubsystemSender, + Sender: overseer::SubsystemSender + + overseer::SubsystemSender, { let session_index = get_session_index_for_child(&mut sender, block_hash).await?; @@ -146,6 +158,87 @@ impl RollingSessionWindow { self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1) } + async fn last_finalized_block_session( + &self, + sender: &mut Sender, + ) -> Result + where + Sender: overseer::SubsystemSender + + overseer::SubsystemSender, + { + let last_finalized_height = { + let (tx, rx) = oneshot::channel(); + sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; + match rx.await { + Ok(Ok(number)) => number, + Ok(Err(e)) => + return Err(SessionsUnavailable { + kind: SessionsUnavailableReason::ChainApi(e), + info: None, + }), + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?err, + "Failed fetching last finalized block number" + ); + return Err(SessionsUnavailable { + kind: SessionsUnavailableReason::MissingLastFinalizedBlock, + info: None, + }) + }, + } + }; + + let (tx, rx) = oneshot::channel(); + // We want to get the parent of the last finalized block. + sender + .send_message(ChainApiMessage::FinalizedBlockHash( + last_finalized_height.saturating_sub(1), + tx, + )) + .await; + let last_finalized_hash_parent = match rx.await { + Ok(Ok(maybe_hash)) => maybe_hash, + Ok(Err(e)) => + return Err(SessionsUnavailable { + kind: SessionsUnavailableReason::ChainApi(e), + info: None, + }), + Err(err) => { + gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash"); + return Err(SessionsUnavailable { + kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash, + info: None, + }) + }, + }; + + // Get the session in which the last finalized block was authored. + if let Some(last_finalized_hash_parent) = last_finalized_hash_parent { + let session = + match get_session_index_for_child(sender, last_finalized_hash_parent).await { + Ok(session_index) => session_index, + Err(err) => { + gum::warn!( + target: LOG_TARGET, + ?err, + ?last_finalized_hash_parent, + "Failed fetching session index" + ); + return Err(err) + }, + }; + + Ok(session) + } else { + return Err(SessionsUnavailable { + kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash, + info: None, + }) + } + } + /// When inspecting a new import notification, updates the session info cache to match /// the session of the imported block's child. /// @@ -153,12 +246,17 @@ impl RollingSessionWindow { /// not change often and import notifications are expected to be typically increasing in session number. /// /// some backwards drift in session index is acceptable. - pub async fn cache_session_info_for_head( + pub async fn cache_session_info_for_head( &mut self, - sender: &mut impl overseer::SubsystemSender, + sender: &mut Sender, block_hash: Hash, - ) -> Result { + ) -> Result + where + Sender: overseer::SubsystemSender + + overseer::SubsystemSender, + { let session_index = get_session_index_for_child(sender, block_hash).await?; + let last_finalized_block_session = self.last_finalized_block_session(sender).await?; let old_window_start = self.earliest_session; @@ -171,7 +269,12 @@ impl RollingSessionWindow { let old_window_end = latest; - let window_start = session_index.saturating_sub(self.window_size.get() - 1); + // Ensure we keep sessions up to last finalized block by adjusting the window start. + // This will increase the session window to cover the full unfinalized chain. + let window_start = std::cmp::min( + session_index.saturating_sub(self.window_size.get() - 1), + last_finalized_block_session, + ); // keep some of the old window, if applicable. let overlap_start = window_start.saturating_sub(old_window_start); From 5780cc96f37171e10db59001813770a9192099cc Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 26 Sep 2022 15:30:25 +0000 Subject: [PATCH 2/6] feedback Signed-off-by: Andrei Sandu --- node/subsystem-util/src/rolling_session_window.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 31c8bcc370bc..06951fe3a216 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -158,7 +158,7 @@ impl RollingSessionWindow { self.earliest_session + (self.session_info.len() as SessionIndex).saturating_sub(1) } - async fn last_finalized_block_session( + async fn earliest_non_finalized_block_session( &self, sender: &mut Sender, ) -> Result @@ -193,10 +193,7 @@ impl RollingSessionWindow { let (tx, rx) = oneshot::channel(); // We want to get the parent of the last finalized block. sender - .send_message(ChainApiMessage::FinalizedBlockHash( - last_finalized_height.saturating_sub(1), - tx, - )) + .send_message(ChainApiMessage::FinalizedBlockHash(last_finalized_height, tx)) .await; let last_finalized_hash_parent = match rx.await { Ok(Ok(maybe_hash)) => maybe_hash, @@ -256,7 +253,8 @@ impl RollingSessionWindow { + overseer::SubsystemSender, { let session_index = get_session_index_for_child(sender, block_hash).await?; - let last_finalized_block_session = self.last_finalized_block_session(sender).await?; + let earliest_non_finalized_block_session = + self.earliest_non_finalized_block_session(sender).await?; let old_window_start = self.earliest_session; @@ -273,7 +271,7 @@ impl RollingSessionWindow { // This will increase the session window to cover the full unfinalized chain. let window_start = std::cmp::min( session_index.saturating_sub(self.window_size.get() - 1), - last_finalized_block_session, + earliest_non_finalized_block_session, ); // keep some of the old window, if applicable. From 2132d9c8d34b2781dd4cc925fca88eaa9a19de8c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 27 Sep 2022 08:00:02 +0000 Subject: [PATCH 3/6] Stretch also in contructor plus tests Signed-off-by: Andrei Sandu --- .../src/rolling_session_window.rs | 222 +++++++++++++++++- 1 file changed, 219 insertions(+), 3 deletions(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 06951fe3a216..17b8a24739f9 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -113,8 +113,14 @@ impl RollingSessionWindow { + overseer::SubsystemSender, { let session_index = get_session_index_for_child(&mut sender, block_hash).await?; + let earliest_non_finalized_block_session = + Self::earliest_non_finalized_block_session(&mut sender).await?; - let window_start = session_index.saturating_sub(window_size.get() - 1); + // This will increase the session window to cover the full unfinalized chain. + let window_start = std::cmp::min( + session_index.saturating_sub(window_size.get() - 1), + earliest_non_finalized_block_session, + ); match load_all_sessions(&mut sender, block_hash, window_start, session_index).await { Err(kind) => Err(SessionsUnavailable { @@ -159,7 +165,6 @@ impl RollingSessionWindow { } async fn earliest_non_finalized_block_session( - &self, sender: &mut Sender, ) -> Result where @@ -254,7 +259,7 @@ impl RollingSessionWindow { { let session_index = get_session_index_for_child(sender, block_hash).await?; let earliest_non_finalized_block_session = - self.earliest_non_finalized_block_session(sender).await?; + Self::earliest_non_finalized_block_session(sender).await?; let old_window_start = self.earliest_session; @@ -420,6 +425,14 @@ mod tests { parent_hash: Default::default(), }; + let finalized_header = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: 0, + state_root: Default::default(), + parent_hash: Default::default(), + }; + let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::(pool.clone()); @@ -459,6 +472,37 @@ mod tests { } ); + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(finalized_header.number)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, finalized_header.number); + let _ = s_tx.send(Ok(Some(finalized_header.hash()))); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, finalized_header.hash()); + let _ = s_tx.send(Ok(session)); + } + ); + for i in expect_requests_from..=session { assert_matches!( handle.recv().await, @@ -586,6 +630,108 @@ mod tests { cache_session_info_test(0, 3, Some(window), 2); } + #[test] + fn any_session_stretch_for_unfinalized_chain() { + // Session index of the tip of our fake test chain. + let session: SessionIndex = 100; + let genesis_session: SessionIndex = 0; + + let header = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: 5, + state_root: Default::default(), + parent_hash: Default::default(), + }; + + let finalized_header = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: 0, + state_root: Default::default(), + parent_hash: Default::default(), + }; + + let pool = TaskExecutor::new(); + let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); + + let hash = header.hash(); + + let test_fut = { + let sender = ctx.sender().clone(); + Box::pin(async move { + let res = RollingSessionWindow::new(sender, TEST_WINDOW_SIZE, hash).await; + assert!(res.is_err()); + }) + }; + + let aux_fut = Box::pin(async move { + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, hash); + let _ = s_tx.send(Ok(session)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(finalized_header.number)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, finalized_header.number); + let _ = s_tx.send(Ok(Some(finalized_header.hash()))); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, finalized_header.hash()); + let _ = s_tx.send(Ok(0)); + } + ); + + // Unfinalized chain starts at geneisis block, so session 0 is how far we stretch. + for i in genesis_session..=session { + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionInfo(j, s_tx), + )) => { + assert_eq!(h, hash); + assert_eq!(i, j); + + let _ = s_tx.send(Ok(if i == session { + None + } else { + Some(dummy_session_info(i)) + })); + } + ); + } + }); + + futures::executor::block_on(futures::future::join(test_fut, aux_fut)); + } + #[test] fn any_session_unavailable_for_caching_means_no_change() { let session: SessionIndex = 6; @@ -599,6 +745,14 @@ mod tests { parent_hash: Default::default(), }; + let finalized_header = Header { + digest: Default::default(), + extrinsics_root: Default::default(), + number: 0, + state_root: Default::default(), + parent_hash: Default::default(), + }; + let pool = TaskExecutor::new(); let (mut ctx, mut handle) = make_subsystem_context::<(), _>(pool.clone()); @@ -624,6 +778,37 @@ mod tests { } ); + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(finalized_header.number)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, finalized_header.number); + let _ = s_tx.send(Ok(Some(finalized_header.hash()))); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, finalized_header.hash()); + let _ = s_tx.send(Ok(session)); + } + ); + for i in start_session..=session { assert_matches!( handle.recv().await, @@ -687,6 +872,37 @@ mod tests { } ); + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(header.number)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, header.number); + let _ = s_tx.send(Ok(Some(header.hash()))); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, header.hash()); + let _ = s_tx.send(Ok(session)); + } + ); + assert_matches!( handle.recv().await, AllMessages::RuntimeApi(RuntimeApiMessage::Request( From eb0695940a654c3587c1184342499b58bf773453 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 27 Sep 2022 08:06:43 +0000 Subject: [PATCH 4/6] review feedback Signed-off-by: Andrei Sandu --- node/subsystem-util/src/rolling_session_window.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/node/subsystem-util/src/rolling_session_window.rs b/node/subsystem-util/src/rolling_session_window.rs index 17b8a24739f9..0ff2dc6deb13 100644 --- a/node/subsystem-util/src/rolling_session_window.rs +++ b/node/subsystem-util/src/rolling_session_window.rs @@ -20,7 +20,7 @@ //! care about the state of particular blocks. pub use polkadot_node_primitives::{new_session_window_size, SessionWindowSize}; -use polkadot_primitives::v2::{Hash, SessionIndex, SessionInfo}; +use polkadot_primitives::v2::{BlockNumber, Hash, SessionIndex, SessionInfo}; use futures::channel::oneshot; use polkadot_node_subsystem::{ @@ -51,7 +51,7 @@ pub enum SessionsUnavailableReason { MissingLastFinalizedBlock, /// Missing last finalized block hash. #[error("Missing last finalized block hash")] - MissingLastFinalizedBlockHash, + MissingLastFinalizedBlockHash(BlockNumber), } /// Information about the sessions being fetched. @@ -196,7 +196,7 @@ impl RollingSessionWindow { }; let (tx, rx) = oneshot::channel(); - // We want to get the parent of the last finalized block. + // We want to get the session index for the child of the last finalized block. sender .send_message(ChainApiMessage::FinalizedBlockHash(last_finalized_height, tx)) .await; @@ -210,7 +210,9 @@ impl RollingSessionWindow { Err(err) => { gum::warn!(target: LOG_TARGET, ?err, "Failed fetching last finalized block hash"); return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash, + kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash( + last_finalized_height, + ), info: None, }) }, @@ -235,7 +237,9 @@ impl RollingSessionWindow { Ok(session) } else { return Err(SessionsUnavailable { - kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash, + kind: SessionsUnavailableReason::MissingLastFinalizedBlockHash( + last_finalized_height, + ), info: None, }) } From 88ac66f8255371e06d763964dbf7356fdde29cf8 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 27 Sep 2022 08:14:40 +0000 Subject: [PATCH 5/6] fix approval-voting tests Signed-off-by: Andrei Sandu --- node/core/approval-voting/src/import.rs | 32 +++++++++++++++++++++++++ node/core/approval-voting/src/tests.rs | 31 ++++++++++++++++++++++++ 2 files changed, 63 insertions(+) diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index d43bf40546ae..5413c271e0d6 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -1296,6 +1296,38 @@ pub(crate) mod tests { } ); + // Caching of sesssions needs sessoion of first unfinalied block. + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(header.number)); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, header.number); + let _ = s_tx.send(Ok(Some(header.hash()))); + } + ); + + assert_matches!( + handle.recv().await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, header.hash()); + let _ = s_tx.send(Ok(session)); + } + ); + // determine_new_blocks exits early as the parent_hash is in the DB assert_matches!( diff --git a/node/core/approval-voting/src/tests.rs b/node/core/approval-voting/src/tests.rs index 66d1402ed6dc..bdb7a8c929b3 100644 --- a/node/core/approval-voting/src/tests.rs +++ b/node/core/approval-voting/src/tests.rs @@ -807,6 +807,37 @@ async fn import_block( } ); + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(number)); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + block_number, + s_tx, + )) => { + assert_eq!(block_number, number); + let _ = s_tx.send(Ok(Some(hashes[number as usize].0))); + } + ); + + assert_matches!( + overseer_recv(overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, hashes[number as usize].0); + let _ = s_tx.send(Ok(number.into())); + } + ); + if !fork { assert_matches!( overseer_recv(overseer).await, From a7d97d085ff81ad1bea7bf2d7caf480564c35938 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 27 Sep 2022 08:44:14 +0000 Subject: [PATCH 6/6] grunting: dispute coordinator tests Signed-off-by: Andrei Sandu --- node/core/dispute-coordinator/src/tests.rs | 41 ++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index ff85319599ce..aaef00999259 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -239,13 +239,15 @@ impl TestState { ))) .await; - self.handle_sync_queries(virtual_overseer, block_hash, session).await; + self.handle_sync_queries(virtual_overseer, block_hash, block_number, session) + .await; } async fn handle_sync_queries( &mut self, virtual_overseer: &mut VirtualOverseer, block_hash: Hash, + block_number: BlockNumber, session: SessionIndex, ) { // Order of messages is not fixed (different on initializing): @@ -278,11 +280,45 @@ impl TestState { finished_steps.got_session_information = true; assert_eq!(h, block_hash); let _ = tx.send(Ok(session)); + + // Queries for fetching earliest unfinalized block session. See `RollingSessionWindow`. + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber( + s_tx, + )) => { + let _ = s_tx.send(Ok(block_number)); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockHash( + number, + s_tx, + )) => { + assert_eq!(block_number, number); + let _ = s_tx.send(Ok(Some(block_hash))); + } + ); + + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + h, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) => { + assert_eq!(h, block_hash); + let _ = s_tx.send(Ok(session)); + } + ); + // No queries, if subsystem knows about this session already. if self.known_session == Some(session) { continue } self.known_session = Some(session); + loop { // answer session info queries until the current session is reached. assert_matches!( @@ -361,7 +397,8 @@ impl TestState { ))) .await; - self.handle_sync_queries(virtual_overseer, *leaf, session).await; + self.handle_sync_queries(virtual_overseer, *leaf, n as BlockNumber, session) + .await; } }