from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import substitute
from cwltool.pack import pack
from cwltool.update import INTERNAL_VERSION
import arvados.collection
from .util import collectionUUID
-import ruamel.yaml as yaml
+from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
import arvados_cwl.arvdocker
for i in viewvalues(d):
find_defaults(i, op)
-def make_builder(joborder, hints, requirements, runtimeContext):
+def make_builder(joborder, hints, requirements, runtimeContext, metadata):
return Builder(
job=joborder,
files=[], # type: List[Dict[Text, Text]]
outdir="", # type: Text
tmpdir="", # type: Text
stagedir="", # type: Text
+ cwlVersion=metadata.get("http://commonwl.org/cwltool#original_cwlVersion") or metadata.get("cwlVersion"),
+ container_engine="docker"
)
def search_schemadef(name, reqs):
specs = []
primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
- pattern = builder.do_eval(sf["pattern"], context=primary)
+ if builder.cwlVersion == "v1.0":
+ pattern = builder.do_eval(sf, context=primary)
+ else:
+ pattern = builder.do_eval(sf["pattern"], context=primary)
if pattern is None:
continue
if isinstance(pattern, list):
elif isinstance(pattern, dict):
specs.append(pattern)
elif isinstance(pattern, str):
- specs.append({"pattern": pattern})
+ if builder.cwlVersion == "v1.0":
+ specs.append({"pattern": pattern, "required": True})
+ else:
+ specs.append({"pattern": pattern, "required": sf.get("required")})
else:
raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
"Expression must return list, object, string or null")
for i, sf in enumerate(specs):
if isinstance(sf, dict):
if sf.get("class") == "File":
- pattern = sf["basename"]
+ pattern = None
+ if sf.get("location") is None:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "File object is missing 'location': %s" % sf)
+ sfpath = sf["location"]
+ required = True
else:
pattern = sf["pattern"]
required = sf.get("required")
raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
"Expression must return list, object, string or null")
- sfpath = substitute(primary["location"], pattern)
+ if pattern is not None:
+ sfpath = substitute(primary["location"], pattern)
+
required = builder.do_eval(required, context=primary)
if fsaccess.exists(sfpath):
- found.append({"location": sfpath, "class": "File"})
+ if pattern is not None:
+ found.append({"location": sfpath, "class": "File"})
+ else:
+ found.append(sf)
elif required:
raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
"Required secondary file '%s' does not exist" % sfpath)
textIO = StringIO(text.decode('utf-8'))
else:
textIO = StringIO(text)
- return yaml.safe_load(textIO)
+ yamlloader = YAML(typ='safe', pure=True)
+ return yamlloader.load(textIO)
else:
return {}
# that external references in $include and $mixin are captured.
scanobj = loadref("", workflowobj["id"])
+ metadata = scanobj
+
sc_result = scandeps(uri, scanobj,
- loadref_fields,
- set(("$include", "$schemas", "location")),
- loadref, urljoin=document_loader.fetcher.urljoin)
+ loadref_fields,
+ set(("$include", "location")),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ optional_deps = scandeps(uri, scanobj,
+ loadref_fields,
+ set(("$schemas",)),
+ loadref, urljoin=document_loader.fetcher.urljoin,
+ nestdirs=False)
+
+ sc_result.extend(optional_deps)
sc = []
uuids = {}
if include_primary and "id" in workflowobj:
sc.append({"class": "File", "location": workflowobj["id"]})
- if "$schemas" in workflowobj:
- for s in workflowobj["$schemas"]:
- sc.append({"class": "File", "location": s})
-
def visit_default(obj):
- remove = [False]
- def ensure_default_location(f):
+ def defaults_are_optional(f):
if "location" not in f and "path" in f:
f["location"] = f["path"]
del f["path"]
- if "location" in f and not arvrunner.fs_access.exists(f["location"]):
- # Doesn't exist, remove from list of dependencies to upload
- sc[:] = [x for x in sc if x["location"] != f["location"]]
- # Delete "default" from workflowobj
- remove[0] = True
- visit_class(obj["default"], ("File", "Directory"), ensure_default_location)
- if remove[0]:
- del obj["default"]
+ normalizeFilesDirs(f)
+ optional_deps.append(f)
+ visit_class(obj["default"], ("File", "Directory"), defaults_are_optional)
find_defaults(workflowobj, visit_default)
builder = make_builder(builder_job_order,
obj.get("hints", []),
obj.get("requirements", []),
- ArvRuntimeContext())
+ ArvRuntimeContext(),
+ metadata)
discover_secondary_files(arvrunner.fs_access,
builder,
obj["inputs"],
"keep:%s",
"keep:%s/%s",
name=name,
- single_collection=True)
+ single_collection=True,
+ optional_deps=optional_deps)
def setloc(p):
loc = p.get("location")
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
- sch.append(mapper.mapper(s).resolved)
+ if s in mapper:
+ sch.append(mapper.mapper(s).resolved)
workflowobj["$schemas"] = sch
return mapper
# TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
+ arvrunner.runtimeContext.force_docker_pull,
+ arvrunner.runtimeContext.tmp_outdir_prefix,
+ arvrunner.runtimeContext.match_local_docker)
else:
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs"}, True, arvrunner.project_uuid)
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
+ True, arvrunner.project_uuid,
+ arvrunner.runtimeContext.force_docker_pull,
+ arvrunner.runtimeContext.tmp_outdir_prefix,
+ arvrunner.runtimeContext.match_local_docker)
elif isinstance(tool, cwltool.workflow.Workflow):
for s in tool.steps:
upload_docker(arvrunner, s.embedded_tool)
def visit(v, cur_id):
if isinstance(v, dict):
- if v.get("class") in ("CommandLineTool", "Workflow"):
+ if v.get("class") in ("CommandLineTool", "Workflow", "ExpressionTool"):
if tool.metadata["cwlVersion"] == "v1.0" and "id" not in v:
raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field, add an 'id' or use to cwlVersion: v1.1")
if "id" in v:
if "path" in v and "location" not in v:
v["location"] = v["path"]
del v["path"]
- if "location" in v and not v["location"].startswith("keep:"):
- v["location"] = merged_map[cur_id].resolved[v["location"]]
- if "location" in v and v["location"] in merged_map[cur_id].secondaryFiles:
- v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
+ if "location" in v and cur_id in merged_map:
+ if v["location"] in merged_map[cur_id].resolved:
+ v["location"] = merged_map[cur_id].resolved[v["location"]]
+ if v["location"] in merged_map[cur_id].secondaryFiles:
+ v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
- v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True, arvrunner.project_uuid)
+ v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
+ arvrunner.project_uuid,
+ arvrunner.runtimeContext.force_docker_pull,
+ arvrunner.runtimeContext.tmp_outdir_prefix,
+ arvrunner.runtimeContext.match_local_docker)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
builder = make_builder(builder_job_order,
tool.hints,
tool.requirements,
- ArvRuntimeContext())
+ ArvRuntimeContext(),
+ tool.metadata)
# Now update job_order with secondaryFiles
discover_secondary_files(arvrunner.fs_access,
builder,
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
+ arvrunner.runtimeContext.force_docker_pull,
+ arvrunner.runtimeContext.tmp_outdir_prefix,
+ arvrunner.runtimeContext.match_local_docker)
except Exception as e:
raise Exception("Docker image %s is not available\n%s" % (img, e) )