Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak/growth when creating many runtimes #3564

Merged
merged 20 commits into from
Mar 16, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio/src/io/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl<E: Source> PollEvented<E> {
}

/// Returns a reference to the registration
#[cfg(any(feature = "net", all(unix, feature = "process"), feature = "signal",))]
pub(crate) fn registration(&self) -> &Registration {
&self.registration
}
Expand Down
60 changes: 18 additions & 42 deletions tokio/src/process/unix/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::park::Park;
use crate::process::unix::orphan::ReapOrphanQueue;
use crate::process::unix::GlobalOrphanQueue;
use crate::signal::unix::driver::Driver as SignalDriver;
use crate::signal::unix::{signal_with_handle, InternalStream, Signal, SignalKind};
use crate::sync::mpsc::error::TryRecvError;
use crate::signal::unix::{signal_with_handle, SignalKind};
use crate::sync::watch;

use std::io;
use std::time::Duration;
Expand All @@ -16,7 +16,7 @@ use std::time::Duration;
#[derive(Debug)]
pub(crate) struct Driver {
park: SignalDriver,
inner: CoreDriver<Signal, GlobalOrphanQueue>,
inner: CoreDriver<watch::Receiver<()>, GlobalOrphanQueue>,
}

#[derive(Debug)]
Expand All @@ -25,27 +25,25 @@ struct CoreDriver<S, Q> {
orphan_queue: Q,
}

trait HasChanged {
fn has_changed(&mut self) -> bool;
}

impl<T> HasChanged for watch::Receiver<T> {
fn has_changed(&mut self) -> bool {
self.try_has_changed().and_then(Result::ok).is_some()
}
}

// ===== impl CoreDriver =====

impl<S, Q> CoreDriver<S, Q>
where
S: InternalStream,
S: HasChanged,
Q: ReapOrphanQueue,
{
fn got_signal(&mut self) -> bool {
match self.sigchild.try_recv() {
Ok(()) => true,
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Closed) => panic!("signal was deregistered"),
}
}

fn process(&mut self) {
if self.got_signal() {
// Drain all notifications which may have been buffered
// so we can try to reap all orphans in one batch
while self.got_signal() {}

if self.sigchild.has_changed() {
self.orphan_queue.reap_orphans();
}
}
Expand Down Expand Up @@ -97,8 +95,6 @@ impl Park for Driver {
mod test {
use super::*;
use crate::process::unix::orphan::test::MockQueue;
use crate::sync::mpsc::error::TryRecvError;
use std::task::{Context, Poll};

struct MockStream {
total_try_recv: usize,
Expand All @@ -114,17 +110,10 @@ mod test {
}
}

impl InternalStream for MockStream {
fn poll_recv(&mut self, _cx: &mut Context<'_>) -> Poll<Option<()>> {
unimplemented!();
}

fn try_recv(&mut self) -> Result<(), TryRecvError> {
impl HasChanged for MockStream {
fn has_changed(&mut self) -> bool {
self.total_try_recv += 1;
match self.values.remove(0) {
Some(()) => Ok(()),
None => Err(TryRecvError::Empty),
}
self.values.remove(0).is_some()
}
}

Expand All @@ -140,17 +129,4 @@ mod test {
assert_eq!(1, driver.sigchild.total_try_recv);
assert_eq!(0, driver.orphan_queue.total_reaps.get());
}

#[test]
fn coalesce_signals_before_reaping() {
let mut driver = CoreDriver {
sigchild: MockStream::new(vec![Some(()), Some(()), None]),
orphan_queue: MockQueue::<()>::new(),
};

driver.process();

assert_eq!(3, driver.sigchild.total_try_recv);
assert_eq!(1, driver.orphan_queue.total_reaps.get());
}
}
17 changes: 6 additions & 11 deletions tokio/src/process/unix/reap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::task::Poll;
#[derive(Debug)]
pub(crate) struct Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
inner: Option<W>,
Expand All @@ -25,7 +25,7 @@ where

impl<W, Q, S> Deref for Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
type Target = W;
Expand All @@ -37,7 +37,7 @@ where

impl<W, Q, S> Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
pub(crate) fn new(inner: W, orphan_queue: Q, signal: S) -> Self {
Expand All @@ -61,7 +61,7 @@ impl<W, Q, S> Future for Reaper<W, Q, S>
where
W: Wait + Unpin,
Q: OrphanQueue<W> + Unpin,
S: InternalStream,
S: InternalStream + Unpin,
{
type Output = io::Result<ExitStatus>;

Expand Down Expand Up @@ -106,7 +106,7 @@ where

impl<W, Q, S> Kill for Reaper<W, Q, S>
where
W: Kill + Wait + Unpin,
W: Kill + Wait,
Q: OrphanQueue<W>,
{
fn kill(&mut self) -> io::Result<()> {
Expand All @@ -116,7 +116,7 @@ where

impl<W, Q, S> Drop for Reaper<W, Q, S>
where
W: Wait + Unpin,
W: Wait,
Q: OrphanQueue<W>,
{
fn drop(&mut self) {
Expand All @@ -134,7 +134,6 @@ mod test {
use super::*;

use crate::process::unix::orphan::test::MockQueue;
use crate::sync::mpsc::error::TryRecvError;
use futures::future::FutureExt;
use std::os::unix::process::ExitStatusExt;
use std::process::ExitStatus;
Expand Down Expand Up @@ -206,10 +205,6 @@ mod test {
None => Poll::Pending,
}
}

fn try_recv(&mut self) -> Result<(), TryRecvError> {
unimplemented!();
}
}

#[test]
Expand Down
40 changes: 40 additions & 0 deletions tokio/src/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
//! }
//! # }
//! ```
use crate::sync::watch::Receiver;
use std::task::{Context, Poll};

mod ctrl_c;
pub use ctrl_c::ctrl_c;
Expand All @@ -58,3 +60,41 @@ mod os {

pub mod unix;
pub mod windows;

mod reusable_box;
use self::reusable_box::ReusableBoxFuture;

#[derive(Debug)]
struct RxFuture {
inner: ReusableBoxFuture<Receiver<()>>,
}

async fn make_future(mut rx: Receiver<()>) -> Receiver<()> {
match rx.changed().await {
Ok(()) => rx,
Err(_) => panic!("signal sender went away"),
}
}

impl RxFuture {
fn new(rx: Receiver<()>) -> Self {
Self {
inner: ReusableBoxFuture::new(make_future(rx)),
}
}

async fn recv(&mut self) -> Option<()> {
use crate::future::poll_fn;
poll_fn(|cx| self.poll_recv(cx)).await
}

fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
match self.inner.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(rx) => {
self.inner.set(make_future(rx));
Poll::Ready(Some(()))
}
}
}
}
Loading