X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/c22d90571a1fcb4b52e5387a791e3aefff5be6af..878ec61b535740ebeb40dcc4a330557698c66417:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index b04fb190e9..b0ef4a22cc 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -37,6 +37,9 @@ from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +def cleanup_name_for_collection(name): + return name.replace("/", " ") + class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" @@ -88,6 +91,8 @@ class ArvadosContainer(JobBase): container_request["state"] = "Committed" container_request.setdefault("properties", {}) + container_request["properties"]["cwl_input"] = self.joborder + runtime_constraints = {} if runtimeContext.project_uuid: @@ -146,6 +151,8 @@ class ArvadosContainer(JobBase): mounts[targetdir]["path"] = path prevdir = targetdir + "/" + intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + with Perf(metrics, "generatefiles %s" % self.name): if self.generatefiles["listing"]: vwd = arvados.collection.Collection(api_client=self.arvrunner.api, @@ -197,12 +204,11 @@ class ArvadosContainer(JobBase): if not runtimeContext.current_container: runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) - info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) - vwd.save_new(name=info["name"], + vwd.save_new(name=intermediate_collection_info["name"], owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, - trash_at=info["trash_at"], - properties=info["properties"]) + trash_at=intermediate_collection_info["trash_at"], + properties=intermediate_collection_info["properties"]) prev = None for f, p in sorteditems: @@ -247,7 +253,8 @@ class ArvadosContainer(JobBase): runtimeContext.project_uuid, runtimeContext.force_docker_pull, runtimeContext.tmp_outdir_prefix, - runtimeContext.match_local_docker) + runtimeContext.match_local_docker, + runtimeContext.copy_deps) network_req, _ = self.get_requirement("NetworkAccess") if network_req: @@ -318,7 +325,7 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_runner_cluster: extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster - container_request["output_name"] = "Output for step %s" % (self.name) + container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name)) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -340,6 +347,16 @@ class ArvadosContainer(JobBase): for pr in properties_req["processProperties"]: container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties") + if output_properties_req: + if self.arvrunner.api._rootDesc["revision"] >= "20220510": + container_request["output_properties"] = {} + for pr in output_properties_req["outputProperties"]: + container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + else: + logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.", + self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510") + if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries) @@ -394,7 +411,7 @@ class ArvadosContainer(JobBase): processStatus = "permanentFail" if rcode == 137: - logger.warning("%s This container was killed on the compute instance. The most common reason is that it attempted to allocate too much RAM and was targeted by the Out Of Memory (OOM) killer. Try resubmitting with a higher 'ramMin'.", + logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.", self.arvrunner.label(self)) else: processStatus = "permanentFail" @@ -422,6 +439,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") + + properties = record["properties"].copy() + properties["cwl_output"] = outputs + self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body={"container_request": {"properties": properties}} + ).execute(num_retries=self.arvrunner.num_retries) except WorkflowException as e: # Only include a stack trace if in debug mode. # A stack trace may obfuscate more useful output about the workflow. @@ -438,7 +462,7 @@ class ArvadosContainer(JobBase): class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, runtimeContext): + def arvados_job_spec(self, runtimeContext, git_info): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -465,7 +489,7 @@ class RunnerContainer(Runner): "cwd": "/var/spool/cwl", "priority": self.priority, "state": "Committed", - "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image), + "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext), "mounts": { "/var/lib/cwl/cwl.input.json": { "kind": "json", @@ -499,15 +523,24 @@ class RunnerContainer(Runner): "kind": "collection", "portable_data_hash": "%s" % workflowcollection } + elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + workflowpath = "/var/lib/cwl/workflow.json#main" + record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries) + packed = yaml.safe_load(record["definition"]) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "content": packed + } + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", "content": packed } - if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + + container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()}) properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") if properties_req: @@ -550,17 +583,17 @@ class RunnerContainer(Runner): if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes: command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes) - if self.on_error: + if runtimeContext.on_error: command.append("--on-error=" + self.on_error) - if self.intermediate_output_ttl: - command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl) + if runtimeContext.intermediate_output_ttl: + command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl) - if self.arvrunner.trash_intermediate: + if runtimeContext.trash_intermediate: command.append("--trash-intermediate") - if self.arvrunner.project_uuid: - command.append("--project-uuid="+self.arvrunner.project_uuid) + if runtimeContext.project_uuid: + command.append("--project-uuid="+runtimeContext.project_uuid) if self.enable_dev: command.append("--enable-dev") @@ -571,6 +604,9 @@ class RunnerContainer(Runner): if runtimeContext.enable_preemptible is False: command.append("--disable-preemptible") + if runtimeContext.varying_url_params: + command.append("--varying-url-params="+runtimeContext.varying_url_params) + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -580,9 +616,9 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" - job_spec = self.arvados_job_spec(runtimeContext) - if self.arvrunner.project_uuid: - job_spec["owner_uuid"] = self.arvrunner.project_uuid + job_spec = self.arvados_job_spec(runtimeContext, self.git_info) + if runtimeContext.project_uuid: + job_spec["owner_uuid"] = runtimeContext.project_uuid extra_submit_params = {} if runtimeContext.submit_runner_cluster: