Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Subsystem::start takes self by-value #1325

Merged
merged 2 commits into from
Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 6 additions & 14 deletions node/network/bridge/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ impl Network for Arc<sc_network::NetworkService<Block, Hash>> {
}

/// The network bridge subsystem.
pub struct NetworkBridge<N>(Option<N>);
pub struct NetworkBridge<N>(N);

impl<N> NetworkBridge<N> {
/// Create a new network bridge subsystem with underlying network service.
///
/// This assumes that the network service has had the notifications protocol for the network
/// bridge already registered. See [`notifications_protocol_info`](notifications_protocol_info).
pub fn new(net_service: N) -> Self {
NetworkBridge(Some(net_service))
NetworkBridge(net_service)
}
}

Expand All @@ -181,18 +181,10 @@ impl<Net, Context> Subsystem<Context> for NetworkBridge<Net>
Net: Network,
Context: SubsystemContext<Message=NetworkBridgeMessage>,
{
fn start(&mut self, mut ctx: Context) -> SpawnedSubsystem {
SpawnedSubsystem(match self.0.take() {
None => async move { for _ in ctx.recv().await { } }.boxed(),
Some(net) => {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
run_network(net, ctx).map(|_| ()).boxed()
}
})



fn start(self, ctx: Context) -> SpawnedSubsystem {
// Swallow error because failure is fatal to the node and we log with more precision
// within `run_network`.
SpawnedSubsystem(run_network(self.0, ctx).map(|_| ()).boxed())
}
}

Expand Down
8 changes: 4 additions & 4 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Subsystem1 {
impl<C> Subsystem<C> for Subsystem1
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, ctx: C) -> SpawnedSubsystem {
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
Expand Down Expand Up @@ -111,7 +111,7 @@ impl Subsystem2 {
impl<C> Subsystem<C> for Subsystem2
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(&mut self, ctx: C) -> SpawnedSubsystem {
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
Self::run(ctx).await;
}))
Expand All @@ -129,8 +129,8 @@ fn main() {

let (overseer, _handler) = Overseer::new(
vec![],
Box::new(Subsystem2),
Box::new(Subsystem1),
Subsystem2,
Subsystem1,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
Expand Down
46 changes: 23 additions & 23 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ pub type CompatibleSubsystem<M> = Box<dyn Subsystem<OverseerSubsystemContext<M>>
/// [`Subsystem`]: trait.Subsystem.html
#[allow(dead_code)]
struct OverseenSubsystem<M> {
subsystem: CompatibleSubsystem<M>,
instance: Option<SubsystemInstance<M>>,
}

Expand Down Expand Up @@ -407,7 +406,7 @@ where
/// where C: SubsystemContext<Message=CandidateValidationMessage>
/// {
/// fn start(
/// &mut self,
/// self,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
Expand All @@ -423,7 +422,7 @@ where
/// where C: SubsystemContext<Message=CandidateBackingMessage>
/// {
/// fn start(
/// &mut self,
/// self,
/// mut ctx: C,
/// ) -> SpawnedSubsystem {
/// SpawnedSubsystem(Box::pin(async move {
Expand All @@ -438,8 +437,8 @@ where
/// let spawner = executor::ThreadPool::new().unwrap();
/// let (overseer, _handler) = Overseer::new(
/// vec![],
/// Box::new(ValidationSubsystem),
/// Box::new(CandidateBackingSubsystem),
/// ValidationSubsystem,
/// CandidateBackingSubsystem,
/// spawner,
/// ).unwrap();
///
Expand All @@ -458,8 +457,8 @@ where
/// ```
pub fn new(
leaves: impl IntoIterator<Item = BlockInfo>,
validation: CompatibleSubsystem<CandidateValidationMessage>,
candidate_backing: CompatibleSubsystem<CandidateBackingMessage>,
validation: impl Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
candidate_backing: impl Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)> {
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);
Expand Down Expand Up @@ -658,7 +657,7 @@ fn spawn<S: Spawn, M: Send + 'static>(
spawner: &mut S,
futures: &mut FuturesUnordered<RemoteHandle<()>>,
streams: &mut StreamUnordered<mpsc::Receiver<ToOverseer>>,
mut s: CompatibleSubsystem<M>,
s: impl Subsystem<OverseerSubsystemContext<M>>,
) -> SubsystemResult<OverseenSubsystem<M>> {
let (to_tx, to_rx) = mpsc::channel(CHANNEL_CAPACITY);
let (from_tx, from_rx) = mpsc::channel(CHANNEL_CAPACITY);
Expand All @@ -675,7 +674,6 @@ fn spawn<S: Spawn, M: Send + 'static>(
});

Ok(OverseenSubsystem {
subsystem: s,
instance,
})
}
Expand All @@ -692,8 +690,8 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem1
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0;
SpawnedSubsystem(Box::pin(async move {
let mut i = 0;
loop {
Expand All @@ -717,8 +715,10 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem2
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let sender = self.0.clone();
SpawnedSubsystem(Box::pin(async move {
let _sender = sender;
let mut c: usize = 0;
loop {
if c < 10 {
Expand Down Expand Up @@ -759,7 +759,7 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem4
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, mut _ctx: C) -> SpawnedSubsystem {
fn start(self, mut _ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
// Do nothing and exit.
}))
Expand All @@ -777,8 +777,8 @@ mod tests {

let (overseer, mut handler) = Overseer::new(
vec![],
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem2(s2_tx)),
TestSubsystem1(s1_tx),
TestSubsystem2(s2_tx),
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
Expand Down Expand Up @@ -827,8 +827,8 @@ mod tests {
let (s1_tx, _) = mpsc::channel(64);
let (overseer, _handle) = Overseer::new(
vec![],
Box::new(TestSubsystem1(s1_tx)),
Box::new(TestSubsystem4),
TestSubsystem1(s1_tx),
TestSubsystem4,
spawner,
).unwrap();
let overseer_fut = overseer.run().fuse();
Expand All @@ -846,7 +846,7 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem5
where C: SubsystemContext<Message=CandidateValidationMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();

SpawnedSubsystem(Box::pin(async move {
Expand All @@ -872,7 +872,7 @@ mod tests {
impl<C> Subsystem<C> for TestSubsystem6
where C: SubsystemContext<Message=CandidateBackingMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
fn start(self, mut ctx: C) -> SpawnedSubsystem {
let mut sender = self.0.clone();

SpawnedSubsystem(Box::pin(async move {
Expand Down Expand Up @@ -925,8 +925,8 @@ mod tests {

let (overseer, mut handler) = Overseer::new(
vec![first_block],
Box::new(TestSubsystem5(tx_5)),
Box::new(TestSubsystem6(tx_6)),
TestSubsystem5(tx_5),
TestSubsystem6(tx_6),
spawner,
).unwrap();

Expand Down Expand Up @@ -1010,8 +1010,8 @@ mod tests {
// start with two forks of different height.
let (overseer, mut handler) = Overseer::new(
vec![first_block, second_block],
Box::new(TestSubsystem5(tx_5)),
Box::new(TestSubsystem6(tx_6)),
TestSubsystem5(tx_5),
TestSubsystem6(tx_6),
spawner,
).unwrap();

Expand Down
8 changes: 4 additions & 4 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ struct CandidateValidationSubsystem;
impl<C> Subsystem<C> for CandidateValidationSubsystem
where C: SubsystemContext<Message = CandidateValidationMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
Expand All @@ -284,7 +284,7 @@ struct CandidateBackingSubsystem;
impl<C> Subsystem<C> for CandidateBackingSubsystem
where C: SubsystemContext<Message = CandidateBackingMessage>
{
fn start(&mut self, mut ctx: C) -> SpawnedSubsystem {
fn start(self, mut ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem(Box::pin(async move {
while let Ok(_) = ctx.recv().await {}
}))
Expand All @@ -295,8 +295,8 @@ fn real_overseer<S: futures::task::Spawn>(
leaves: impl IntoIterator<Item = BlockInfo>,
s: S,
) -> Result<(Overseer<S>, OverseerHandler), ServiceError> {
let validation = Box::new(CandidateValidationSubsystem);
let candidate_backing = Box::new(CandidateBackingSubsystem);
let validation = CandidateValidationSubsystem;
let candidate_backing = CandidateBackingSubsystem;
Overseer::new(leaves, validation, candidate_backing, s)
.map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e)))
}
Expand Down
2 changes: 1 addition & 1 deletion node/subsystem/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,5 +146,5 @@ pub trait SubsystemContext: Send + 'static {
/// [`Subsystem`]: trait.Subsystem.html
pub trait Subsystem<C: SubsystemContext> {
/// Start this `Subsystem` and return `SpawnedSubsystem`.
fn start(&mut self, ctx: C) -> SpawnedSubsystem;
fn start(self, ctx: C) -> SpawnedSubsystem;
}