runtimeContext = self.job_runtime
- container_request = {
- "command": self.command_line,
- "name": self.name,
- "output_path": self.outdir,
- "cwd": self.outdir,
- "priority": runtimeContext.priority,
- "state": "Committed",
- "properties": {},
- }
+ if runtimeContext.submit_request_uuid:
+ container_request = self.arvrunner.api.container_requests().get(
+ uuid=runtimeContext.submit_request_uuid
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ container_request = {}
+
+ container_request["command"] = self.command_line
+ container_request["name"] = self.name
+ container_request["output_path"] = self.outdir
+ container_request["cwd"] = self.outdir
+ container_request["priority"] = runtimeContext.priority
+ container_request["state"] = "Committed"
+ container_request.setdefault("properties", {})
+
runtime_constraints = {}
if runtimeContext.project_uuid:
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
+ storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
+ if storage_class_req and storage_class_req.get("intermediateStorageClass"):
+ container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
+ else:
+ container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
+
if self.timelimit is not None and self.timelimit > 0:
scheduling_parameters["max_run_time"] = self.timelimit
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
+ properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
+ if properties_req:
+ for pr in properties_req["processProperties"]:
+ container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
+ if properties_req:
+ builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
+ for pr in properties_req["processProperties"]:
+ container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
# --local means execute the workflow instead of submitting a container request
# --api=containers means use the containers API
if runtimeContext.storage_classes != "default":
command.append("--storage-classes=" + runtimeContext.storage_classes)
+ if runtimeContext.intermediate_storage_classes != "default":
+ command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
+
if self.on_error:
command.append("--on-error=" + self.on_error)
logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
+ workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
+ workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
+ url = ""
+ if workbench2:
+ url = "{}processes/{}".format(workbench2, response["uuid"])
+ elif workbench1:
+ url = "{}container_requests/{}".format(workbench1, response["uuid"])
+ if url:
+ logger.info("Monitor workflow progress at %s", url)
+
+
def done(self, record):
try:
container = self.arvrunner.api.containers().get(