Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydroflow)!: remove import!, fix #1110 #1600

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions benches/benches/fork_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ fn benchmark_hydroflow(c: &mut Criterion) {
fn benchmark_hydroflow_surface(c: &mut Criterion) {
c.bench_function("fork_join/hydroflow/surface", |b| {
b.iter(|| {
let mut hf = hydroflow_syntax! {
source_iter(0..NUM_INTS) -> import!("fork_join_20.hf") -> for_each(|x| { black_box(x); });
};
let mut hf = include!("fork_join_20.hf");
hf.run_available();
})
});
Expand Down
12 changes: 8 additions & 4 deletions benches/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ pub fn fork_join() -> std::io::Result<()> {
let file = File::create(path)?;
let mut write = BufWriter::new(file);

writeln!(write, "a0 = mod -> tee();")?;

writeln!(write, "hydroflow_syntax! {{")?;
writeln!(write, "a0 = source_iter(0..NUM_INTS) -> tee();")?;
for i in 0..NUM_OPS {
if i > 0 {
writeln!(write, "a{} = union() -> tee();", i)?;
}
writeln!(write, "a{} -> filter(|x| x % 2 == 0) -> a{};", i, i + 1)?;
writeln!(write, "a{} -> filter(|x| x % 2 == 1) -> a{};", i, i + 1)?;
}

writeln!(write, "a{} = union() -> mod;", NUM_OPS)?;
writeln!(
write,
"a{} = union() -> for_each(|x| {{ black_box(x); }});",
NUM_OPS
)?;
writeln!(write, "}}")?;

write.flush()?;

Expand Down
8 changes: 0 additions & 8 deletions hydroflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,6 @@ required-features = [ "nightly" ]
name = "python_udf"
required-features = [ "python" ]

[[example]]
name = "modules_outer_join"
required-features = [ "debugging" ]

[[example]]
name = "modules_triple_cross_join"
required-features = [ "debugging" ]

[dependencies]
bincode = "1.3.1"
byteorder = "1.3.2"
Expand Down
23 changes: 0 additions & 23 deletions hydroflow/examples/modules_outer_join/full_outer_join.hf

This file was deleted.

16 changes: 0 additions & 16 deletions hydroflow/examples/modules_outer_join/left_outer_join.hf

This file was deleted.

30 changes: 0 additions & 30 deletions hydroflow/examples/modules_outer_join/main.rs

This file was deleted.

6 changes: 0 additions & 6 deletions hydroflow/examples/modules_outer_join/right_outer_join.hf

This file was deleted.

51 changes: 0 additions & 51 deletions hydroflow/examples/modules_triple_cross_join/main.rs

This file was deleted.

15 changes: 0 additions & 15 deletions hydroflow/examples/modules_triple_cross_join/triple_cross_join.hf

This file was deleted.

146 changes: 1 addition & 145 deletions hydroflow_lang/src/graph/flat_graph_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::borrow::Cow;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::path::PathBuf;

use itertools::Itertools;
use proc_macro2::Span;
Expand Down Expand Up @@ -71,9 +70,6 @@ pub struct FlatGraphBuilder {
/// Use statements.
uses: Vec<ItemUse>,

/// In order to make import!() statements relative to the current file, we need to know where the file is that is building the flat graph.
invocating_file_path: PathBuf,

/// If the flat graph is being loaded as a module, then two initial ModuleBoundary nodes are inserted into the graph. One
/// for the input into the module and one for the output out of the module.
module_boundary_nodes: Option<(GraphNodeId, GraphNodeId)>,
Expand All @@ -86,37 +82,8 @@ impl FlatGraphBuilder {
}

/// Convert the Hydroflow code AST into a graph builder.
pub fn from_hfcode(input: HfCode, macro_invocation_path: PathBuf) -> Self {
let mut builder = Self {
invocating_file_path: macro_invocation_path,
..Default::default()
};
builder.process_statements(input.statements);
builder
}

/// Convert the Hydroflow code AST into a graph builder.
pub fn from_hfmodule(input: HfCode, root_path: PathBuf) -> Self {
pub fn from_hfcode(input: HfCode) -> Self {
let mut builder = Self::default();
builder.invocating_file_path = root_path; // imports inside of modules should be relative to the importing file.
builder.module_boundary_nodes = Some((
builder.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: true,
import_expr: Span::call_site(),
},
Some(Ident::new("input", Span::call_site())),
None,
),
builder.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: false,
import_expr: Span::call_site(),
},
Some(Ident::new("output", Span::call_site())),
None,
),
));
builder.process_statements(input.statements);
builder
}
Expand Down Expand Up @@ -276,118 +243,7 @@ impl FlatGraphBuilder {
out: Some((PortIndexValue::Elided(op_span), GraphDet::Determined(nid))),
}
}
Pipeline::Import(import) => {
// TODO: https://github.com/rust-lang/rfcs/pull/3200
// this would be way better...
let file_path = {
let mut dir = self.invocating_file_path.clone();
dir.pop();
dir.join(import.filename.value())
};

let file_contents = match std::fs::read_to_string(&file_path) {
Ok(contents) => contents,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.filename.span(),
Level::Error,
format!("filename: {}, err: {err}", import.filename.value()),
));

return Ends {
inn: None,
out: None,
};
}
};

let statements = match syn::parse_str::<HfCode>(&file_contents) {
Ok(code) => code,
Err(err) => {
self.diagnostics.push(Diagnostic::spanned(
import.span(),
Level::Error,
format!("Error in module: {}", err),
));

return Ends {
inn: None,
out: None,
};
}
};

let flat_graph_builder = FlatGraphBuilder::from_hfmodule(statements, file_path);
let (flat_graph, _uses, diagnostics) = flat_graph_builder.build();
diagnostics.iter().for_each(Diagnostic::emit);

self.merge_in(flat_graph, import.span())
}
}
}

/// Merge one flatgraph into the current flatgraph
/// other must be a flatgraph and not be partitioned yet.
fn merge_in(&mut self, other: HydroflowGraph, parent_span: Span) -> Ends {
assert_eq!(other.subgraphs().count(), 0);

let mut ends = Ends {
inn: None,
out: None,
};

let mut node_mapping = BTreeMap::new();

for (other_node_id, node) in other.nodes() {
match node {
GraphNode::Operator(_) => {
let varname = other.node_varname(other_node_id);
let new_id = self.flat_graph.insert_node(node.clone(), varname, None);
node_mapping.insert(other_node_id, new_id);
}
GraphNode::ModuleBoundary { input, .. } => {
let new_id = self.flat_graph.insert_node(
GraphNode::ModuleBoundary {
input: *input,
import_expr: parent_span,
},
Some(Ident::new(&format!("module_{}", input), parent_span)),
None,
);
node_mapping.insert(other_node_id, new_id);

// in the case of nested imports, this module boundary might not be the module boundary into or out of the top-most module
// So we have to be careful to only target those two boundaries.
// There should be no inputs to it, if it is an input boundary, if it is the top-most one.
// and there should be no outputs from it, if it is an output boundary, if it is the top-most one.
if *input && other.node_predecessor_nodes(other_node_id).count() == 0 {
if other.node_predecessor_nodes(other_node_id).count() == 0 {
ends.inn =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
} else if !(*input) && other.node_successor_nodes(other_node_id).count() == 0 {
ends.out =
Some((PortIndexValue::Elided(None), GraphDet::Determined(new_id)));
}
}
GraphNode::Handoff { .. } => {
panic!("Handoff in graph that is being merged into self")
}
}
}

for (other_edge_id, (other_src, other_dst)) in other.edges() {
let (src_port, dst_port) = other.edge_ports(other_edge_id);

let _new_edge_id = self.flat_graph.insert_edge(
*node_mapping.get(&other_src).unwrap(),
src_port.clone(),
*node_mapping.get(&other_dst).unwrap(),
dst_port.clone(),
);
}

ends
}

/// Connects operator links as a final building step. Processes all the links stored in
Expand Down
4 changes: 1 addition & 3 deletions hydroflow_lang/src/graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ mod hydroflow_graph;
mod hydroflow_graph_debugging;

use std::fmt::Display;
use std::path::PathBuf;

pub use di_mul_graph::DiMulGraph;
pub use eliminate_extra_unions_tees::eliminate_extra_unions_tees;
Expand Down Expand Up @@ -376,9 +375,8 @@ impl Display for PortIndexValue {
pub fn build_hfcode(
hf_code: HfCode,
root: &TokenStream,
macro_invocation_path: PathBuf,
) -> (Option<(HydroflowGraph, TokenStream)>, Vec<Diagnostic>) {
let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code, macro_invocation_path);
let flat_graph_builder = FlatGraphBuilder::from_hfcode(hf_code);
let (mut flat_graph, uses, mut diagnostics) = flat_graph_builder.build();
if !diagnostics.iter().any(Diagnostic::is_error) {
if let Err(diagnostic) = flat_graph.merge_modules() {
Expand Down
Loading
Loading