from functools import partial
import logging
import json
-import subprocess
+import subprocess32 as subprocess
from collections import namedtuple
from StringIO import StringIO
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
# that external references in $include and $mixin are captured.
scanobj = loadref("", workflowobj["id"])
- sc = scandeps(uri, scanobj,
+ sc_result = scandeps(uri, scanobj,
loadref_fields,
set(("$include", "$schemas", "location")),
loadref, urljoin=document_loader.fetcher.urljoin)
+ sc = []
+ def only_real(obj):
+ # Only interested in local files than need to be uploaded,
+ # don't include file literals, keep references, etc.
+ if obj.get("location", "").startswith("file:"):
+ sc.append(obj)
+
+ visit_class(sc_result, ("File", "Directory"), only_real)
+
normalizeFilesDirs(sc)
if include_primary and "id" in workflowobj:
visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
- for d in discovered:
- sc.extend(discovered[d])
+ for d in list(discovered.keys()):
+ # Only interested in discovered secondaryFiles which are local
+ # files that need to be uploaded.
+ if d.startswith("file:"):
+ sc.extend(discovered[d])
+ else:
+ del discovered[d]
mapper = ArvPathMapper(arvrunner, sc, "",
"keep:%s",
"""Uploads Docker images used in CommandLineTool objects."""
if isinstance(tool, CommandLineTool):
- (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+ (docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
# TODO: can be supported by containers API, but not jobs API.
packed = pack(tool.doc_loader, tool.doc_loader.fetch(tool.tool["id"]),
tool.tool["id"], tool.metadata, rewrite_out=rewrites)
- rewrite_to_orig = {}
- for k,v in rewrites.items():
- rewrite_to_orig[v] = k
+ rewrite_to_orig = {v: k for k,v in rewrites.items()}
def visit(v, cur_id):
if isinstance(v, dict):
if v.get("class") in ("CommandLineTool", "Workflow"):
+ if "id" not in v:
+ raise SourceLine(v, None, Exception).makeError("Embedded process object is missing required 'id' field")
cur_id = rewrite_to_orig.get(v["id"], v["id"])
if "location" in v and not v["location"].startswith("keep:"):
v["location"] = merged_map[cur_id].resolved[v["location"]]
def __init__(self, runner, tool, job_order, enable_reuse,
output_name, output_tags, submit_runner_ram=0,
name=None, on_error=None, submit_runner_image=None,
- intermediate_output_ttl=0, merged_map=None, priority=None,
- secret_store=None):
+ intermediate_output_ttl=0, merged_map=None,
+ priority=None, secret_store=None):
self.arvrunner = runner
self.tool = tool
self.job_order = job_order
if enable_reuse:
# If reuse is permitted by command line arguments but
# disabled by the workflow itself, disable it.
- reuse_req, _ = get_feature(self.tool, "http://arvados.org/cwl#ReuseRequirement")
+ reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
self.enable_reuse = enable_reuse
self.priority = priority
self.secret_store = secret_store
+ self.submit_runner_cores = 1
+ self.submit_runner_ram = 1024 # defaut 1 GiB
+
+ runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ if runner_resource_req:
+ if runner_resource_req.get("coresMin"):
+ self.submit_runner_cores = runner_resource_req["coresMin"]
+ if runner_resource_req.get("ramMin"):
+ self.submit_runner_ram = runner_resource_req["ramMin"]
+
if submit_runner_ram:
+ # Command line / initializer overrides default and/or spec from workflow
self.submit_runner_ram = submit_runner_ram
- else:
- self.submit_runner_ram = 3000
if self.submit_runner_ram <= 0:
- raise Exception("Value of --submit-runner-ram must be greater than zero")
+ raise Exception("Value of submit-runner-ram must be greater than zero")
+
+ if self.submit_runner_cores <= 0:
+ raise Exception("Value of submit-runner-cores must be greater than zero")
self.merged_map = merged_map or {}
api_client=self.arvrunner.api,
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)
+ done.logtail(logc, logger.error, "%s (%s) error log:" % (self.arvrunner.label(self), record["uuid"]), maxlen=40)
self.final_output = record["output"]
outc = arvados.collection.CollectionReader(self.final_output,