from cwltool.workflow import Workflow, WorkflowException
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.builder import Builder
+from cwltool.context import LoadingContext
import ruamel.yaml as yaml
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
+max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
+sum_res_pars = ("outdirMin", "outdirMax")
+
def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
submit_runner_ram=0, name=None, merged_map=None):
dedup[r["class"]] = r
return [dedup[r] for r in sorted(dedup.keys())]
-def get_max_res_req(res_reqs):
- """Take the max of a list of ResourceRequirement."""
+def get_overall_res_req(res_reqs):
+ """Take the overall of a list of ResourceRequirement,
+ i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
+ and the sum of outdirMin, outdirMax."""
- total_res_req = {}
+ all_res_req = {}
exception_msgs = []
- for a in ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax"):
- total_res_req[a] = []
+ for a in max_res_pars + sum_res_pars:
+ all_res_req[a] = []
for res_req in res_reqs:
if a in res_req:
if isinstance(res_req[a], int): # integer check
- total_res_req[a].append(res_req[a])
+ all_res_req[a].append(res_req[a])
else:
- msg = SourceLine(res_req).makeError(
+ msg = SourceLine(res_req, a).makeError(
"Non-top-level ResourceRequirement in single container cannot have expressions")
exception_msgs.append(msg)
if exception_msgs:
raise WorkflowException("\n".join(exception_msgs))
else:
- max_res_req = {}
- for a in total_res_req:
- if total_res_req[a]:
- max_res_req[a] = max(total_res_req[a])
- if max_res_req:
- max_res_req["class"] = "ResourceRequirement"
- return cmap(max_res_req)
+ overall_res_req = {}
+ for a in all_res_req:
+ if all_res_req[a]:
+ if a in max_res_pars:
+ overall_res_req[a] = max(all_res_req[a])
+ elif a in sum_res_pars:
+ overall_res_req[a] = sum(all_res_req[a])
+ if overall_res_req:
+ overall_res_req["class"] = "ResourceRequirement"
+ return cmap(overall_res_req)
class ArvadosWorkflow(Workflow):
"""Wrap cwltool Workflow to override selected methods."""
- def __init__(self, arvrunner, toolpath_object, **kwargs):
- super(ArvadosWorkflow, self).__init__(toolpath_object, **kwargs)
+ def __init__(self, arvrunner, toolpath_object, loadingContext):
+ super(ArvadosWorkflow, self).__init__(toolpath_object, loadingContext)
self.arvrunner = arvrunner
- self.work_api = kwargs["work_api"]
self.wf_pdh = None
+ self.dynamic_resource_req = []
+ self.static_resource_req = []
+ self.wf_reffiles = []
+ self.loadingContext = loadingContext
- def job(self, joborder, output_callback, **kwargs):
- kwargs["work_api"] = self.work_api
+ def job(self, joborder, output_callback, runtimeContext):
req, _ = self.get_requirement("http://arvados.org/cwl#RunInSingleContainer")
if req:
with SourceLine(self.tool, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
packed = pack(document_loader, workflowobj, uri, self.metadata)
- builder = Builder()
- builder.job = joborder
- builder.requirements = self.requirements
- builder.hints = self.hints
- builder.resources = {}
-
- res_reqs = {"requirements": [], "hints": []}
- for t in ("requirements", "hints"):
- for item in packed["$graph"]:
- if t in item:
- if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints
- for req in item[t]:
- if req["class"] == "ResourceRequirement":
- eval_req = {"class": "ResourceRequirement"}
- for a in ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax"):
- if a in req:
- eval_req[a] = builder.do_eval(req[a])
- res_reqs[t].append(eval_req)
- else:
- for req in item[t]:
- if req["class"] == "ResourceRequirement":
- res_reqs[t].append(req)
- max_res_req = {"requirements": get_max_res_req(res_reqs["requirements"]),
- "hints": get_max_res_req(res_reqs["hints"])}
-
- new_spec = {"requirements": self.requirements, "hints": self.hints}
- for t in ("requirements", "hints"):
- for req in new_spec[t]:
- if req["class"] == "ResourceRequirement":
- new_spec[t].remove(req)
- if max_res_req[t]:
- new_spec[t].append(max_res_req[t])
+ builder = Builder(joborder,
+ requirements=workflowobj["requirements"],
+ hints=workflowobj["hints"],
+ resources={})
+
+ def visit(item):
+ for t in ("hints", "requirements"):
+ if t not in item:
+ continue
+ for req in item[t]:
+ if req["class"] == "ResourceRequirement":
+ dyn = False
+ for k in max_res_pars + sum_res_pars:
+ if k in req:
+ if isinstance(req[k], basestring):
+ if item["id"] == "#main":
+ # only the top-level requirements/hints may contain expressions
+ self.dynamic_resource_req.append(req)
+ dyn = True
+ break
+ else:
+ with SourceLine(req, k, WorkflowException):
+ raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
+ if not dyn:
+ self.static_resource_req.append(req)
+
+ visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
+
+ if self.static_resource_req:
+ self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
upload_dependencies(self.arvrunner,
- kwargs.get("name", ""),
+ runtimeContext.name,
document_loader,
packed,
uri,
False)
+ # Discover files/directories referenced by the
+ # workflow (mainly "default" values)
+ visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
+
+
+ if self.dynamic_resource_req:
+ builder = Builder()
+ builder.job = joborder
+ builder.requirements = self.requirements
+ builder.hints = self.hints
+ builder.resources = {}
+
+ # Evaluate dynamic resource requirements using current builder
+ rs = copy.copy(self.static_resource_req)
+ for dyn_rs in self.dynamic_resource_req:
+ eval_req = {"class": "ResourceRequirement"}
+ for a in max_res_pars + sum_res_pars:
+ if a in dyn_rs:
+ eval_req[a] = builder.do_eval(dyn_rs[a])
+ rs.append(eval_req)
+ job_res_reqs = [get_overall_res_req(rs)]
+ else:
+ job_res_reqs = self.static_resource_req
+
with Perf(metrics, "subworkflow adjust"):
joborder_resolved = copy.deepcopy(joborder)
joborder_keepmount = copy.deepcopy(joborder)
reffiles = []
- visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))
+ visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
+
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, runtimeContext.basedir,
+ "/keep/%s",
+ "/keep/%s/%s")
- mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
- "/keep/%s",
- "/keep/%s/%s",
- **kwargs)
+ # For containers API, we need to make sure any extra
+ # referenced files (ie referenced by the workflow but
+ # not in the inputs) are included in the mounts.
+ if self.wf_reffiles:
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.extra_reffiles = copy.deepcopy(self.wf_reffiles)
def keepmount(obj):
remove_redundant_fields(obj)
"inputs": self.tool["inputs"],
"outputs": self.tool["outputs"],
"stdout": "cwl.output.json",
- "requirements": new_spec["requirements"]+[
+ "requirements": self.requirements+job_res_reqs+[
+ {"class": "InlineJavascriptRequirement"},
{
"class": "InitialWorkDirRequirement",
"listing": [{
"entryname": "workflow.cwl",
- "entry": {
- "class": "File",
- "location": "keep:%s/workflow.cwl" % self.wf_pdh
- }
+ "entry": '$({"class": "File", "location": "keep:%s/workflow.cwl"})' % self.wf_pdh
}, {
"entryname": "cwl.input.yml",
"entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
}]
}],
- "hints": new_spec["hints"],
+ "hints": self.hints,
"arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
"id": "#"
})
- kwargs["loader"] = self.doc_loader
- kwargs["avsc_names"] = self.doc_schema
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
+ return ArvadosCommandTool(self.arvrunner, wf_runner, self.loadingContext).job(joborder_resolved, output_callback, runtimeContext)
else:
- return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
+ return super(ArvadosWorkflow, self).job(joborder, output_callback, runtimeContext)