From: Peter Amstutz Date: Fri, 30 Mar 2018 20:12:49 +0000 (-0400) Subject: 13301: Correctly handle dynamically computed resources with RunInSingleContainer X-Git-Tag: 1.1.4~13^2~3 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/9a92b8605be087ea76f2db614ea62173578948d9?hp=2baac8a6702551064b065f752f75a9b40dbec0f5 13301: Correctly handle dynamically computed resources with RunInSingleContainer Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 5aed871a12..99e8652d21 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -89,7 +89,7 @@ def get_overall_res_req(res_reqs): if isinstance(res_req[a], int): # integer check all_res_req[a].append(res_req[a]) else: - msg = SourceLine(res_req).makeError( + msg = SourceLine(res_req, a).makeError( "Non-top-level ResourceRequirement in single container cannot have expressions") exception_msgs.append(msg) if exception_msgs: @@ -114,6 +114,8 @@ class ArvadosWorkflow(Workflow): self.arvrunner = arvrunner self.work_api = kwargs["work_api"] self.wf_pdh = None + self.dynamic_resource_req = [] + self.static_resource_req = [] def job(self, joborder, output_callback, **kwargs): kwargs["work_api"] = self.work_api @@ -146,32 +148,26 @@ class ArvadosWorkflow(Workflow): builder.hints = workflowobj["hints"] builder.resources = {} - res_reqs = {"requirements": [], "hints": []} - for t in ("requirements", "hints"): - for item in packed["$graph"]: - if t in item: - if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints - for req in item[t]: - if req["class"] == "ResourceRequirement": - eval_req = {"class": "ResourceRequirement"} - for a in max_res_pars + sum_res_pars: - if a in req: - eval_req[a] = builder.do_eval(req[a]) - res_reqs[t].append(eval_req) - else: - for req in item[t]: - if req["class"] == "ResourceRequirement": - res_reqs[t].append(req) - overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]), - "hints": get_overall_res_req(res_reqs["hints"])} - - new_spec = {"requirements": self.requirements, "hints": self.hints} - for t in ("requirements", "hints"): - for req in new_spec[t]: - if req["class"] == "ResourceRequirement": - new_spec[t].remove(req) - if overall_res_req[t]: - new_spec[t].append(overall_res_req[t]) + for item in packed["$graph"]: + for t in ("hints", "requirements"): + for req in item[t]: + if req["class"] == "ResourceRequirement": + dyn = False + for k in max_res_pars + sum_res_pars: + if isinstance(req[k], basestr): + if item["id"] == "#main": + # only the top-level requirements/hints may contain expressions + self.dynamic_resource_req.append(req) + dyn = True + break + else: + with SourceLine(req, k, WorkflowException): + raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions") + if not dyn: + self.static_resource_req.append(req) + + if self.static_resource_req: + self.static_resource_req = [get_overall_res_req(self.static_resource_req)] upload_dependencies(self.arvrunner, kwargs.get("name", ""), @@ -180,6 +176,19 @@ class ArvadosWorkflow(Workflow): uri, False) + if self.dynamic_resource_req: + # Evaluate dynamic resource requirements using current builder + rs = copy.copy(self.static_resource_req) + for dyn_rs in self.dynamic_resource_req: + eval_req = {"class": "ResourceRequirement"} + for a in max_res_pars + sum_res_pars: + if a in dyn_rs: + eval_req[a] = builder.do_eval(dyn_rs[a]) + rs.append(eval_req) + job_res_reqs = [get_overall_res_req(rs)] + else: + job_res_reqs = self.static_resource_req + with Perf(metrics, "subworkflow adjust"): joborder_resolved = copy.deepcopy(joborder) joborder_keepmount = copy.deepcopy(joborder) @@ -226,7 +235,7 @@ class ArvadosWorkflow(Workflow): "inputs": self.tool["inputs"], "outputs": self.tool["outputs"], "stdout": "cwl.output.json", - "requirements": self.requirements+[ + "requirements": self.requirements+job_res_reqs+[ { "class": "InitialWorkDirRequirement", "listing": [{