X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6346a7c4c0cb5d7e8c5f01392b6cc64d329b68ec..8680023bb3804d3bd1f2dcac7d1a86ff14053fca:/sdk/cwl/arvados_cwl/arvworkflow.py diff --git a/sdk/cwl/arvados_cwl/arvworkflow.py b/sdk/cwl/arvados_cwl/arvworkflow.py index 5aed871a12..bdc2e274b0 100644 --- a/sdk/cwl/arvados_cwl/arvworkflow.py +++ b/sdk/cwl/arvados_cwl/arvworkflow.py @@ -89,7 +89,7 @@ def get_overall_res_req(res_reqs): if isinstance(res_req[a], int): # integer check all_res_req[a].append(res_req[a]) else: - msg = SourceLine(res_req).makeError( + msg = SourceLine(res_req, a).makeError( "Non-top-level ResourceRequirement in single container cannot have expressions") exception_msgs.append(msg) if exception_msgs: @@ -114,6 +114,8 @@ class ArvadosWorkflow(Workflow): self.arvrunner = arvrunner self.work_api = kwargs["work_api"] self.wf_pdh = None + self.dynamic_resource_req = [] + self.static_resource_req = [] def job(self, joborder, output_callback, **kwargs): kwargs["work_api"] = self.work_api @@ -146,32 +148,31 @@ class ArvadosWorkflow(Workflow): builder.hints = workflowobj["hints"] builder.resources = {} - res_reqs = {"requirements": [], "hints": []} - for t in ("requirements", "hints"): - for item in packed["$graph"]: - if t in item: - if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints - for req in item[t]: - if req["class"] == "ResourceRequirement": - eval_req = {"class": "ResourceRequirement"} - for a in max_res_pars + sum_res_pars: - if a in req: - eval_req[a] = builder.do_eval(req[a]) - res_reqs[t].append(eval_req) - else: - for req in item[t]: - if req["class"] == "ResourceRequirement": - res_reqs[t].append(req) - overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]), - "hints": get_overall_res_req(res_reqs["hints"])} - - new_spec = {"requirements": self.requirements, "hints": self.hints} - for t in ("requirements", "hints"): - for req in new_spec[t]: - if req["class"] == "ResourceRequirement": - new_spec[t].remove(req) - if overall_res_req[t]: - new_spec[t].append(overall_res_req[t]) + def visit(item): + for t in ("hints", "requirements"): + if t not in item: + continue + for req in item[t]: + if req["class"] == "ResourceRequirement": + dyn = False + for k in max_res_pars + sum_res_pars: + if k in req: + if isinstance(req[k], basestring): + if item["id"] == "#main": + # only the top-level requirements/hints may contain expressions + self.dynamic_resource_req.append(req) + dyn = True + break + else: + with SourceLine(req, k, WorkflowException): + raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions") + if not dyn: + self.static_resource_req.append(req) + + visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit) + + if self.static_resource_req: + self.static_resource_req = [get_overall_res_req(self.static_resource_req)] upload_dependencies(self.arvrunner, kwargs.get("name", ""), @@ -180,6 +181,25 @@ class ArvadosWorkflow(Workflow): uri, False) + if self.dynamic_resource_req: + builder = Builder() + builder.job = joborder + builder.requirements = self.requirements + builder.hints = self.hints + builder.resources = {} + + # Evaluate dynamic resource requirements using current builder + rs = copy.copy(self.static_resource_req) + for dyn_rs in self.dynamic_resource_req: + eval_req = {"class": "ResourceRequirement"} + for a in max_res_pars + sum_res_pars: + if a in dyn_rs: + eval_req[a] = builder.do_eval(dyn_rs[a]) + rs.append(eval_req) + job_res_reqs = [get_overall_res_req(rs)] + else: + job_res_reqs = self.static_resource_req + with Perf(metrics, "subworkflow adjust"): joborder_resolved = copy.deepcopy(joborder) joborder_keepmount = copy.deepcopy(joborder) @@ -226,7 +246,7 @@ class ArvadosWorkflow(Workflow): "inputs": self.tool["inputs"], "outputs": self.tool["outputs"], "stdout": "cwl.output.json", - "requirements": self.requirements+[ + "requirements": self.requirements+job_res_reqs+[ { "class": "InitialWorkDirRequirement", "listing": [{