15028: Support optional secondaryFiles
author	Peter Amstutz <pamstutz@veritasgenetics.com>
	Thu, 2 May 2019 19:25:40 +0000 (15:25 -0400)
committer	Peter Amstutz <pamstutz@veritasgenetics.com>
	Fri, 3 May 2019 15:44:37 +0000 (11:44 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/runner.py

index 31e6be12b533cb9d96c65f74189935fad8c3fcbe..4fc02a0166455c13f6853cbce714fd6d7f2d90f4 100644 (file)
@@ -3,10 +3,10 @@
 # SPDX-License-Identifier: Apache-2.0
 
 from cwltool.command_line_tool import CommandLineTool, ExpressionTool
-from cwltool.builder import Builder
 from .arvjob import ArvadosJob
 from .arvcontainer import ArvadosContainer
 from .pathmapper import ArvPathMapper
+from .runner import make_builder
 from functools import partial
 from schema_salad.sourceline import SourceLine
 from cwltool.errors import WorkflowException
@@ -37,30 +37,6 @@ def set_cluster_target(tool, arvrunner, builder, runtimeContext):
 
     return runtimeContext
 
-def make_builder(joborder, hints, requirements, runtimeContext):
-    return Builder(
-                 job=joborder,
-                 files=[],               # type: List[Dict[Text, Text]]
-                 bindings=[],            # type: List[Dict[Text, Any]]
-                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
-                 names=None,               # type: Names
-                 requirements=requirements,        # type: List[Dict[Text, Any]]
-                 hints=hints,               # type: List[Dict[Text, Any]]
-                 resources={},           # type: Dict[str, int]
-                 mutation_manager=None,    # type: Optional[MutationManager]
-                 formatgraph=None,         # type: Optional[Graph]
-                 make_fs_access=None,      # type: Type[StdFsAccess]
-                 fs_access=None,           # type: StdFsAccess
-                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
-                 timeout=runtimeContext.eval_timeout,             # type: float
-                 debug=runtimeContext.debug,               # type: bool
-                 js_console=runtimeContext.js_console,          # type: bool
-                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
-                 loadListing="",         # type: Text
-                 outdir="",              # type: Text
-                 tmpdir="",              # type: Text
-                 stagedir="",            # type: Text
-                )
 
 class ArvadosCommandTool(CommandLineTool):
     """Wrap cwltool CommandLineTool to override selected methods."""
index 325fccb2c97e1dd9259ececbdbaab13bfa7cf39d..3c60ac9fd58eebf3c9a8c87dcb8050f5132f9258 100644 (file)
@@ -22,9 +22,11 @@ from cwltool.context import LoadingContext
 import ruamel.yaml as yaml
 
 from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
-                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
+                     trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
+                     make_builder)
 from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder
+from .arvtool import ArvadosCommandTool, set_cluster_target
+
 from .perf import Perf
 
 logger = logging.getLogger('arvados.cwl-runner')
index ed6bcd008e101240dd774105ab58fa1ebddd7fbb..4a8a22a4a99c23ca8f7b0df04ceba5b0a67d5890 100644 (file)
@@ -13,8 +13,10 @@ import urllib.parse
 from functools import partial
 import logging
 import json
+import copy
 from collections import namedtuple
 from io import StringIO
+from typing import Mapping, Sequence
 
 if os.name == "posix" and sys.version_info[0] < 3:
     import subprocess32 as subprocess
@@ -25,13 +27,15 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+                             shortname, Process, fill_in_defaults)
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
 from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
 import schema_salad.validate as validate
 
 import arvados.collection
@@ -42,6 +46,7 @@ import arvados_cwl.arvdocker
 from .pathmapper import ArvPathMapper, trim_listing
 from ._version import __version__
 from . import done
+from . context import ArvRuntimeContext
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -76,20 +81,61 @@ def find_defaults(d, op):
             for i in viewvalues(d):
                 find_defaults(i, op)
 
-def setSecondary(t, fileobj, discovered):
-    if isinstance(fileobj, dict) and fileobj.get("class") == "File":
-        if "secondaryFiles" not in fileobj:
-            fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf["pattern"]), "class": "File"} for sf in t["secondaryFiles"]])
+def make_builder(joborder, hints, requirements, runtimeContext):
+    return Builder(
+                 job=joborder,
+                 files=[],               # type: List[Dict[Text, Text]]
+                 bindings=[],            # type: List[Dict[Text, Any]]
+                 schemaDefs={},          # type: Dict[Text, Dict[Text, Any]]
+                 names=None,               # type: Names
+                 requirements=requirements,        # type: List[Dict[Text, Any]]
+                 hints=hints,               # type: List[Dict[Text, Any]]
+                 resources={},           # type: Dict[str, int]
+                 mutation_manager=None,    # type: Optional[MutationManager]
+                 formatgraph=None,         # type: Optional[Graph]
+                 make_fs_access=None,      # type: Type[StdFsAccess]
+                 fs_access=None,           # type: StdFsAccess
+                 job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
+                 timeout=runtimeContext.eval_timeout,             # type: float
+                 debug=runtimeContext.debug,               # type: bool
+                 js_console=runtimeContext.js_console,          # type: bool
+                 force_docker_pull=runtimeContext.force_docker_pull,   # type: bool
+                 loadListing="",         # type: Text
+                 outdir="",              # type: Text
+                 tmpdir="",              # type: Text
+                 stagedir="",            # type: Text
+                )
+
+
+def set_secondary(fsaccess, builder, inputschema, primary, discovered):
+    if isinstance(primary, Mapping) and primary.get("class") == "File":
+        if "secondaryFiles" not in primary:
+            primary["secondaryFiles"] = []
+            for i, sf in enumerate(inputschema["secondaryFiles"]):
+                pattern = builder.do_eval(sf["pattern"], context=primary)
+                if pattern is None:
+                    continue
+                sfpath = substitute(primary["location"], pattern)
+                required = builder.do_eval(sf["required"], context=primary)
+
+                if fsaccess.exists(sfpath):
+                    primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+                elif required:
+                    raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+                        "Required secondary file '%s' does not exist" % sfpath)
+
+            primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
             if discovered is not None:
-                discovered[fileobj["location"]] = fileobj["secondaryFiles"]
-    elif isinstance(fileobj, list):
-        for e in fileobj:
-            setSecondary(t, e, discovered)
+                discovered[primary["location"]] = primary["secondaryFiles"]
+    elif isinstance(primary, Sequence):
+        for e in primary:
+            set_secondary(fsaccess, builder, inputschema, e, discovered)
 
-def discover_secondary_files(inputs, job_order, discovered=None):
-    for t in inputs:
-        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
-            setSecondary(t, job_order[shortname(t["id"])], discovered)
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
+    for inputschema in inputs:
+        primary = job_order.get(shortname(inputschema["id"]))
+        if isinstance(primary, (Mapping, Sequence)) and inputschema.get("secondaryFiles"):
+            set_secondary(fsaccess, builder, inputschema, primary, discovered)
 
 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
 collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
@@ -220,8 +266,18 @@ def upload_dependencies(arvrunner, name, document_loader,
 
     discovered = {}
     def discover_default_secondary_files(obj):
-        discover_secondary_files(obj["inputs"],
-                                 {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+        builder_job_order = {}
+        for t in obj["inputs"]:
+            builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+        # Need to create a builder object to evaluate expressions.
+        builder = make_builder(builder_job_order,
+                               obj.get("hints", []),
+                               obj.get("requirements", []),
+                               ArvRuntimeContext())
+        discover_secondary_files(arvrunner.fs_access,
+                                 builder,
+                                 obj["inputs"],
+                                 builder_job_order,
                                  discovered)
 
     visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
@@ -355,7 +411,21 @@ def upload_job_order(arvrunner, name, tool, job_order):
     object with 'location' updated to the proper keep references.
     """
 
-    discover_secondary_files(tool.tool["inputs"], job_order)
+    # Make a copy of the job order and set defaults.
+    builder_job_order = copy.copy(job_order)
+    fill_in_defaults(tool.tool["inputs"],
+                     builder_job_order,
+                     arvrunner.fs_access)
+    # Need to create a builder object to evaluate expressions.
+    builder = make_builder(builder_job_order,
+                           tool.tool.get("hints", []),
+                           tool.tool.get("requirements", []),
+                           ArvRuntimeContext())
+    # Now update job_order with secondaryFiles
+    discover_secondary_files(arvrunner.fs_access,
+                             builder,
+                             tool.tool["inputs"],
+                             job_order)
 
     jobmapper = upload_dependencies(arvrunner,
                                     name,