from .arvdocker import arv_docker_get_image
from . import done
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
from .fsaccess import CollectionFetcher
from .pathmapper import NoFollowPathMapper, trim_listing
from .perf import Perf
def update_pipeline_component(self, r):
pass
+ def _required_env(self):
+ env = {}
+ env["HOME"] = self.outdir
+ env["TMPDIR"] = self.tmpdir
+ return env
+
def run(self, runtimeContext):
# ArvadosCommandTool subclasses from cwltool.CommandLineTool,
# which calls makeJobRunner() to get a new ArvadosContainer
runtimeContext = self.job_runtime
- container_request = {
- "command": self.command_line,
- "name": self.name,
- "output_path": self.outdir,
- "cwd": self.outdir,
- "priority": runtimeContext.priority,
- "state": "Committed",
- "properties": {},
- }
+ if runtimeContext.submit_request_uuid:
+ container_request = self.arvrunner.api.container_requests().get(
+ uuid=runtimeContext.submit_request_uuid
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ container_request = {}
+
+ container_request["command"] = self.command_line
+ container_request["name"] = self.name
+ container_request["output_path"] = self.outdir
+ container_request["cwd"] = self.outdir
+ container_request["priority"] = runtimeContext.priority
+ container_request["state"] = "Committed"
+ container_request.setdefault("properties", {})
+
runtime_constraints = {}
if runtimeContext.project_uuid:
"path": "%s/%s" % (self.outdir, self.stdout)}
(docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
- if not docker_req:
- docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
runtimeContext.pull_image,
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix)
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
+
+ if self.arvrunner.api._rootDesc["revision"] >= "20210628":
+ storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
+ if storage_class_req and storage_class_req.get("intermediateStorageClass"):
+ container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
+ else:
+ container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
+
+ cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
+ if cuda_req:
+ runtime_constraints["cuda"] = {
+ "device_count": resources.get("cudaDeviceCount", 1),
+ "driver_version": cuda_req["cudaVersionMin"],
+ "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
+ }
+
+ if runtimeContext.enable_preemptible is False:
+ scheduling_parameters["preemptible"] = False
+ else:
+ preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
+ if preemptible_req:
+ scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
+ elif runtimeContext.enable_preemptible is True:
+ scheduling_parameters["preemptible"] = True
+ elif runtimeContext.enable_preemptible is None:
+ pass
+
if self.timelimit is not None and self.timelimit > 0:
scheduling_parameters["max_run_time"] = self.timelimit
enable_reuse = reuse_req["enableReuse"]
container_request["use_existing"] = enable_reuse
+ properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
+ if properties_req:
+ for pr in properties_req["processProperties"]:
+ container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
+ if properties_req:
+ builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
+ for pr in properties_req["processProperties"]:
+ container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
# --local means execute the workflow instead of submitting a container request
# --api=containers means use the containers API
if runtimeContext.debug:
command.append("--debug")
- if runtimeContext.storage_classes != "default":
+ if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
command.append("--storage-classes=" + runtimeContext.storage_classes)
+ if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
+ command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
+
if self.on_error:
command.append("--on-error=" + self.on_error)
if self.enable_dev:
command.append("--enable-dev")
+ if runtimeContext.enable_preemptible is True:
+ command.append("--enable-preemptible")
+
+ if runtimeContext.enable_preemptible is False:
+ command.append("--disable-preemptible")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
+ workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
+ workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
+ url = ""
+ if workbench2:
+ url = "{}processes/{}".format(workbench2, response["uuid"])
+ elif workbench1:
+ url = "{}container_requests/{}".format(workbench1, response["uuid"])
+ if url:
+ logger.info("Monitor workflow progress at %s", url)
+
+
def done(self, record):
try:
container = self.arvrunner.api.containers().get(