X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1441208182ced30e28f05611fbfa51674570dd1e..ba34a22d9918ae97306472c04701e69090821c82:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index ae3c668895..6fcf366e02 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -37,6 +37,9 @@ from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +def cleanup_name_for_collection(name): + return name.replace("/", " ") + class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" @@ -63,7 +66,7 @@ class ArvadosContainer(JobBase): env["TMPDIR"] = self.tmpdir return env - def run(self, runtimeContext): + def run(self, toplevelRuntimeContext): # ArvadosCommandTool subclasses from cwltool.CommandLineTool, # which calls makeJobRunner() to get a new ArvadosContainer # object. The fields that define execution such as @@ -88,6 +91,8 @@ class ArvadosContainer(JobBase): container_request["state"] = "Committed" container_request.setdefault("properties", {}) + container_request["properties"]["cwl_input"] = self.joborder + runtime_constraints = {} if runtimeContext.project_uuid: @@ -146,6 +151,8 @@ class ArvadosContainer(JobBase): mounts[targetdir]["path"] = path prevdir = targetdir + "/" + intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + with Perf(metrics, "generatefiles %s" % self.name): if self.generatefiles["listing"]: vwd = arvados.collection.Collection(api_client=self.arvrunner.api, @@ -197,12 +204,11 @@ class ArvadosContainer(JobBase): if not runtimeContext.current_container: runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) - info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) - vwd.save_new(name=info["name"], + vwd.save_new(name=intermediate_collection_info["name"], owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, - trash_at=info["trash_at"], - properties=info["properties"]) + trash_at=intermediate_collection_info["trash_at"], + properties=intermediate_collection_info["properties"]) prev = None for f, p in sorteditems: @@ -246,7 +252,9 @@ class ArvadosContainer(JobBase): runtimeContext.pull_image, runtimeContext.project_uuid, runtimeContext.force_docker_pull, - runtimeContext.tmp_outdir_prefix) + runtimeContext.tmp_outdir_prefix, + runtimeContext.match_local_docker, + runtimeContext.copy_deps) network_req, _ = self.get_requirement("NetworkAccess") if network_req: @@ -259,7 +267,11 @@ class ArvadosContainer(JobBase): runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: - runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) + if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0: + # If DefaultKeepCacheDisk is non-zero it means we should use disk cache. + runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20) + else: + runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) if "outputDirType" in runtime_req: if runtime_req["outputDirType"] == "local_output_dir": # Currently the default behavior. @@ -291,6 +303,25 @@ class ArvadosContainer(JobBase): else: container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",") + cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement") + if cuda_req: + runtime_constraints["cuda"] = { + "device_count": resources.get("cudaDeviceCount", 1), + "driver_version": cuda_req["cudaVersionMin"], + "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0] + } + + if runtimeContext.enable_preemptible is False: + scheduling_parameters["preemptible"] = False + else: + preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible") + if preemptible_req: + scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"] + elif runtimeContext.enable_preemptible is True: + scheduling_parameters["preemptible"] = True + elif runtimeContext.enable_preemptible is None: + pass + if self.timelimit is not None and self.timelimit > 0: scheduling_parameters["max_run_time"] = self.timelimit @@ -298,7 +329,7 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_runner_cluster: extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster - container_request["output_name"] = "Output for step %s" % (self.name) + container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name)) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -320,6 +351,16 @@ class ArvadosContainer(JobBase): for pr in properties_req["processProperties"]: container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties") + if output_properties_req: + if self.arvrunner.api._rootDesc["revision"] >= "20220510": + container_request["output_properties"] = {} + for pr in output_properties_req["outputProperties"]: + container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + else: + logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.", + self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510") + if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) @@ -372,6 +413,10 @@ class ArvadosContainer(JobBase): processStatus = "success" else: processStatus = "permanentFail" + + if rcode == 137: + logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.", + self.arvrunner.label(self)) else: processStatus = "permanentFail" @@ -398,6 +443,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") + + properties = record["properties"].copy() + properties["cwl_output"] = outputs + self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body={"container_request": {"properties": properties}} + ).execute(num_retries=self.arvrunner.num_retries) except WorkflowException as e: # Only include a stack trace if in debug mode. # A stack trace may obfuscate more useful output about the workflow. @@ -414,7 +466,7 @@ class ArvadosContainer(JobBase): class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, runtimeContext): + def arvados_job_spec(self, runtimeContext, git_info): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -441,7 +493,7 @@ class RunnerContainer(Runner): "cwd": "/var/spool/cwl", "priority": self.priority, "state": "Committed", - "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image), + "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext), "mounts": { "/var/lib/cwl/cwl.input.json": { "kind": "json", @@ -475,15 +527,24 @@ class RunnerContainer(Runner): "kind": "collection", "portable_data_hash": "%s" % workflowcollection } + elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + workflowpath = "/var/lib/cwl/workflow.json#main" + record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries) + packed = yaml.safe_load(record["definition"]) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "content": packed + } + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", "content": packed } - if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + + container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()}) properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") if properties_req: @@ -526,21 +587,33 @@ class RunnerContainer(Runner): if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes: command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes) - if self.on_error: + if runtimeContext.on_error: command.append("--on-error=" + self.on_error) - if self.intermediate_output_ttl: - command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl) + if runtimeContext.intermediate_output_ttl: + command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl) - if self.arvrunner.trash_intermediate: + if runtimeContext.trash_intermediate: command.append("--trash-intermediate") - if self.arvrunner.project_uuid: - command.append("--project-uuid="+self.arvrunner.project_uuid) + if runtimeContext.project_uuid: + command.append("--project-uuid="+runtimeContext.project_uuid) if self.enable_dev: command.append("--enable-dev") + if runtimeContext.enable_preemptible is True: + command.append("--enable-preemptible") + + if runtimeContext.enable_preemptible is False: + command.append("--disable-preemptible") + + if runtimeContext.varying_url_params: + command.append("--varying-url-params="+runtimeContext.varying_url_params) + + if runtimeContext.prefer_cached_downloads: + command.append("--prefer-cached-downloads") + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -550,9 +623,9 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" - job_spec = self.arvados_job_spec(runtimeContext) - if self.arvrunner.project_uuid: - job_spec["owner_uuid"] = self.arvrunner.project_uuid + job_spec = self.arvados_job_spec(runtimeContext, self.git_info) + if runtimeContext.project_uuid: + job_spec["owner_uuid"] = runtimeContext.project_uuid extra_submit_params = {} if runtimeContext.submit_runner_cluster: