This repository has been archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add test suite and minor refinements to the utility subsystem #1403
Merged
Merged
Changes from 4 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
c72e59c
get conclude signal working properly; don't allocate a vector
coriolinus 00bbe47
wip: add test suite / example / explanation for using utility subsystem
coriolinus 837a62a
explicitly import everything
coriolinus 5601550
fix subsystem-util test
coriolinus 3ff4f66
fully commit to moving test helpers into a subsystem module
coriolinus 356c26f
add some more tests
coriolinus f0c23de
get rid of log tests in favor of real error forwarding
coriolinus 68a745d
Merge remote-tracking branch 'origin/master' into prgn-subsystem-util…
coriolinus 66e7d50
fix issue which caused test to hang on osx
coriolinus a9b49af
only require that job errors are PartialEq when testing
coriolinus 812b35a
get rid of any notion of partialeq
coriolinus 7e758b5
rethink testing
coriolinus 6672e2a
rename fwd_errors -> forward_errors
coriolinus 6b056bf
warn on error propagation failure
coriolinus f3aa719
Merge remote-tracking branch 'origin/master' into prgn-subsystem-util…
coriolinus bcd0a6d
fix unused import leftover from merge
coriolinus 5b464b0
derive eq for subsystemerror
coriolinus File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,229 @@ | ||
// Copyright 2017-2020 Parity Technologies (UK) Ltd. | ||
// This file is part of Polkadot. | ||
|
||
// Polkadot is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
|
||
// Polkadot is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU General Public License for more details. | ||
|
||
// You should have received a copy of the GNU General Public License | ||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. | ||
|
||
//! Utilities for testing subsystems. | ||
|
||
use crate::{SubsystemContext, FromOverseer, SubsystemResult, SubsystemError}; | ||
use crate::messages::AllMessages; | ||
|
||
use futures::prelude::*; | ||
use futures::channel::mpsc; | ||
use futures::task::{Spawn, SpawnExt}; | ||
use futures::poll; | ||
use parking_lot::Mutex; | ||
|
||
use std::convert::Infallible; | ||
use std::pin::Pin; | ||
use std::sync::Arc; | ||
use std::task::{Context, Poll, Waker}; | ||
|
||
enum SinkState<T> { | ||
Empty { | ||
read_waker: Option<Waker>, | ||
}, | ||
Item { | ||
item: T, | ||
ready_waker: Option<Waker>, | ||
flush_waker: Option<Waker>, | ||
}, | ||
} | ||
|
||
/// The sink half of a single-item sink that does not resolve until the item has been read. | ||
pub struct SingleItemSink<T>(Arc<Mutex<SinkState<T>>>); | ||
|
||
/// The stream half of a single-item sink. | ||
pub struct SingleItemStream<T>(Arc<Mutex<SinkState<T>>>); | ||
|
||
impl<T> Sink<T> for SingleItemSink<T> { | ||
type Error = Infallible; | ||
|
||
fn poll_ready( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context, | ||
) -> Poll<Result<(), Infallible>> { | ||
let mut state = self.0.lock(); | ||
match *state { | ||
SinkState::Empty { .. } => Poll::Ready(Ok(())), | ||
SinkState::Item { ref mut ready_waker, .. } => { | ||
*ready_waker = Some(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
} | ||
} | ||
|
||
fn start_send( | ||
self: Pin<&mut Self>, | ||
item: T, | ||
) -> Result<(), Infallible> { | ||
let mut state = self.0.lock(); | ||
|
||
match *state { | ||
SinkState::Empty { ref mut read_waker } => { | ||
if let Some(waker) = read_waker.take() { | ||
waker.wake(); | ||
} | ||
} | ||
_ => panic!("start_send called outside of empty sink state ensured by poll_ready"), | ||
} | ||
|
||
*state = SinkState::Item { | ||
item, | ||
ready_waker: None, | ||
flush_waker: None, | ||
}; | ||
|
||
Ok(()) | ||
} | ||
|
||
fn poll_flush( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context, | ||
) -> Poll<Result<(), Infallible>> { | ||
let mut state = self.0.lock(); | ||
match *state { | ||
SinkState::Empty { .. } => Poll::Ready(Ok(())), | ||
SinkState::Item { ref mut flush_waker, .. } => { | ||
*flush_waker = Some(cx.waker().clone()); | ||
Poll::Pending | ||
} | ||
} | ||
} | ||
|
||
fn poll_close( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context, | ||
) -> Poll<Result<(), Infallible>> { | ||
self.poll_flush(cx) | ||
} | ||
} | ||
|
||
impl<T> Stream for SingleItemStream<T> { | ||
type Item = T; | ||
|
||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||
let mut state = self.0.lock(); | ||
|
||
let read_waker = Some(cx.waker().clone()); | ||
|
||
match std::mem::replace(&mut *state, SinkState::Empty { read_waker }) { | ||
SinkState::Empty { .. } => Poll::Pending, | ||
SinkState::Item { item, ready_waker, flush_waker } => { | ||
if let Some(waker) = ready_waker { | ||
waker.wake(); | ||
} | ||
|
||
if let Some(waker) = flush_waker { | ||
waker.wake(); | ||
} | ||
|
||
Poll::Ready(Some(item)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Create a single-item Sink/Stream pair. | ||
/// | ||
/// The sink's send methods resolve at the point which the stream reads the item, | ||
/// not when the item is buffered. | ||
pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) { | ||
let inner = Arc::new(Mutex::new(SinkState::Empty { read_waker: None })); | ||
( | ||
SingleItemSink(inner.clone()), | ||
SingleItemStream(inner), | ||
) | ||
} | ||
|
||
/// A test subsystem context. | ||
pub struct TestSubsystemContext<M, S> { | ||
tx: mpsc::UnboundedSender<AllMessages>, | ||
rx: SingleItemStream<FromOverseer<M>>, | ||
spawn: S, | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl<M: Send + 'static, S: Spawn + Send + 'static> SubsystemContext for TestSubsystemContext<M, S> { | ||
type Message = M; | ||
|
||
async fn try_recv(&mut self) -> Result<Option<FromOverseer<M>>, ()> { | ||
match poll!(self.rx.next()) { | ||
Poll::Ready(Some(msg)) => Ok(Some(msg)), | ||
Poll::Ready(None) => Err(()), | ||
Poll::Pending => Ok(None), | ||
} | ||
} | ||
|
||
async fn recv(&mut self) -> SubsystemResult<FromOverseer<M>> { | ||
self.rx.next().await.ok_or(SubsystemError) | ||
} | ||
|
||
async fn spawn(&mut self, s: Pin<Box<dyn Future<Output = ()> + Send>>) -> SubsystemResult<()> { | ||
self.spawn.spawn(s).map_err(Into::into) | ||
} | ||
|
||
async fn send_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { | ||
self.tx.send(msg).await.expect("test overseer no longer live"); | ||
Ok(()) | ||
} | ||
|
||
async fn send_messages<T>(&mut self, msgs: T) -> SubsystemResult<()> | ||
where T: IntoIterator<Item = AllMessages> + Send, T::IntoIter: Send | ||
{ | ||
let mut iter = stream::iter(msgs.into_iter().map(Ok)); | ||
self.tx.send_all(&mut iter).await.expect("test overseer no longer live"); | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
/// A handle for interacting with the subsystem context. | ||
pub struct TestSubsystemContextHandle<M> { | ||
tx: SingleItemSink<FromOverseer<M>>, | ||
rx: mpsc::UnboundedReceiver<AllMessages>, | ||
} | ||
|
||
impl<M> TestSubsystemContextHandle<M> { | ||
/// Send a message or signal to the subsystem. This resolves at the point in time where the | ||
/// subsystem has _read_ the message. | ||
pub async fn send(&mut self, from_overseer: FromOverseer<M>) { | ||
self.tx.send(from_overseer).await.expect("Test subsystem no longer live"); | ||
} | ||
|
||
/// Receive the next message from the subsystem. | ||
pub async fn recv(&mut self) -> AllMessages { | ||
self.rx.next().await.expect("Test subsystem no longer live") | ||
} | ||
} | ||
|
||
/// Make a test subsystem context. | ||
pub fn make_subsystem_context<M, S>(spawn: S) | ||
-> (TestSubsystemContext<M, S>, TestSubsystemContextHandle<M>) | ||
{ | ||
let (overseer_tx, overseer_rx) = single_item_sink(); | ||
let (all_messages_tx, all_messages_rx) = mpsc::unbounded(); | ||
|
||
( | ||
TestSubsystemContext { | ||
tx: all_messages_tx, | ||
rx: overseer_rx, | ||
spawn, | ||
}, | ||
TestSubsystemContextHandle { | ||
tx: overseer_tx, | ||
rx: all_messages_rx | ||
}, | ||
) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want
#[cfg(any(test, feature = "test-helpers"))]
here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No: this is only used within the internal unit tests of the job manager. If we come up with a use for it later, we can add it then. Until then, I think YAGNI is the better part of valor.