Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refuse relative path for remote in coordinator #538

Merged
merged 15 commits into from
Jun 12, 2024
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
tool-cache: true

# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
Expand All @@ -96,7 +96,7 @@ jobs:
haskell: true
large-packages: false
docker-images: true
swap-storage: false
swap-storage: true
- name: Free disk Space (Windows)
if: runner.os == 'Windows'
run: |
Expand Down
14 changes: 11 additions & 3 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,17 @@ fn run() -> eyre::Result<()> {
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
if !coordinator_addr.is_loopback() {
// Use an empty string as a placeholder machine ID so that every node
// is treated as a remote node and all of its paths are checked as
// absolute. The machine ID of an actual remote daemon is expected to
// be a non-empty string.
dataflow_descriptor.check_in_daemon(&working_dir, &[&String::default()])?;
} else {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
}

let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
Expand Down
12 changes: 11 additions & 1 deletion binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow(
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;
let remote_machine_id: Vec<_> = daemon_connections
.iter()
.filter_map(|(id, c)| {
if !c.listen_socket.ip().is_loopback() {
Some(id.as_str())
} else {
None
}
})
.collect();
dataflow.check_in_daemon(&working_dir, &remote_machine_id)?;

let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));
Expand Down
12 changes: 11 additions & 1 deletion libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,17 @@ impl Descriptor {
}

/// Validates this dataflow descriptor against `working_dir`.
///
/// No remote machine IDs are supplied (`None` is passed to
/// `validate::check_dataflow`). Judging by the visible part of that
/// function, non-URL node sources are then resolved as local paths
/// relative to `working_dir` (the rest of the validation is not visible
/// here, so confirm before relying on this).
///
/// # Errors
///
/// Returns the validation error wrapped with the context message
/// "Dataflow could not be validated.".
pub fn check(&self, working_dir: &Path) -> eyre::Result<()> {
validate::check_dataflow(self, working_dir).wrap_err("Dataflow could not be validated.")
validate::check_dataflow(self, working_dir, None)
.wrap_err("Dataflow could not be validated.")
}

/// Validates this dataflow descriptor, treating the machines listed in
/// `remote_machine_id` as remote.
///
/// The IDs are forwarded to `validate::check_dataflow` as `Some(..)`.
/// Judging by the visible part of that function, a node deployed on one of
/// the listed machines must give its source as an absolute path, and the
/// local existence check is skipped for it. An empty-string entry makes
/// every node count as remote.
///
/// # Errors
///
/// Returns the validation error wrapped with the context message
/// "Dataflow could not be validated.".
pub fn check_in_daemon(
    &self,
    working_dir: &Path,
    remote_machine_id: &[&str],
) -> eyre::Result<()> {
    let remote_ids = Some(remote_machine_id);
    let outcome = validate::check_dataflow(self, working_dir, remote_ids);
    outcome.wrap_err("Dataflow could not be validated.")
}
}

Expand Down
32 changes: 29 additions & 3 deletions libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
adjust_shared_library_path,
config::{DataId, Input, InputMapping, OperatorId, UserInputMapping},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION},
get_python_path,
};

Expand All @@ -12,18 +12,44 @@ use tracing::info;
use super::{resolve_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
pub fn check_dataflow(
dataflow: &Descriptor,
working_dir: &Path,
remote_daemon_id: Option<&[&str]>,
) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;

// check that nodes and operators exist
for node in &nodes {
match &node.kind {
descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if remote_daemon_id.contains(&node.deploy.machine.as_str())
|| remote_daemon_id.contains(&String::default().as_str())
XxChang marked this conversation as resolved.
Show resolved Hide resolved
{
let path = Path::new(&source);
let path = if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
};
if path.is_relative() {
eyre::bail!(
"paths of remote nodes must be absolute (node `{}`)",
node.id
);
}
info!("skipping path check for remote node `{}`", node.id);
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
}
} else {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;
Expand Down
Loading