X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/60e38e35f49de11b5752c2bfb7f9f10443d82c49..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 7964486ff2..6fcf366e02 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -37,6 +37,9 @@ from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +def cleanup_name_for_collection(name): + return name.replace("/", " ") + class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" @@ -88,6 +91,8 @@ class ArvadosContainer(JobBase): container_request["state"] = "Committed" container_request.setdefault("properties", {}) + container_request["properties"]["cwl_input"] = self.joborder + runtime_constraints = {} if runtimeContext.project_uuid: @@ -146,6 +151,8 @@ class ArvadosContainer(JobBase): mounts[targetdir]["path"] = path prevdir = targetdir + "/" + intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) + with Perf(metrics, "generatefiles %s" % self.name): if self.generatefiles["listing"]: vwd = arvados.collection.Collection(api_client=self.arvrunner.api, @@ -197,12 +204,11 @@ class ArvadosContainer(JobBase): if not runtimeContext.current_container: runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger) - info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl) - vwd.save_new(name=info["name"], + vwd.save_new(name=intermediate_collection_info["name"], owner_uuid=runtimeContext.project_uuid, ensure_unique_name=True, - trash_at=info["trash_at"], - properties=info["properties"]) + trash_at=intermediate_collection_info["trash_at"], + properties=intermediate_collection_info["properties"]) prev = None for f, p in sorteditems: @@ -261,7 +267,11 @@ class ArvadosContainer(JobBase): runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: - runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) + if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0: + # If DefaultKeepCacheDisk is non-zero it means we should use disk cache. + runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20) + else: + runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) if "outputDirType" in runtime_req: if runtime_req["outputDirType"] == "local_output_dir": # Currently the default behavior. @@ -319,7 +329,7 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_runner_cluster: extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster - container_request["output_name"] = "Output for step %s" % (self.name) + container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name)) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -343,8 +353,13 @@ class ArvadosContainer(JobBase): output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties") if output_properties_req: - for pr in output_properties_req["processProperties"]: - container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + if self.arvrunner.api._rootDesc["revision"] >= "20220510": + container_request["output_properties"] = {} + for pr in output_properties_req["outputProperties"]: + container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"]) + else: + logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.", + self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510") if runtimeContext.runnerjob.startswith("arvwf:"): wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")] @@ -428,6 +443,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") + + properties = record["properties"].copy() + properties["cwl_output"] = outputs + self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body={"container_request": {"properties": properties}} + ).execute(num_retries=self.arvrunner.num_retries) except WorkflowException as e: # Only include a stack trace if in debug mode. # A stack trace may obfuscate more useful output about the workflow. @@ -444,7 +466,7 @@ class ArvadosContainer(JobBase): class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, runtimeContext): + def arvados_job_spec(self, runtimeContext, git_info): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -505,15 +527,24 @@ class RunnerContainer(Runner): "kind": "collection", "portable_data_hash": "%s" % workflowcollection } + elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + workflowpath = "/var/lib/cwl/workflow.json#main" + record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries) + packed = yaml.safe_load(record["definition"]) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "content": packed + } + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", "content": packed } - if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + + container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()}) properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") if properties_req: @@ -577,6 +608,12 @@ class RunnerContainer(Runner): if runtimeContext.enable_preemptible is False: command.append("--disable-preemptible") + if runtimeContext.varying_url_params: + command.append("--varying-url-params="+runtimeContext.varying_url_params) + + if runtimeContext.prefer_cached_downloads: + command.append("--prefer-cached-downloads") + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -586,7 +623,7 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" - job_spec = self.arvados_job_spec(runtimeContext) + job_spec = self.arvados_job_spec(runtimeContext, self.git_info) if runtimeContext.project_uuid: job_spec["owner_uuid"] = runtimeContext.project_uuid