Skip to content

Commit

Permalink
Add AggregationRule
Browse files Browse the repository at this point in the history
  • Loading branch information
stuhood committed Sep 9, 2018
1 parent 3b95c02 commit 25322b1
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/python/pants/engine/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
void tasks_add_select(Tasks*, TypeConstraint);
void tasks_add_select_variant(Tasks*, TypeConstraint, Buffer);
void tasks_task_end(Tasks*);
void tasks_aggregation_add(Tasks*, Function, TypeConstraint);
void tasks_singleton_add(Tasks*, Handle, TypeConstraint);
void tasks_destroy(Tasks*);
Expand Down
23 changes: 23 additions & 0 deletions src/python/pants/engine/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,29 @@ def __str__(self):
self.func.__name__)


class AggregationRule(datatype(['output_constraint', 'func']), Rule):
"""A rule that receives the results of all other rules for a product to aggregate them.
An AggregationRule supports composability of @rules by allowing additional rules to be installed
to provide some type without removing or otherwise modifying the installed rules.
"""

def __new__(cls, output_type, func):
# Validate result type.
if isinstance(output_type, Exactly):
constraint = output_type
elif isinstance(output_type, type):
constraint = Exactly(output_type)
else:
raise TypeError("Expected an output_type for rule; got: {}".format(output_type))

return super(AggregationRule, cls).__new__(cls, constraint, func)

def __repr__(self):
return '{}({}, {})'.format(
type(self).__name__, type_or_constraint_repr(self.output_constraint), self.func.__name__)


class SingletonRule(datatype(['output_constraint', 'value']), Rule):
"""A default rule for a product, which is thus a singleton for that product."""

Expand Down
16 changes: 14 additions & 2 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pants.engine.isolated_process import ExecuteProcessRequest, FallibleExecuteProcessResult
from pants.engine.native import Function, TypeConstraint, TypeId
from pants.engine.nodes import Return, State, Throw
from pants.engine.rules import RuleIndex, SingletonRule, TaskRule
from pants.engine.rules import AggregationRule, RuleIndex, SingletonRule, TaskRule
from pants.engine.selectors import Select, SelectVariant, constraint_for
from pants.engine.struct import HasProducts, Variants
from pants.util.contextutil import temporary_file_path
Expand Down Expand Up @@ -212,13 +212,25 @@ def _register_rules(self, rule_index):
continue
registered.add(key)

if type(rule) is SingletonRule:
if type(rule) is AggregationRule:
self._register_aggregation(output_constraint, rule)
elif type(rule) is SingletonRule:
self._register_singleton(output_constraint, rule)
elif type(rule) is TaskRule:
self._register_task(output_constraint, rule)
else:
raise ValueError('Unexpected Rule type: {}'.format(rule))

def _register_aggregation(self, output_constraint, rule):
"""Register the given AggregationRule.
An AggregationRule installed for a type merges the results of all other providers of that type.
"""
func = Function(self._to_key(rule.func))
self._native.lib.tasks_aggregation_add(self._tasks,
func,
output_constraint)

def _register_singleton(self, output_constraint, rule):
"""Register the given SingletonRule.
Expand Down
11 changes: 11 additions & 0 deletions src/rust/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ pub extern "C" fn tasks_create() -> *const Tasks {
Box::into_raw(Box::new(Tasks::new()))
}

#[no_mangle]
pub extern "C" fn tasks_aggregation_add(
tasks_ptr: *mut Tasks,
func: Function,
output_constraint: TypeConstraint,
) {
with_tasks(tasks_ptr, |tasks| {
tasks.aggregation_add(func, output_constraint);
})
}

#[no_mangle]
pub extern "C" fn tasks_singleton_add(
tasks_ptr: *mut Tasks,
Expand Down
18 changes: 18 additions & 0 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,24 @@ impl Select {
}
}
}
&rule_graph::Entry::WithDeps(rule_graph::EntryWithDeps::Aggregation(ref a)) => {
let edges = context
.core
.rule_graph
.edges_for_inner(&self.entry)
.expect("Expected edges to exist for Aggregation.");
let deps = edges
.all_dependencies()
.map(|dep_entry| {
Select::new_with_entries(a.product, self.params.clone(), dep_entry.clone())
.run(context.clone())
})
.collect::<Vec<_>>();
let func_val = externs::val_for(&a.func.0);
future::join_all(deps)
.and_then(move |dep_values| externs::call(&func_val, &dep_values))
.to_boxed()
}
&rule_graph::Entry::WithDeps(rule_graph::EntryWithDeps::Root(_))
| &rule_graph::Entry::Param(_)
| &rule_graph::Entry::Singleton { .. } => {
Expand Down
93 changes: 77 additions & 16 deletions src/rust/engine/src/rule_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ impl UnreachableError {

#[derive(Eq, Hash, PartialEq, Clone, Debug)]
pub enum EntryWithDeps {
Aggregation(Aggregation),
Root(RootEntry),
Inner(InnerEntry),
}

impl EntryWithDeps {
fn params(&self) -> &ParamTypes {
match self {
&EntryWithDeps::Aggregation(ref a) => &a.params,
&EntryWithDeps::Inner(ref ie) => &ie.params,
&EntryWithDeps::Root(ref re) => &re.params,
}
Expand All @@ -57,10 +59,17 @@ impl EntryWithDeps {
}

///
/// Returns the set of SelectKeys representing the dependencies of this EntryWithDeps.
/// Returns the set of SelectKeys representing the dependencies of this EntryWithDeps, and a
/// boolean indicating whether multiple sources of each key should be allowed.
///
fn dependency_keys(&self) -> Vec<SelectKey> {
fn dependency_keys(&self) -> (Vec<SelectKey>, bool) {
match self {
&EntryWithDeps::Aggregation(Aggregation { ref product, .. }) => {
let keys = vec![SelectKey::JustSelect(Select::without_variant(
product.clone(),
))];
(keys, true)
}
&EntryWithDeps::Inner(InnerEntry {
rule: Rule::Task(Task {
ref clause,
Expand All @@ -73,15 +82,21 @@ impl EntryWithDeps {
ref clause,
ref gets,
..
}) => clause
.iter()
.map(|s| SelectKey::JustSelect(s.clone()))
.chain(gets.iter().map(|g| SelectKey::JustGet(*g)))
.collect(),
}) => {
let keys = clause
.iter()
.map(|s| SelectKey::JustSelect(s.clone()))
.chain(gets.iter().map(|g| SelectKey::JustGet(*g)))
.collect();
(keys, false)
}
&EntryWithDeps::Inner(InnerEntry {
rule: Rule::Intrinsic(Intrinsic { ref input, .. }),
..
}) => vec![SelectKey::JustSelect(Select::without_variant(*input))],
}) => {
let keys = vec![SelectKey::JustSelect(Select::without_variant(*input))];
(keys, false)
}
}
}

Expand All @@ -93,6 +108,7 @@ impl EntryWithDeps {
let mut simplified = self.clone();
{
let simplified_params = match &mut simplified {
&mut EntryWithDeps::Aggregation(ref mut a) => &mut a.params,
&mut EntryWithDeps::Inner(ref mut ie) => &mut ie.params,
&mut EntryWithDeps::Root(ref mut re) => &mut re.params,
};
Expand All @@ -119,6 +135,13 @@ pub enum Entry {
Singleton(Key, TypeConstraint),
}

#[derive(Eq, Hash, PartialEq, Clone, Debug)]
pub struct Aggregation {
pub params: ParamTypes,
pub product: TypeConstraint,
pub func: Function,
}

#[derive(Eq, Hash, PartialEq, Clone, Debug)]
pub struct RootEntry {
params: ParamTypes,
Expand Down Expand Up @@ -380,7 +403,7 @@ impl<'t> GraphMaker<'t> {
let mut cycled_on = HashSet::new();
let mut unfulfillable_diagnostics = Vec::new();

let dependency_keys = entry.dependency_keys();
let (dependency_keys, is_aggregation) = entry.dependency_keys();

for select_key in dependency_keys {
let (params, product) = match &select_key {
Expand All @@ -400,7 +423,7 @@ impl<'t> GraphMaker<'t> {
let mut fulfillable_candidates = fulfillable_candidates_by_key
.entry(select_key.clone())
.or_insert_with(Vec::new);
for candidate in rhs(&self.tasks, &params, &product) {
for candidate in rhs(&self.tasks, &params, &product, is_aggregation) {
match candidate {
Entry::WithDeps(c) => match self.construct_graph_helper(
rule_dependency_edges,
Expand Down Expand Up @@ -476,10 +499,22 @@ impl<'t> GraphMaker<'t> {
//
// If this is an Aggregration, flatten the candidates by duplicating the SelectKey to treat
// each concrete rule as a group of candidates. Otherwise, flatten each group of candidates.
let flattened_fulfillable_candidates_by_key = fulfillable_candidates_by_key
.into_iter()
.map(|(k, candidate_group)| (k, Itertools::flatten(candidate_group.into_iter()).collect()))
.collect();
let flattened_fulfillable_candidates_by_key = if is_aggregation {
// Expect one key. TODO: Adjust the `dependency_keys` method to make this typesafe.
let (key, fulfillable_candidates): (SelectKey, Vec<Vec<Entry>>) =
fulfillable_candidates_by_key.into_iter().next().unwrap();
// Treat each group of candidates as an input, all with the same key.
// TODO: RuleEdges will need adjustment to support this duplication of SelectKey.
fulfillable_candidates
.into_iter()
.map(|candidates| (key.clone(), candidates))
.collect()
} else {
fulfillable_candidates_by_key
.into_iter()
.map(|(k, candidate_group)| (k, Itertools::flatten(candidate_group.into_iter()).collect()))
.collect()
};

// Generate one Entry per legal combination of parameters.
let simplified_entries =
Expand Down Expand Up @@ -746,7 +781,7 @@ impl<'t> GraphMaker<'t> {
param_types: &ParamTypes,
product_type: &TypeConstraint,
) -> Option<RootEntry> {
let candidates = rhs(&self.tasks, param_types, product_type);
let candidates = rhs(&self.tasks, param_types, product_type, false);
if candidates.is_empty() {
None
} else {
Expand Down Expand Up @@ -861,6 +896,16 @@ pub fn entry_str(entry: &Entry) -> String {

fn entry_with_deps_str(entry: &EntryWithDeps) -> String {
match entry {
&EntryWithDeps::Aggregation(Aggregation {
ref params,
ref product,
ref func,
}) => format!(
"Aggregation({}, {}) for {}",
type_constraint_str(product.clone()),
function_str(func),
params_str(params)
),
&EntryWithDeps::Inner(InnerEntry {
rule: Rule::Task(ref task_rule),
ref params,
Expand Down Expand Up @@ -1118,9 +1163,25 @@ impl RuleEdges {
///
/// Select Entries that can provide the given product type with the given parameters.
///
fn rhs(tasks: &Tasks, params: &ParamTypes, product_type: &TypeConstraint) -> Vec<Entry> {
/// If from_aggregation, then we are already selecting in the context of an Aggregation, and so we
/// ignore installed Aggregation rules to avoid recursion.
///
fn rhs(
tasks: &Tasks,
params: &ParamTypes,
product_type: &TypeConstraint,
from_aggregation: bool,
) -> Vec<Entry> {
if let Some(&(ref key, _)) = tasks.gen_singleton(product_type) {
return vec![Entry::Singleton(*key, *product_type)];
} else if !from_aggregation {
if let Some(func) = tasks.gen_aggregation(product_type) {
return vec![Entry::WithDeps(EntryWithDeps::Aggregation(Aggregation {
params: params.clone(),
product: *product_type,
func: func.clone(),
}))];
}
}

let mut entries = Vec::new();
Expand Down
20 changes: 19 additions & 1 deletion src/rust/engine/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ pub struct Task {
pub struct Tasks {
// output product type -> Intrinsic providing it
intrinsics: HashMap<TypeConstraint, Intrinsic, FNV>,
// Aggregations aggregate the outputs of all other rules for a type.
aggregations: HashMap<TypeConstraint, Function, FNV>,
// Singleton Values to be returned for a given TypeConstraint.
singletons: HashMap<TypeConstraint, (Key, Value), FNV>,
// output product type -> list of tasks providing it
Expand All @@ -38,7 +40,8 @@ pub struct Tasks {
/// 2. add_*() - zero or more times per task to add input clauses
/// 3. task_end() - once per task
///
/// Also has a one-shot method for adding Singletons (which have no Selects):
/// Also has one-shot methods for adding Singletons and Aggregations (which have no Selects):
/// * aggregation_add()
/// * singleton_add()
///
/// (This protocol was original defined in a Builder, but that complicated the C lifecycle.)
Expand All @@ -47,6 +50,7 @@ impl Tasks {
pub fn new() -> Tasks {
Tasks {
intrinsics: HashMap::default(),
aggregations: HashMap::default(),
singletons: HashMap::default(),
tasks: HashMap::default(),
preparing: None,
Expand Down Expand Up @@ -75,6 +79,10 @@ impl Tasks {
.collect()
}

pub fn gen_aggregation(&self, product: &TypeConstraint) -> Option<&Function> {
self.aggregations.get(product)
}

pub fn gen_singleton(&self, product: &TypeConstraint) -> Option<&(Key, Value)> {
self.singletons.get(product)
}
Expand Down Expand Up @@ -109,6 +117,16 @@ impl Tasks {
.collect();
}

pub fn aggregation_add(&mut self, func: Function, product: TypeConstraint) {
if let Some(existing_func) = self.aggregations.get(&product) {
panic!(
"More than one Aggregation rule was installed for the product {:?}: {:?} vs {:?}",
product, existing_func, func,
);
}
self.aggregations.insert(product, func);
}

pub fn singleton_add(&mut self, value: Value, product: TypeConstraint) {
if let Some(&(_, ref existing_value)) = self.singletons.get(&product) {
panic!(
Expand Down

0 comments on commit 25322b1

Please sign in to comment.