Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pantsd] Address pantsd-runner hang on Linux and re-enable integration test. #4407

Merged
merged 6 commits into from
Mar 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/python/pants/bin/daemon_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def run(self):
"""Fork, daemonize and invoke self.post_fork_child() (via ProcessManager)."""
self.daemonize(write_pid=False)

def pre_fork(self):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, post_fork is called in two places... does this need to be as well?

"""Pre-fork callback executed via ProcessManager.daemonize()."""
if self._graph_helper:
self._graph_helper.scheduler.pre_fork()

def post_fork_child(self):
"""Post-fork child process callback executed via ProcessManager.daemonize()."""
# Set the Exiter exception hook post-fork so as not to affect the pantsd processes exception
Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,9 @@ def visualize_to_dir(self):
def to_keys(self, subjects):
return list(self._to_key(subject) for subject in subjects)

def pre_fork(self):
self._native.lib.scheduler_pre_fork(self._scheduler)

def post_fork(self):
self._native.lib.scheduler_post_fork(self._scheduler)

Expand Down Expand Up @@ -442,6 +445,9 @@ def _execution_add_roots(self, execution_request):
for subject, selector in execution_request.roots:
self._scheduler.add_root_selection(subject, selector)

def pre_fork(self):
self._scheduler.pre_fork()

def post_fork(self):
self._scheduler.post_fork()

Expand Down
1 change: 1 addition & 0 deletions src/python/pants/engine/subsystem/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@
Buffer,
Buffer,
BufferBuffer);
void scheduler_pre_fork(Scheduler*);
void scheduler_post_fork(Scheduler*);
void scheduler_destroy(Scheduler*);

Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/subsystem/native_engine_version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7357cc56f4a01aca2b89f98706d41d9202802129
a780f41d9addd75db310272b010ae3ad65b08045
3 changes: 3 additions & 0 deletions src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ def _run(self):

def pre_fork(self):
"""Pre-fork() callback for ProcessManager.daemonize()."""
for service in self._services:
service.pre_fork()

# Teardown the RunTracker's SubprocPool pre-fork.
RunTracker.global_instance().shutdown_worker_pool()
# TODO(kwlzn): This currently aborts tracking of the remainder of the pants run that launched
Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/pantsd/service/pants_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def is_killed(self):
"""
return self._kill_switch.is_set()

def pre_fork(self):
"""Called pre-fork, before `run` to allow for service->service or other side-effecting setup."""

def setup(self):
"""Called before `run` to allow for service->service or other side-effecting setup."""

Expand Down
4 changes: 4 additions & 0 deletions src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ def change_calculator(self):
"""Surfaces the change calculator."""
return self._graph_helper.change_calculator

def pre_fork(self):
"""Pre-fork controls."""
self._scheduler.pre_fork()

def setup(self):
"""Service setup."""
# Register filesystem event handlers on an FSEventService instance.
Expand Down
18 changes: 12 additions & 6 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct Core {
pub vfs: PosixFS,
// TODO: This is a second pool (relative to the VFS pool), upon which all work is
// submitted. See https://github.com/pantsbuild/pants/issues/4298
pool: RwLock<CpuPool>,
pool: RwLock<Option<CpuPool>>,
}

impl Core {
Expand Down Expand Up @@ -66,11 +66,11 @@ impl Core {
.unwrap_or_else(|e| {
panic!("Could not initialize VFS: {:?}", e);
}),
pool: RwLock::new(Core::create_pool()),
pool: RwLock::new(Some(Core::create_pool())),
}
}

pub fn pool(&self) -> RwLockReadGuard<CpuPool> {
pub fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
self.pool.read().unwrap()
}

Expand All @@ -80,6 +80,12 @@ impl Core {
.create()
}

pub fn pre_fork(&self) {
self.vfs.pre_fork();
let mut pool = self.pool.write().unwrap();
*pool = None;
}

/**
* Reinitializes a Core in a new process (basically, recreates its CpuPool).
*/
Expand All @@ -88,7 +94,7 @@ impl Core {
self.vfs.post_fork();
// And our own.
let mut pool = self.pool.write().unwrap();
*pool = Core::create_pool();
*pool = Some(Core::create_pool());
}
}

Expand All @@ -109,7 +115,7 @@ impl Context {

pub trait ContextFactory {
fn create(&self, entry_id: EntryId) -> Context;
fn pool(&self) -> RwLockReadGuard<CpuPool>;
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>>;
}

impl ContextFactory for Context {
Expand All @@ -124,7 +130,7 @@ impl ContextFactory for Context {
}
}

fn pool(&self) -> RwLockReadGuard<CpuPool> {
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
self.core.pool()
}
}
67 changes: 39 additions & 28 deletions src/rust/engine/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ struct PathGlobsExpansion<T: Sized> {
pub struct PosixFS {
build_root: Dir,
// The pool needs to be reinitialized after a fork, so it is protected by a lock.
pool: RwLock<CpuPool>,
pool: RwLock<Option<CpuPool>>,
ignore: Gitignore,
}

Expand All @@ -285,7 +285,7 @@ impl PosixFS {
build_root: PathBuf,
ignore_patterns: Vec<String>,
) -> Result<PosixFS, String> {
let pool = RwLock::new(PosixFS::create_pool());
let pool = RwLock::new(Some(PosixFS::create_pool()));
let canonical_build_root =
build_root.canonicalize().and_then(|canonical|
canonical.metadata().and_then(|metadata|
Expand Down Expand Up @@ -346,13 +346,18 @@ impl PosixFS {
Ok(stats)
}

fn pool(&self) -> RwLockReadGuard<CpuPool> {
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
self.pool.read().unwrap()
}

pub fn pre_fork(&self) {
let mut pool = self.pool.write().unwrap();
*pool = None;
}

pub fn post_fork(&self) {
let mut pool = self.pool.write().unwrap();
*pool = PosixFS::create_pool();
*pool = Some(PosixFS::create_pool());
}

pub fn ignore<P: AsRef<Path>>(&self, path: P, is_dir: bool) -> bool {
Expand All @@ -365,7 +370,8 @@ impl PosixFS {
pub fn read_link(&self, link: &Link) -> BoxFuture<PathBuf, io::Error> {
let link_parent = link.0.parent().map(|p| p.to_owned());
let link_abs = self.build_root.0.join(link.0.as_path()).to_owned();
self.pool()
let pool = self.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
link_abs
.read_link()
Expand Down Expand Up @@ -393,7 +399,8 @@ impl PosixFS {
pub fn scandir(&self, dir: &Dir) -> BoxFuture<Vec<Stat>, io::Error> {
let dir = dir.to_owned();
let dir_abs = self.build_root.0.join(dir.0.as_path());
self.pool()
let pool = self.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
PosixFS::scandir_sync(dir, dir_abs)
})
Expand Down Expand Up @@ -839,24 +846,26 @@ impl Snapshots {
let build_root = fs.build_root.clone();
let temp_path = self.next_temp_path().expect("Couldn't get the next temp path.");

fs.pool().spawn_fn(move || {
// Write the tar deterministically to a temporary file while fingerprinting.
let fingerprint =
Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &build_root)?;

// Rename to the final path if it does not already exist.
Snapshots::finalize(
temp_path.as_path(),
Snapshots::path_under_for(&dest_dir, &fingerprint).as_path()
)?;
let pool = fs.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
// Write the tar deterministically to a temporary file while fingerprinting.
let fingerprint =
Snapshots::tar_create_fingerprinted(temp_path.as_path(), &paths, &build_root)?;

// Rename to the final path if it does not already exist.
Snapshots::finalize(
temp_path.as_path(),
Snapshots::path_under_for(&dest_dir, &fingerprint).as_path()
)?;

Ok(
Snapshot {
fingerprint: fingerprint,
path_stats: paths,
}
)
})
Ok(
Snapshot {
fingerprint: fingerprint,
path_stats: paths,
}
)
})
}

fn contents_for_sync(snapshot: Snapshot, path: PathBuf) -> Result<Vec<FileContent>, io::Error> {
Expand Down Expand Up @@ -887,10 +896,12 @@ impl Snapshots {

pub fn contents_for(&self, fs: &PosixFS, snapshot: Snapshot) -> CpuFuture<Vec<FileContent>, String> {
let archive_path = self.path_for(&snapshot.fingerprint);
fs.pool().spawn_fn(move || {
let snapshot_str = format!("{:?}", snapshot);
Snapshots::contents_for_sync(snapshot, archive_path)
.map_err(|e| format!("Failed to open Snapshot {}: {:?}", snapshot_str, e))
})
let pool = fs.pool();
pool.as_ref().expect("Uninitialized CpuPool!")
.spawn_fn(move || {
let snapshot_str = format!("{:?}", snapshot);
Snapshots::contents_for_sync(snapshot, archive_path)
.map_err(|e| format!("Failed to open Snapshot {}: {:?}", snapshot_str, e))
})
}
}
3 changes: 2 additions & 1 deletion src/rust/engine/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ impl Entry {
let state =
match self.node.clone() {
EntryKey::Valid(n) => {
let pool = context_factory.pool();
let pool_opt = context_factory.pool();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One line for this is probably fine.

Copy link
Member Author

@kwlzn kwlzn Mar 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not according to the compiler.

error: borrowed value does not live long enough
   --> src/rust/engine/src/graph.rs:115:88
    |
115 |             let pool = context_factory.pool().as_ref().expect("Uninitialized CpuPool!");
    |                        ---------------------- temporary value created here             ^ temporary value dropped here while still borrowed
...
122 |           },
    |           - temporary value needs to live until here
    |
    = note: consider using a `let` binding to increase its lifetime

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Yea, makes sense. Thanks.

let pool = pool_opt.as_ref().expect("Uninitialized CpuPool!");
let context = context_factory.create(entry_id);
pool
.spawn_fn(move || {
Expand Down
7 changes: 7 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,13 @@ pub extern fn scheduler_create(
)
}

#[no_mangle]
pub extern fn scheduler_pre_fork(scheduler_ptr: *mut Scheduler) {
with_scheduler(scheduler_ptr, |scheduler| {
scheduler.core.pre_fork();
})
}

#[no_mangle]
pub extern fn scheduler_post_fork(scheduler_ptr: *mut Scheduler) {
with_scheduler(scheduler_ptr, |scheduler| {
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl ContextFactory for Arc<Core> {
Context::new(entry_id, self.clone())
}

fn pool(&self) -> RwLockReadGuard<CpuPool> {
fn pool(&self) -> RwLockReadGuard<Option<CpuPool>> {
Core::pool(self)
}
}
Expand Down
2 changes: 0 additions & 2 deletions tests/python/pants_test/pantsd/test_pantsd_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
unicode_literals, with_statement)

import os
import unittest
from contextlib import contextmanager

from pants.pantsd.process_manager import ProcessManager
Expand Down Expand Up @@ -78,7 +77,6 @@ def test_pantsd_run(self):
for line in read_pantsd_log(workdir):
print(line)

@unittest.skip('TODO: See https://github.com/pantsbuild/pants/issues/4301')
def test_pantsd_run_with_watchman(self):
config = {'pantsd': {'fs_event_detection': True},
# The absolute paths in CI can exceed the UNIX socket path limitation
Expand Down