X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/f170c5a75f22a6db11ca93eed5b0dfc9c65c4270..876b9e64d1364770486552060222f5f6b1b5e2ea:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index c9170c51b7..ae3c668895 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -28,7 +28,7 @@ import arvados.collection from .arvdocker import arv_docker_get_image from . import done -from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields +from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder from .fsaccess import CollectionFetcher from .pathmapper import NoFollowPathMapper, trim_listing from .perf import Perf @@ -57,6 +57,12 @@ class ArvadosContainer(JobBase): def update_pipeline_component(self, r): pass + def _required_env(self): + env = {} + env["HOME"] = self.outdir + env["TMPDIR"] = self.tmpdir + return env + def run(self, runtimeContext): # ArvadosCommandTool subclasses from cwltool.CommandLineTool, # which calls makeJobRunner() to get a new ArvadosContainer @@ -67,15 +73,21 @@ class ArvadosContainer(JobBase): runtimeContext = self.job_runtime - container_request = { - "command": self.command_line, - "name": self.name, - "output_path": self.outdir, - "cwd": self.outdir, - "priority": runtimeContext.priority, - "state": "Committed", - "properties": {}, - } + if runtimeContext.submit_request_uuid: + container_request = self.arvrunner.api.container_requests().get( + uuid=runtimeContext.submit_request_uuid + ).execute(num_retries=self.arvrunner.num_retries) + else: + container_request = {} + + container_request["command"] = self.command_line + container_request["name"] = self.name + container_request["output_path"] = self.outdir + container_request["cwd"] = self.outdir + container_request["priority"] = runtimeContext.priority + container_request["state"] = "Committed" + container_request.setdefault("properties", {}) + runtime_constraints = {} if runtimeContext.project_uuid: @@ -228,8 +240,6 @@ class ArvadosContainer(JobBase): "path": "%s/%s" % (self.outdir, self.stdout)} (docker_req, docker_is_req) = self.get_requirement("DockerRequirement") - if not docker_req: - docker_req = {"dockerImageId": "arvados/jobs:"+__version__} container_request["container_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, @@ -273,11 +283,13 @@ class ArvadosContainer(JobBase): if self.output_ttl < 0: raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"]) - storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass") - if storage_class_req and storage_class_req.get("intermediateStorageClass"): - container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"]) - else: - container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",") + + if self.arvrunner.api._rootDesc["revision"] >= "20210628": + storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass") + if storage_class_req and storage_class_req.get("intermediateStorageClass"): + container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"]) + else: + container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",") if self.timelimit is not None and self.timelimit > 0: scheduling_parameters["max_run_time"] = self.timelimit @@ -303,6 +315,11 @@ class ArvadosContainer(JobBase): enable_reuse = reuse_req["enableReuse"] container_request["use_existing"] = enable_reuse + properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties") + if properties_req: + for pr in properties_req["processProperties"]: + container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) @@ -468,6 +485,11 @@ class RunnerContainer(Runner): if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") + if properties_req: + builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata) + for pr in properties_req["processProperties"]: + container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"]) # --local means execute the workflow instead of submitting a container request # --api=containers means use the containers API @@ -498,10 +520,10 @@ class RunnerContainer(Runner): if runtimeContext.debug: command.append("--debug") - if runtimeContext.storage_classes != "default": + if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes: command.append("--storage-classes=" + runtimeContext.storage_classes) - if runtimeContext.intermediate_storage_classes != "default": + if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes: command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes) if self.on_error: