+def dedup_reqs(reqs):
+ dedup = {}
+ for r in reversed(reqs):
+ if r["class"] not in dedup and not r["class"].startswith("http://arvados.org/cwl#"):
+ dedup[r["class"]] = r
+ return [dedup[r] for r in sorted(dedup.keys())]
+
+def get_overall_res_req(res_reqs):
+ """Take the overall of a list of ResourceRequirement,
+ i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
+ and the sum of outdirMin, outdirMax."""
+
+ all_res_req = {}
+ exception_msgs = []
+ for a in max_res_pars + sum_res_pars:
+ all_res_req[a] = []
+ for res_req in res_reqs:
+ if a in res_req:
+ if isinstance(res_req[a], int): # integer check
+ all_res_req[a].append(res_req[a])
+ else:
+ msg = SourceLine(res_req, a).makeError(
+ "Non-top-level ResourceRequirement in single container cannot have expressions")
+ exception_msgs.append(msg)
+ if exception_msgs:
+ raise WorkflowException("\n".join(exception_msgs))
+ else:
+ overall_res_req = {}
+ for a in all_res_req:
+ if all_res_req[a]:
+ if a in max_res_pars:
+ overall_res_req[a] = max(all_res_req[a])
+ elif a in sum_res_pars:
+ overall_res_req[a] = sum(all_res_req[a])
+ if overall_res_req:
+ overall_res_req["class"] = "ResourceRequirement"
+ return cmap(overall_res_req)
+
+class ArvadosWorkflowStep(WorkflowStep):
+ def __init__(self,
+ toolpath_object, # type: Dict[Text, Any]
+ pos, # type: int
+ loadingContext, # type: LoadingContext
+ arvrunner,
+ *argc,
+ **argv
+ ): # type: (...) -> None
+
+ super(ArvadosWorkflowStep, self).__init__(toolpath_object, pos, loadingContext, *argc, **argv)
+ self.tool["class"] = "WorkflowStep"
+ self.arvrunner = arvrunner
+
+ def job(self, joborder, output_callback, runtimeContext):
+ runtimeContext = runtimeContext.copy()
+ runtimeContext.toplevel = True # Preserve behavior for #13365
+
+ builder = make_builder({shortname(k): v for k,v in viewitems(joborder)}, self.hints, self.requirements,
+ runtimeContext, self.metadata)
+ runtimeContext = set_cluster_target(self.tool, self.arvrunner, builder, runtimeContext)
+ return super(ArvadosWorkflowStep, self).job(joborder, output_callback, runtimeContext)
+
+