from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
from .fsaccess import CollectionFetcher
from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
def update_pipeline_component(self, r):
pass
- def run(self, runtimeContext):
+ def _required_env(self):
+ env = {}
+ env["HOME"] = self.outdir
+ env["TMPDIR"] = self.tmpdir
+ return env
+
+ def run(self, toplevelRuntimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
# object. The fields that define execution such as
runtimeContext = self.job_runtime
- container_request = {
- "command": self.command_line,
- "name": self.name,
- "output_path": self.outdir,
- "cwd": self.outdir,
- "priority": runtimeContext.priority,
- "state": "Committed",
- "properties": {},
- }
+ if runtimeContext.submit_request_uuid:
+ container_request = self.arvrunner.api.container_requests().get(
+ uuid=runtimeContext.submit_request_uuid
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ container_request = {}
+
+ container_request["command"] = self.command_line
+ container_request["name"] = self.name
+ container_request["output_path"] = self.outdir
+ container_request["cwd"] = self.outdir
+ container_request["priority"] = runtimeContext.priority
+ container_request["state"] = "Committed"
+ container_request.setdefault("properties", {})
+
runtime_constraints = {}
if runtimeContext.project_uuid:
"path": "%s/%s" % (self.outdir, self.stdout)}
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if not docker_req:
- docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
runtimeContext.pull_image,
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix)
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
- storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
- if storage_class_req and storage_class_req.get("intermediateStorageClass"):
- container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
+
+ if self.arvrunner.api._rootDesc["revision"] >= "20210628":
+ storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
+ if storage_class_req and storage_class_req.get("intermediateStorageClass"):
+ container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
+ else:
+ container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
+
+ cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
+ if cuda_req:
+ runtime_constraints["cuda"] = {
+ "device_count": resources.get("cudaDeviceCount", 1),
+ "driver_version": cuda_req["cudaVersionMin"],
+ "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
+ }
+
+ if runtimeContext.enable_preemptible is False:
+ scheduling_parameters["preemptible"] = False
else:
- container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
+ preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
+ if preemptible_req:
+ scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
+ elif runtimeContext.enable_preemptible is True:
+ scheduling_parameters["preemptible"] = True
+ elif runtimeContext.enable_preemptible is None:
+ pass
if self.timelimit is not None and self.timelimit > 0:
scheduling_parameters["max_run_time"] = self.timelimit
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
+ properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
+ if properties_req:
+ for pr in properties_req["processProperties"]:
+ container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
processStatus = "success"
else:
processStatus = "permanentFail"
+
+ if rcode == 137:
+ logger.warning("%s job was killed on the compute instance. The most common reason is that it attempted to allocate too much RAM and was targeted by the Out Of Memory (OOM) killer. Try resubmitting with a higher 'ramMin'.",
+ self.arvrunner.label(self))
else:
processStatus = "permanentFail"
if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
+ if properties_req:
+ builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
+ for pr in properties_req["processProperties"]:
+ container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
# --local means execute the workflow instead of submitting a container request
# --api=containers means use the containers API
if runtimeContext.debug:
command.append("--debug")
- if runtimeContext.storage_classes != "default":
+ if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
command.append("--storage-classes=" + runtimeContext.storage_classes)
- if runtimeContext.intermediate_storage_classes != "default":
+ if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
if self.on_error:
if self.enable_dev:
command.append("--enable-dev")
+ if runtimeContext.enable_preemptible is True:
+ command.append("--enable-preemptible")
+
+ if runtimeContext.enable_preemptible is False:
+ command.append("--disable-preemptible")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command