Implement secondary file testing, staging, and indexing.
- The staging part worked previously, long ago, but things have changed a lot since and it needed to be touched up.
- Implements secondary files when summarizing CWL outputs for testing and the CLI.
- Upgrades cwltool to the latest release since the previous target version had a bug in it; this required adapting the path mapper interface a bit.
- Implements indexing of secondary files - the test cases require that secondary files be ordered (see the sketch after this summary).
- Tweaks upload.py to allow uploading files with secondaryFiles.
- Fixes a bug where a workflow step was not matched to the correct tool.
- Fixes a bug in NO_REPLACEMENT handling for FieldParameters.

The big thing not implemented in this commit is Directories with secondaryFiles - so ultimately the search.cwl conformance test still fails.
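For context on the indexing and upload bullets above: secondary files travel with their primary dataset under a __secondary_files__/ prefix in the dataset's extra files, alongside a small JSON index (__secondary_files_index.json) recording their order. Below is a minimal sketch of how such a bundle can be built for upload, mirroring the new helper code in lib/galaxy/tools/cwl/util.py; the file names (reads.bam.bai), the test-data path, and the bundle_secondary_files helper are hypothetical, for illustration only.

import json
import os
import tarfile
import tempfile

SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json"

def bundle_secondary_files(secondary_file_paths):
    # Pack secondary files, plus an ordering index, into a tar archive that
    # the upload tool can explode next to the primary dataset.
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".tar")
    tf = tarfile.open(fileobj=tmp, mode="w:")
    order = []
    for path in secondary_file_paths:
        basename = os.path.basename(path)
        order.append(basename)
        tf.add(path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
    # The index is what lets tests and the CLI reproduce the original order
    # when rebuilding the CWL output object.
    index = tempfile.NamedTemporaryFile("w", delete=False, suffix=".json")
    json.dump({"order": order}, index)
    index.close()
    tf.add(index.name, SECONDARY_FILES_INDEX_PATH)
    tf.close()
    tmp.close()
    return tmp.name  # path to the tar uploaded alongside the primary file

# Hypothetical usage: a BAM primary file with one BAM index as secondary file.
# tar_path = bundle_secondary_files(["test-data/reads.bam.bai"])

On the output side, secondary files are stored with their plain basenames by default (STORE_SECONDARY_FILES_WITH_BASENAME = True); when that flag is off, the loop in runtime_actions.py instead derives CWL's caret-style relative name (for example, a reads.bai secondary next to a reads.bam primary would be stored as ^.bai).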
jmchilton committed Sep 5, 2017
1 parent 7a00dcb commit 03d1636
Showing 12 changed files with 214 additions and 59 deletions.
2 changes: 1 addition & 1 deletion lib/galaxy/dependencies/pinned-requirements.txt
@@ -84,5 +84,5 @@ chronos-python==0.38.0
python-genomespaceclient==0.1.8

# For CWL support.
cwltool==1.0.20170727112954
cwltool==1.0.20170828135420
cwltest==1.0.20170809112706 # TODO: only required for testing...
10 changes: 10 additions & 0 deletions lib/galaxy/model/__init__.py
@@ -4004,6 +4004,11 @@ def __init__(self):
self.merge_type = self.default_merge_type
self.scatter_type = self.default_scatter_type

def log_str(self):
return "WorkflowStepInput[name=%s]" % (
self.name,
)


class WorkflowStepConnection(object):
# Constant used in lieu of output_name and input_name to indicate an
@@ -4035,6 +4040,11 @@ def copy(self):
copied_connection.input_name = self.input_name
return copied_connection

def log_str(self):
return "WorkflowStepConnection[output_step_id=%s,output_name=%s,input_step_id=%s,input_name=%s]" % (
self.output_step_id, self.output_name, self.input_step_id, self.input_name
)


class WorkflowOutput(object):

10 changes: 6 additions & 4 deletions lib/galaxy/tools/cwl/parser.py
@@ -33,11 +33,11 @@
)

from .schema import non_strict_schema_loader, schema_loader
from .util import SECONDARY_FILES_EXTRA_PREFIX

log = logging.getLogger(__name__)

JOB_JSON_FILE = ".cwl_job.json"
SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"

DOCKER_REQUIREMENT = "DockerRequirement"
SUPPORTED_TOOL_REQUIREMENTS = [
@@ -223,7 +223,8 @@ def id(self):
def galaxy_id(self):
raw_id = self.id
tool_id = None
if raw_id:
# don't reduce "search.cwl#index" to search
if raw_id and "#" not in raw_id:
tool_id = os.path.splitext(os.path.basename(raw_id))[0]
if not tool_id:
from galaxy.tools.hash import build_tool_hash
@@ -485,14 +486,15 @@ def stageFunc(resolved_path, target_path):
process.stageFiles(cwl_job.pathmapper, stageFunc, ignoreWritable=True, symLink=False)

if hasattr(cwl_job, "generatefiles"):
outdir = os.path.join(self._job_directory, "working")
# TODO: Why doesn't cwl_job.generatemapper work?
generate_mapper = pathmapper.PathMapper(cwl_job.generatefiles["listing"],
os.path.join(self._job_directory, "working"), os.path.join(self._job_directory, "working"), separateDirs=False)
outdir, outdir, separateDirs=False)
# TODO: figure out what inplace_update should be.
inplace_update = getattr(cwl_job, "inplace_update")
process.stageFiles(generate_mapper, stageFunc, ignoreWritable=inplace_update, symLink=False)
from cwltool import job
job.relink_initialworkdir(generate_mapper, inplace_update=inplace_update)
job.relink_initialworkdir(generate_mapper, outdir, outdir, inplace_update=inplace_update)
# else: expression tools do not have a path mapper.

@staticmethod
14 changes: 11 additions & 3 deletions lib/galaxy/tools/cwl/representation.py
@@ -127,18 +127,26 @@ def dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper):
extra_files_path = dataset_wrapper.extra_files_path
secondary_files_path = os.path.join(extra_files_path, "__secondary_files__")
path = str(dataset_wrapper)
raw_file_object = {"class": "File"}

if os.path.exists(secondary_files_path):
safe_makedirs(inputs_dir)
name = os.path.basename(path)
new_input_path = os.path.join(inputs_dir, name)
os.symlink(path, new_input_path)
secondary_files = []
for secondary_file_name in os.listdir(secondary_files_path):
secondary_file_path = os.path.join(secondary_files_path, secondary_file_name)
os.symlink(secondary_file_path, new_input_path + secondary_file_name)
target = os.path.join(inputs_dir, secondary_file_name)
log.info("linking [%s] to [%s]" % (secondary_file_path, target))
os.symlink(secondary_file_path, target)
is_dir = os.path.isdir(os.path.realpath(secondary_file_path))
secondary_files.append({"class": "File" if not is_dir else "Directory", "location": target})

raw_file_object["secondaryFiles"] = secondary_files
path = new_input_path

raw_file_object = {"location": path,
"class": "File"}
raw_file_object["location"] = path
set_basename_and_derived_properties(raw_file_object, str(dataset_wrapper.cwl_filename or dataset_wrapper.name))
return raw_file_object

76 changes: 59 additions & 17 deletions lib/galaxy/tools/cwl/runtime_actions.py
@@ -8,6 +8,19 @@
load_job_proxy,
)

from .util import (
SECONDARY_FILES_INDEX_PATH,
STORE_SECONDARY_FILES_WITH_BASENAME,
)


def _possible_uri_to_path(location):
if location.startswith("file://"):
path = ref_resolver.uri_file_path(location)
else:
path = location
return path


def handle_outputs(job_directory=None):
# Relocate dynamically collected files to pre-determined locations
@@ -30,25 +43,54 @@ def handle_outputs(job_directory=None):

def move_output_file(output, target_path, output_name=None):
assert output["class"] == "File"
output_path = ref_resolver.uri_file_path(output["location"])
output_path = _possible_uri_to_path(output["location"])
shutil.move(output_path, target_path)

for secondary_file in output.get("secondaryFiles", []):
if output_name is None:
raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

# TODO: handle nested files...
secondary_file_path = ref_resolver.uri_file_path(secondary_file["location"])
assert secondary_file_path.startswith(output_path)
secondary_file_name = secondary_file_path[len(output_path):]
secondary_files_dir = job_proxy.output_secondary_files_dir(
output_name, create=True
)
extra_target = os.path.join(secondary_files_dir, secondary_file_name)
shutil.move(
secondary_file_path,
extra_target,
)
secondary_files = output.get("secondaryFiles", [])
if secondary_files:

order = []
index_contents = {
"order": order
}

for secondary_file in secondary_files:
if output_name is None:
raise NotImplementedError("secondaryFiles are unimplemented for dynamic list elements")

# TODO: handle nested files...
secondary_file_path = _possible_uri_to_path(secondary_file["location"])
# assert secondary_file_path.startswith(output_path), "[%s] does not start with [%s]" % (secondary_file_path, output_path)
secondary_file_basename = secondary_file["basename"]

if not STORE_SECONDARY_FILES_WITH_BASENAME:
output_basename = output["basename"]
prefix = ""
while True:
if secondary_file_basename.startswith(output_basename):
secondary_file_name = prefix + secondary_file_basename[len(output_basename):]
break
prefix = "^%s" % prefix
if "." not in output_basename:
secondary_file_name = prefix + secondary_file_name
break
else:
output_basename = output_basename.rsplit(".", 1)[0]
else:
secondary_file_name = secondary_file_basename
# Convert to ^ format....
secondary_files_dir = job_proxy.output_secondary_files_dir(
output_name, create=True
)
extra_target = os.path.join(secondary_files_dir, secondary_file_name)
shutil.move(
secondary_file_path,
extra_target,
)
order.append(secondary_file_name)

with open(os.path.join(secondary_files_dir, "..", SECONDARY_FILES_INDEX_PATH), "w") as f:
json.dump(index_contents, f)

return {"cwl_filename": output["basename"]}

80 changes: 63 additions & 17 deletions lib/galaxy/tools/cwl/util.py
@@ -12,6 +12,10 @@

from six import iteritems, StringIO

STORE_SECONDARY_FILES_WITH_BASENAME = True
SECONDARY_FILES_EXTRA_PREFIX = "__secondary_files__"
SECONDARY_FILES_INDEX_PATH = "__secondary_files_index.json"


def set_basename_and_derived_properties(properties, basename):
properties["basename"] = basename
@@ -62,11 +66,11 @@ def galactic_job_json(
datasets = []
dataset_collections = []

def upload_file(file_path):
def upload_file(file_path, secondary_files):
if not os.path.isabs(file_path):
file_path = os.path.join(test_data_directory, file_path)
_ensure_file_exists(file_path)
target = FileUploadTarget(file_path)
target = FileUploadTarget(file_path, secondary_files)
upload_response = upload_func(target)
dataset = upload_response["outputs"][0]
datasets.append((dataset, target))
@@ -125,7 +129,30 @@ def replacement_file(value):
if file_path is None:
return value

return upload_file(file_path)
secondary_files = value.get("secondaryFiles", [])
secondary_files_tar_path = None
if secondary_files:
tmp = tempfile.NamedTemporaryFile(delete=False)
tf = tarfile.open(fileobj=tmp, mode='w:')
order = []
index_contents = {
"order": order
}
for secondary_file in secondary_files:
secondary_file_path = secondary_file.get("location", None) or value.get("path", None)
assert secondary_file_path, "Invalid secondaryFile entry found [%s]" % secondary_file
full_secondary_file_path = os.path.join(test_data_directory, secondary_file_path)
basename = secondary_file.get("basename") or secondary_file_path
order.append(basename)
tf.add(full_secondary_file_path, os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename))
tmp_index = tempfile.NamedTemporaryFile(delete=False)
json.dump(index_contents, tmp_index)
tmp_index.close()
tf.add(tmp_index.name, SECONDARY_FILES_INDEX_PATH)
tf.close()
secondary_files_tar_path = tmp.name

return upload_file(file_path, secondary_files_tar_path)

def replacement_directory(value):
file_path = value.get("location", None) or value.get("path", None)
@@ -196,8 +223,9 @@ def _ensure_file_exists(file_path):

class FileUploadTarget(object):

def __init__(self, path):
def __init__(self, path, secondary_files=None):
self.path = path
self.secondary_files = secondary_files


class ObjectUploadTarget(object):
@@ -257,31 +285,49 @@ def element_to_cwl_json(element):
return output_to_cwl_json(element_output, get_metadata, get_dataset)

output_metadata = get_metadata(galaxy_output.history_content_type, galaxy_output.history_content_id)

def dataset_dict_to_json_content(dataset_dict):
if "content" in dataset_dict:
return json.loads(dataset_dict["content"])
else:
with open(dataset_dict["path"]) as f:
return json.load(f)

if output_metadata["history_content_type"] == "dataset":
ext = output_metadata["file_ext"]
assert output_metadata["state"] == "ok"
dataset_dict = get_dataset(output_metadata)
if ext == "expression.json":
if "content" in dataset_dict:
return json.loads(dataset_dict["content"])
else:
with open(dataset_dict["path"]) as f:
return json.load(f)
return dataset_dict_to_json_content(dataset_dict)
else:
properties = output_properties(pseduo_location=pseduo_location, **dataset_dict)
basename = properties["basename"]
extra_files = get_extra_files(output_metadata)
found_index = False
for extra_file in extra_files:
if extra_file["class"] == "File":
path = extra_file["path"]
if path.startswith("__secondary_files__/"):
ec = get_dataset(output_metadata, filename=path)
ec["basename"] = basename + os.path.basename(path)
ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
if "secondaryFiles" not in properties:
properties["secondaryFiles"] = []

properties["secondaryFiles"].append(ec_properties)
if path == SECONDARY_FILES_INDEX_PATH:
found_index = True

if found_index:
ec = get_dataset(output_metadata, filename=SECONDARY_FILES_INDEX_PATH)
index = dataset_dict_to_json_content(ec)
for basename in index["order"]:
for extra_file in extra_files:
if extra_file["class"] == "File":
path = extra_file["path"]
if path == os.path.join(SECONDARY_FILES_EXTRA_PREFIX, basename):
ec = get_dataset(output_metadata, filename=path)
if not STORE_SECONDARY_FILES_WITH_BASENAME:
ec["basename"] = basename + os.path.basename(path)
else:
ec["basename"] = os.path.basename(path)
ec_properties = output_properties(pseduo_location=pseduo_location, **ec)
if "secondaryFiles" not in properties:
properties["secondaryFiles"] = []

properties["secondaryFiles"].append(ec_properties)

return properties

2 changes: 2 additions & 0 deletions lib/galaxy/tools/parameters/grouping.py
@@ -212,6 +212,8 @@ def get_composite_dataset_name(self, context):
dataset_name = context.get('files_metadata|base_name', None)
if dataset_name is None:
dataset_name = context.get('files_metadata', {}).get('base_name', None)
if dataset_name is None:
dataset_name = context.get("files")[0].get("NAME", None)
if dataset_name is None:
dataset_name = 'Uploaded Composite Dataset (%s)' % self.get_file_type(context)
return dataset_name
2 changes: 1 addition & 1 deletion lib/galaxy/workflow/modules.py
@@ -1142,7 +1142,7 @@ def callback(input, prefixed_name, **kwargs):
replacement = {"src": "hda", "value": replacement}
elif isinstance(replacement, model.HistoryDatasetCollectionAssociation):
replacement = {"src": "hdca", "value": replacement}
else:
elif replacement is not NO_REPLACEMENT:
replacement = {"src": "json", "value": replacement}

log.info("replacement for [%s] is [%s]" % (prefixed_name, replacement))
18 changes: 18 additions & 0 deletions test/api/test_workflows_cwl.py
@@ -43,6 +43,24 @@ def test_simplest_wf(self):
output = self.dataset_populator.get_history_dataset_content(self.history_id, hid=2)
assert re.search(r"\s+4\s+9\s+47\s+", output)

def test_load_ids(self):
workflow_id = self._load_workflow("v1.0/search.cwl#main")
workflow_content = self._download_workflow(workflow_id)
for step_index, step in workflow_content["steps"].items():
if "tool_representation" in step:
del step["tool_representation"]

print(workflow_content)
steps = workflow_content["steps"]
step_3 = steps["3"]
step_4 = steps["4"]

assert step_3["label"] == "index", step_3
assert step_4["label"] == "search", step_4

print(step_3)
print(step_4)

def test_count_line1_v1(self):
"""Test simple workflow v1.0/count-lines1-wf.cwl."""
self._run_count_lines_wf("v1.0/count-lines1-wf.cwl")
17 changes: 15 additions & 2 deletions test/base/populators.py
@@ -238,11 +238,24 @@ def upload_func(upload_target):
with open(path, "rb") as f:
content = f.read()

name = os.path.basename(path)

extra_inputs = dict()
if upload_target.secondary_files:
assert UPLOAD_VIA == "path"
extra_inputs["files_1|url_paste"] = "file://%s" % upload_target.secondary_files
extra_inputs["files_1|type"] = "upload_dataset"
extra_inputs["files_1|auto_decompress"] = True
extra_inputs["file_count"] = "2"
extra_inputs["force_composite"] = "True"

return self.dataset_populator.new_dataset_request(
history_id=history_id,
content='content',
content=content,
file_type="auto",
name=os.path.basename(path),
name=name,
auto_decompress=False,
extra_inputs=extra_inputs,
).json()
elif isinstance(upload_target, DirectoryUploadTarget):
path = upload_target.tar_path
