# SPDX-License-Identifier: Apache-2.0
from cwltool.command_line_tool import CommandLineTool, ExpressionTool
-from cwltool.builder import Builder
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
+from .runner import make_builder
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
return runtimeContext
-def make_builder(joborder, hints, requirements, runtimeContext):
- return Builder(
- job=joborder,
- files=[], # type: List[Dict[Text, Text]]
- bindings=[], # type: List[Dict[Text, Any]]
- schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
- names=None, # type: Names
- requirements=requirements, # type: List[Dict[Text, Any]]
- hints=hints, # type: List[Dict[Text, Any]]
- resources={}, # type: Dict[str, int]
- mutation_manager=None, # type: Optional[MutationManager]
- formatgraph=None, # type: Optional[Graph]
- make_fs_access=None, # type: Type[StdFsAccess]
- fs_access=None, # type: StdFsAccess
- job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
- timeout=runtimeContext.eval_timeout, # type: float
- debug=runtimeContext.debug, # type: bool
- js_console=runtimeContext.js_console, # type: bool
- force_docker_pull=runtimeContext.force_docker_pull, # type: bool
- loadListing="", # type: Text
- outdir="", # type: Text
- tmpdir="", # type: Text
- stagedir="", # type: Text
- )
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
from functools import partial
import logging
import json
+import copy
from collections import namedtuple
from io import StringIO
+from typing import Mapping, Sequence
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+ shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
+from . context import ArvRuntimeContext
logger = logging.getLogger('arvados.cwl-runner')
for i in viewvalues(d):
find_defaults(i, op)
-def setSecondary(t, fileobj, discovered):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf["pattern"]), "class": "File"} for sf in t["secondaryFiles"]])
+def make_builder(joborder, hints, requirements, runtimeContext):
+ """Construct a cwltool Builder used to evaluate expressions against ``joborder``.
+
+ Only the job order, ``hints``, ``requirements`` and a handful of settings
+ copied from ``runtimeContext`` (job_script_provider, eval_timeout, debug,
+ js_console, force_docker_pull) are populated. Every other Builder argument
+ is given an empty or None placeholder.
+ """
+ return Builder(
+ job=joborder,
+ files=[], # type: List[Dict[Text, Text]]
+ bindings=[], # type: List[Dict[Text, Any]]
+ schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
+ names=None, # type: Names
+ requirements=requirements, # type: List[Dict[Text, Any]]
+ hints=hints, # type: List[Dict[Text, Any]]
+ resources={}, # type: Dict[str, int]
+ mutation_manager=None, # type: Optional[MutationManager]
+ formatgraph=None, # type: Optional[Graph]
+ make_fs_access=None, # type: Type[StdFsAccess]
+ fs_access=None, # type: StdFsAccess
+ job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
+ timeout=runtimeContext.eval_timeout, # type: float
+ debug=runtimeContext.debug, # type: bool
+ js_console=runtimeContext.js_console, # type: bool
+ force_docker_pull=runtimeContext.force_docker_pull, # type: bool
+ loadListing="", # type: Text
+ outdir="", # type: Text
+ tmpdir="", # type: Text
+ stagedir="", # type: Text
+ )
+
+
+def set_secondary(fsaccess, builder, inputschema, primary, discovered):
+ """Attach secondaryFiles to the File object(s) in ``primary`` per ``inputschema``.
+
+ ``primary`` may be a File mapping or a (possibly nested) sequence of them.
+ For a File mapping without a "secondaryFiles" key, each entry of
+ ``inputschema["secondaryFiles"]`` has its "pattern" evaluated with
+ ``builder.do_eval``; a None pattern is skipped, otherwise the pattern is
+ applied to the primary location with ``substitute``. Paths for which
+ ``fsaccess.exists`` is true are appended as File entries.
+
+ ``discovered``, when not None, is updated to map the primary's location
+ to its secondaryFiles list. Strings and other non-container values are
+ ignored.
+
+ Raises validate.ValidationException when a path does not exist and the
+ entry's "required" expression evaluates truthy.
+ """
+ if isinstance(primary, Mapping) and primary.get("class") == "File":
+ if "secondaryFiles" not in primary:
+ primary["secondaryFiles"] = []
+ for i, sf in enumerate(inputschema["secondaryFiles"]):
+ pattern = builder.do_eval(sf["pattern"], context=primary)
+ if pattern is None:
+ continue
+ sfpath = substitute(primary["location"], pattern)
+ # NOTE(review): assumes every schema entry carries a "required" key - confirm.
+ required = builder.do_eval(sf["required"], context=primary)
+
+ if fsaccess.exists(sfpath):
+ primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+ elif required:
+ # Anchor the error on the schema list: ``i`` indexes the schema
+ # entries, not the freshly built list (which may have skipped some).
+ raise SourceLine(inputschema["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % sfpath)
+
+ primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
if discovered is not None:
- discovered[fileobj["location"]] = fileobj["secondaryFiles"]
- elif isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(t, e, discovered)
+ discovered[primary["location"]] = primary["secondaryFiles"]
+ # Text and byte strings are Sequences whose items are again strings, so
+ # recursing into them would never terminate; exclude them explicitly.
+ # ``type(u"")`` is ``unicode`` on Python 2 and ``str`` on Python 3.
+ elif isinstance(primary, Sequence) and not isinstance(primary, (bytes, type(u""))):
+ for e in primary:
+ set_secondary(fsaccess, builder, inputschema, e, discovered)
-def discover_secondary_files(inputs, job_order, discovered=None):
- for t in inputs:
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(t, job_order[shortname(t["id"])], discovered)
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
+ """Fill in secondaryFiles for the values in ``job_order`` whose schema declares them.
+
+ For every schema in ``inputs``, the value stored in ``job_order`` under
+ the schema's short name is looked up. When that value is a Mapping or
+ Sequence and the schema has a truthy "secondaryFiles" entry, the work is
+ delegated to ``set_secondary``; ``fsaccess``, ``builder`` and
+ ``discovered`` are passed through unchanged.
+ """
+ for inputschema in inputs:
+ # ``.get`` yields None for inputs absent from the job order; None fails
+ # the isinstance check below and is skipped.
+ primary = job_order.get(shortname(inputschema["id"]))
+ if isinstance(primary, (Mapping, Sequence)) and inputschema.get("secondaryFiles"):
+ set_secondary(fsaccess, builder, inputschema, primary, discovered)
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
discovered = {}
def discover_default_secondary_files(obj):
- discover_secondary_files(obj["inputs"],
- {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+ """Discover secondaryFiles for the default values of ``obj``'s inputs.
+
+ Builds a job order that maps each input's short name to its "default"
+ (None when the input has no default) and runs ``discover_secondary_files``
+ on it. ``arvrunner`` and ``discovered`` are not parameters; they are
+ taken from the surrounding scope.
+ """
+ builder_job_order = {}
+ for t in obj["inputs"]:
+ builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+ # Need to create a builder object to evaluate expressions.
+ builder = make_builder(builder_job_order,
+ obj.get("hints", []),
+ obj.get("requirements", []),
+ ArvRuntimeContext())
+ discover_secondary_files(arvrunner.fs_access,
+ builder,
+ obj["inputs"],
+ builder_job_order,
discovered)
visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
object with 'location' updated to the proper keep references.
"""
- discover_secondary_files(tool.tool["inputs"], job_order)
+ # Make a copy of the job order and set defaults.
+ builder_job_order = copy.copy(job_order)
+ fill_in_defaults(tool.tool["inputs"],
+ builder_job_order,
+ arvrunner.fs_access)
+ # Need to create a builder object to evaluate expressions.
+ builder = make_builder(builder_job_order,
+ tool.tool.get("hints", []),
+ tool.tool.get("requirements", []),
+ ArvRuntimeContext())
+ # Now update job_order with secondaryFiles
+ discover_secondary_files(arvrunner.fs_access,
+ builder,
+ tool.tool["inputs"],
+ job_order)
jobmapper = upload_dependencies(arvrunner,
name,