From 1746985ec50d72cad304640f1ea39e2a9dff07bf Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Thu, 2 May 2019 15:25:40 -0400 Subject: [PATCH] 15028: Support optional secondaryFiles Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- sdk/cwl/arvados_cwl/arvtool.py | 26 +------- sdk/cwl/arvados_cwl/arvworkflow.py | 6 +- sdk/cwl/arvados_cwl/runner.py | 102 ++++++++++++++++++++++++----- 3 files changed, 91 insertions(+), 43 deletions(-) diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py index 31e6be12b5..4fc02a0166 100644 --- a/sdk/cwl/arvados_cwl/arvtool.py +++ b/sdk/cwl/arvados_cwl/arvtool.py @@ -3,10 +3,10 @@ # SPDX-License-Identifier: Apache-2.0 from cwltool.command_line_tool import CommandLineTool, ExpressionTool -from cwltool.builder import Builder from .arvjob import ArvadosJob from .arvcontainer import ArvadosContainer from .pathmapper import ArvPathMapper +from .runner import make_builder from functools import partial from schema_salad.sourceline import SourceLine from cwltool.errors import WorkflowException @@ -37,30 +37,6 @@ def set_cluster_target(tool, arvrunner, builder, runtimeContext): return runtimeContext -def make_builder(joborder, hints, requirements, runtimeContext): - return Builder( - job=joborder, - files=[], # type: List[Dict[Text, Text]] - bindings=[], # type: List[Dict[Text, Any]] - schemaDefs={}, # type: Dict[Text, Dict[Text, Any]] - names=None, # type: Names - requirements=requirements, # type: List[Dict[Text, Any]] - hints=hints, # type: List[Dict[Text, Any]] - resources={}, # type: Dict[str, int] - mutation_manager=None, # type: Optional[MutationManager] - formatgraph=None, # type: Optional[Graph] - make_fs_access=None, # type: Type[StdFsAccess] - fs_access=None, # type: StdFsAccess - job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any] - timeout=runtimeContext.eval_timeout, # type: float - debug=runtimeContext.debug, # type: bool - js_console=runtimeContext.js_console, # type: bool - 
force_docker_pull=runtimeContext.force_docker_pull, # type: bool - loadListing="", # type: Text - outdir="", # type: Text - tmpdir="", # type: Text - stagedir="", # type: Text - ) class ArvadosCommandTool(CommandLineTool): """Wrap cwltool CommandLineTool to override selected methods.""" diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 325fccb2c9..3c60ac9fd5 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -22,9 +22,11 @@ from cwltool.context import LoadingContext import ruamel.yaml as yaml from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection, - trim_anonymous_location, remove_redundant_fields, discover_secondary_files) + trim_anonymous_location, remove_redundant_fields, discover_secondary_files, + make_builder) from .pathmapper import ArvPathMapper, trim_listing -from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder +from .arvtool import ArvadosCommandTool, set_cluster_target + from .perf import Perf logger = logging.getLogger('arvados.cwl-runner') diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index ed6bcd008e..4a8a22a4a9 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -13,8 +13,10 @@ import urllib.parse from functools import partial import logging import json +import copy from collections import namedtuple from io import StringIO +from typing import Mapping, Sequence if os.name == "posix" and sys.version_info[0] < 3: import subprocess32 as subprocess @@ -25,13 +27,15 @@ from schema_salad.sourceline import SourceLine, cmap from cwltool.command_line_tool import CommandLineTool import cwltool.workflow -from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process +from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs, + shortname, Process, fill_in_defaults) from cwltool.load_tool import fetch_document 
def make_builder(joborder, hints, requirements, runtimeContext):
    """Create a minimal cwltool Builder used only to evaluate expressions.

    Only the job order, hints, requirements and a handful of settings
    copied from ``runtimeContext`` are populated; every other Builder
    field is passed as an empty or ``None`` placeholder.

    :param joborder: input object the expressions are evaluated against
    :param hints: list of hints of the tool or workflow
    :param requirements: list of requirements of the tool or workflow
    :param runtimeContext: source of job_script_provider, eval_timeout,
        debug, js_console and force_docker_pull
    :returns: a new Builder instance
    """
    return Builder(
        job=joborder,
        files=[],                # type: List[Dict[Text, Text]]
        bindings=[],             # type: List[Dict[Text, Any]]
        schemaDefs={},           # type: Dict[Text, Dict[Text, Any]]
        names=None,              # type: Names
        requirements=requirements,  # type: List[Dict[Text, Any]]
        hints=hints,             # type: List[Dict[Text, Any]]
        resources={},            # type: Dict[str, int]
        mutation_manager=None,   # type: Optional[MutationManager]
        formatgraph=None,        # type: Optional[Graph]
        make_fs_access=None,     # type: Type[StdFsAccess]
        fs_access=None,          # type: StdFsAccess
        job_script_provider=runtimeContext.job_script_provider,  # type: Optional[Any]
        timeout=runtimeContext.eval_timeout,                     # type: float
        debug=runtimeContext.debug,                              # type: bool
        js_console=runtimeContext.js_console,                    # type: bool
        force_docker_pull=runtimeContext.force_docker_pull,      # type: bool
        loadListing="",          # type: Text
        outdir="",               # type: Text
        tmpdir="",               # type: Text
        stagedir="",             # type: Text
    )


def _is_listing(value):
    """Return True for list-like values, excluding text and byte strings.

    Strings are Sequences whose items are themselves strings, so walking
    them recursively as if they were lists of inputs never terminates.
    """
    return (isinstance(value, Sequence)
            and not isinstance(value, (str, bytes, type(u""))))


def set_secondary(fsaccess, builder, inputschema, primary, discovered):
    """Fill in ``secondaryFiles`` on File objects that do not list any.

    :param fsaccess: object providing ``exists(location)``
    :param builder: object providing ``do_eval(expr, context=...)``
    :param inputschema: input parameter schema; its ``secondaryFiles``
        entries are mappings with ``pattern`` and ``required`` keys
    :param primary: a File mapping, or a (possibly nested) list of them;
        File mappings are updated in place.  Any other value is ignored.
    :param discovered: optional dict; when given, maps the location of
        each updated File to its new ``secondaryFiles`` list
    :raises: the ``validate.ValidationException`` built by
        ``SourceLine.makeError`` when a secondary file whose ``required``
        evaluates true does not exist
    """
    if _is_listing(primary):
        for e in primary:
            set_secondary(fsaccess, builder, inputschema, e, discovered)
        return

    if not (isinstance(primary, Mapping) and primary.get("class") == "File"):
        return

    if "secondaryFiles" in primary:
        # Explicitly listed secondary files are left untouched.
        return

    # Assigned before evaluating expressions so that the evaluation
    # context (the primary File) already carries the key.
    primary["secondaryFiles"] = []
    for i, sf in enumerate(inputschema["secondaryFiles"]):
        pattern = builder.do_eval(sf["pattern"], context=primary)
        if pattern is None:
            continue
        sfpath = substitute(primary["location"], pattern)
        required = builder.do_eval(sf["required"], context=primary)

        if fsaccess.exists(sfpath):
            primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
        elif required:
            # 'i' indexes the schema's pattern list, so that is the list
            # handed to SourceLine for locating the offending entry.
            raise SourceLine(inputschema["secondaryFiles"], i, validate.ValidationException).makeError(
                "Required secondary file '%s' does not exist" % sfpath)

    primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
    if discovered is not None:
        discovered[primary["location"]] = primary["secondaryFiles"]


def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
    """Apply set_secondary() to every input that declares secondaryFiles.

    :param fsaccess: passed through to set_secondary()
    :param builder: passed through to set_secondary()
    :param inputs: list of input parameter schemas (each with an ``id``)
    :param job_order: mapping of short input names to values; File
        objects inside it are updated in place
    :param discovered: optional dict passed through to set_secondary()
    """
    for inputschema in inputs:
        if not inputschema.get("secondaryFiles"):
            continue
        primary = job_order.get(shortname(inputschema["id"]))
        # Missing inputs, scalars and plain strings carry no File objects.
        if isinstance(primary, Mapping) or _is_listing(primary):
            set_secondary(fsaccess, builder, inputschema, primary, discovered)
discover_secondary_files(obj["inputs"], - {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t}, + builder_job_order = {} + for t in obj["inputs"]: + builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None + # Need to create a builder object to evaluate expressions. + builder = make_builder(builder_job_order, + obj.get("hints", []), + obj.get("requirements", []), + ArvRuntimeContext()) + discover_secondary_files(arvrunner.fs_access, + builder, + obj["inputs"], + builder_job_order, discovered) visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files) @@ -355,7 +411,21 @@ def upload_job_order(arvrunner, name, tool, job_order): object with 'location' updated to the proper keep references. """ - discover_secondary_files(tool.tool["inputs"], job_order) + # Make a copy of the job order and set defaults. + builder_job_order = copy.copy(job_order) + fill_in_defaults(tool.tool["inputs"], + builder_job_order, + arvrunner.fs_access) + # Need to create a builder object to evaluate expressions. + builder = make_builder(builder_job_order, + tool.tool.get("hints", []), + tool.tool.get("requirements", []), + ArvRuntimeContext()) + # Now update job_order with secondaryFiles + discover_secondary_files(arvrunner.fs_access, + builder, + tool.tool["inputs"], + job_order) jobmapper = upload_dependencies(arvrunner, name, -- 2.39.5