- if self.wf_pdh is None:
-     workflowobj["requirements"] = dedup_reqs(self.requirements)
-     workflowobj["hints"] = dedup_reqs(self.hints)
-
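-     # Pack the workflow and every tool/subworkflow it references into a
-     # single document with a "$graph" listing all the steps.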
-     packed = pack(document_loader, workflowobj, uri, self.metadata)
-
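-     # Minimal cwltool Builder so CWL parameter references in the top-level
-     # ResourceRequirement can be evaluated against this job order.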
-     builder = Builder()
-     builder.job = joborder
-     builder.requirements = workflowobj["requirements"]
-     builder.hints = workflowobj["hints"]
-     builder.resources = {}
-
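-     # Collect every ResourceRequirement in the packed graph.  Expressions
-     # are only evaluated for "#main"; requirements on the other steps are
-     # taken as-is.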
-     res_reqs = {"requirements": [], "hints": []}
-     for t in ("requirements", "hints"):
-         for item in packed["$graph"]:
-             if t in item:
-                 if item["id"] == "#main":
-                     # evaluate potential expressions in the top-level requirements/hints
-                     for req in item[t]:
-                         if req["class"] == "ResourceRequirement":
-                             eval_req = {"class": "ResourceRequirement"}
-                             for a in max_res_pars + sum_res_pars:
-                                 if a in req:
-                                     eval_req[a] = builder.do_eval(req[a])
-                             res_reqs[t].append(eval_req)
-                 else:
-                     for req in item[t]:
-                         if req["class"] == "ResourceRequirement":
-                             res_reqs[t].append(req)
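-     # Fold the collected requirements into a single overall request:
-     # get_overall_res_req takes the maximum of the max_res_pars parameters
-     # and the sum of the sum_res_pars parameters.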
-     overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]),
-                        "hints": get_overall_res_req(res_reqs["hints"])}
-
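-     # Strip the original ResourceRequirements (new_spec aliases
-     # self.requirements/self.hints) and substitute the consolidated one.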
-     new_spec = {"requirements": self.requirements, "hints": self.hints}
-     for t in ("requirements", "hints"):
-         # Iterate over a copy: removing items from the list being
-         # iterated would skip elements.
-         for req in list(new_spec[t]):
-             if req["class"] == "ResourceRequirement":
-                 new_spec[t].remove(req)
-         if overall_res_req[t]:
-             new_spec[t].append(overall_res_req[t])
-
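-     # Upload anything the packed workflow references (e.g. default File
-     # values) to Keep.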
-     upload_dependencies(self.arvrunner,
-                         kwargs.get("name", ""),
-                         document_loader,
-                         packed,
-                         uri,
-                         False)
-
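- # Make two copies of the job order: one with locations rewritten to /keep
- # mount paths (handed to cwltool inside the container) and one resolved to
- # keep: references (used when submitting the job).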
- with Perf(metrics, "subworkflow adjust"):
-     joborder_resolved = copy.deepcopy(joborder)
-     joborder_keepmount = copy.deepcopy(joborder)
-
-     reffiles = []
-     visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))
-
-     mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
-                            "/keep/%s",
-                            "/keep/%s/%s",
-                            **kwargs)
-
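-     # Rewrite File/Directory locations to paths under the /keep mount.
-     # Literal ("_:") locations are dropped; anything else is an error.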
-     def keepmount(obj):
-         remove_redundant_fields(obj)
-         with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):
-             if "location" not in obj:
-                 raise WorkflowException("%s object is missing required 'location' field: %s" % (obj["class"], obj))
-         with SourceLine(obj, "location", WorkflowException, logger.isEnabledFor(logging.DEBUG)):
-             if obj["location"].startswith("keep:"):
-                 obj["location"] = mapper.mapper(obj["location"]).target
-                 if "listing" in obj:
-                     del obj["listing"]
-             elif obj["location"].startswith("_:"):
-                 del obj["location"]
-             else:
-                 raise WorkflowException("Location is not a keep reference or a literal: '%s'" % obj["location"])
-
-     visit_class(joborder_keepmount, ("File", "Directory"), keepmount)
-
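-     # For the submitted copy, resolve locations to canonical keep:
-     # references instead of mount paths.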
-     def resolved(obj):
-         if obj["location"].startswith("keep:"):
-             obj["location"] = mapper.mapper(obj["location"]).resolved
-
-     visit_class(joborder_resolved, ("File", "Directory"), resolved)
-
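-     # First time through, rewrite the packed workflow's own file references
-     # and upload it to Keep, caching the collection's portable data hash.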
-     if self.wf_pdh is None:
-         adjustFileObjs(packed, keepmount)
-         adjustDirObjs(packed, keepmount)
-         self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
-
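- # Synthesize a CommandLineTool that runs cwltool over the uploaded workflow:
- # InitialWorkDirRequirement stages "workflow.cwl" from the cached collection
- # alongside the rewritten job order in "cwl.input.yml".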
- wf_runner = cmap({
-     "class": "CommandLineTool",
-     "baseCommand": "cwltool",
-     "inputs": self.tool["inputs"],
-     "outputs": self.tool["outputs"],
-     "stdout": "cwl.output.json",
-     "requirements": self.requirements + [{
-         "class": "InitialWorkDirRequirement",
-         "listing": [{
-             "entryname": "workflow.cwl",
-             "entry": {
-                 "class": "File",
-                 "location": "keep:%s/workflow.cwl" % self.wf_pdh
-             }
-         }, {
-             "entryname": "cwl.input.yml",
-             # Escape backslashes and the $(...)/${...} markers so the
-             # embedded JSON is staged verbatim instead of being evaluated
-             # as CWL expressions.
-             "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True,
-                                 separators=(',', ': ')).replace("\\", "\\\\").replace('$(', '\\$(').replace('${', '\\${')
-         }]
-     }],
-     "hints": self.hints,
-     "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment",
-                   "workflow.cwl#main", "cwl.input.yml"],
-     "id": "#"
- })
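- # Hand the synthesized tool to ArvadosCommandTool with the keep:-resolved
- # job order, so the entire sub-workflow executes in a single container.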
- kwargs["loader"] = self.doc_loader
- kwargs["avsc_names"] = self.doc_schema
- return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)