self.wf_pdh = None
self.dynamic_resource_req = []
self.static_resource_req = []
+ self.wf_reffiles = []
def job(self, joborder, output_callback, **kwargs):
kwargs["work_api"] = self.work_api
builder.hints = workflowobj["hints"]
builder.resources = {}
- for item in packed["$graph"]:
+ def visit(item):
for t in ("hints", "requirements"):
+ if t not in item:
+ continue
for req in item[t]:
if req["class"] == "ResourceRequirement":
dyn = False
for k in max_res_pars + sum_res_pars:
- if isinstance(req[k], basestr):
- if item["id"] == "#main":
- # only the top-level requirements/hints may contain expressions
- self.dynamic_resource_req.append(req)
- dyn = True
- break
- else:
- with SourceLine(req, k, WorkflowException):
- raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
+ if k in req:
+ if isinstance(req[k], basestring):
+ if item["id"] == "#main":
+ # only the top-level requirements/hints may contain expressions
+ self.dynamic_resource_req.append(req)
+ dyn = True
+ break
+ else:
+ with SourceLine(req, k, WorkflowException):
+ raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
if not dyn:
self.static_resource_req.append(req)
+ visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
+
if self.static_resource_req:
self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
uri,
False)
+ # Discover files/directories referenced by the
+ # workflow (mainly "default" values)
+ visit_class(packed, ("File", "Directory"), self.wf_reffiles.append)
+
+
if self.dynamic_resource_req:
+ builder = Builder()
+ builder.job = joborder
+ builder.requirements = self.requirements
+ builder.hints = self.hints
+ builder.resources = {}
+
# Evaluate dynamic resource requirements using current builder
rs = copy.copy(self.static_resource_req)
for dyn_rs in self.dynamic_resource_req:
joborder_keepmount = copy.deepcopy(joborder)
reffiles = []
- visit_class(joborder_keepmount, ("File", "Directory"), lambda x: reffiles.append(x))
+ visit_class(joborder_keepmount, ("File", "Directory"), reffiles.append)
- mapper = ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
+ mapper = ArvPathMapper(self.arvrunner, reffiles+self.wf_reffiles, kwargs["basedir"],
"/keep/%s",
"/keep/%s/%s",
**kwargs)
+ # For the containers API, we need to make sure any extra
+ # referenced files (i.e. referenced by the workflow but
+ # not in the inputs) are included in the mounts.
+ kwargs["extra_reffiles"] = copy.deepcopy(self.wf_reffiles)
+
def keepmount(obj):
remove_redundant_fields(obj)
with SourceLine(obj, None, WorkflowException, logger.isEnabledFor(logging.DEBUG)):