X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/3fa6aa4043286ad61e5f29c136d3cc2942e8750d..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/sdk/cwl/arvados_cwl/arvcontainer.py diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index f3e122e603..6fcf366e02 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -37,6 +37,9 @@ from ._version import __version__ logger = logging.getLogger('arvados.cwl-runner') metrics = logging.getLogger('arvados.cwl-runner.metrics') +def cleanup_name_for_collection(name): + return name.replace("/", " ") + class ArvadosContainer(JobBase): """Submit and manage a Crunch container request for executing a CWL CommandLineTool.""" @@ -88,6 +91,8 @@ class ArvadosContainer(JobBase): container_request["state"] = "Committed" container_request.setdefault("properties", {}) + container_request["properties"]["cwl_input"] = self.joborder + runtime_constraints = {} if runtimeContext.project_uuid: @@ -262,7 +267,11 @@ class ArvadosContainer(JobBase): runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints") if runtime_req: if "keep_cache" in runtime_req: - runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) + if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0: + # If DefaultKeepCacheDisk is non-zero it means we should use disk cache. + runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20) + else: + runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20) if "outputDirType" in runtime_req: if runtime_req["outputDirType"] == "local_output_dir": # Currently the default behavior. @@ -320,7 +329,7 @@ class ArvadosContainer(JobBase): if runtimeContext.submit_runner_cluster: extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster - container_request["output_name"] = "Output from step %s" % (self.name) + container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name)) container_request["output_ttl"] = self.output_ttl container_request["mounts"] = mounts container_request["secret_mounts"] = secret_mounts @@ -434,6 +443,13 @@ class ArvadosContainer(JobBase): if container["output"]: outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep") + + properties = record["properties"].copy() + properties["cwl_output"] = outputs + self.arvrunner.api.container_requests().update( + uuid=self.uuid, + body={"container_request": {"properties": properties}} + ).execute(num_retries=self.arvrunner.num_retries) except WorkflowException as e: # Only include a stack trace if in debug mode. # A stack trace may obfuscate more useful output about the workflow. @@ -450,7 +466,7 @@ class ArvadosContainer(JobBase): class RunnerContainer(Runner): """Submit and manage a container that runs arvados-cwl-runner.""" - def arvados_job_spec(self, runtimeContext): + def arvados_job_spec(self, runtimeContext, git_info): """Create an Arvados container request for this workflow. The returned dict can be used to create a container passed as @@ -511,15 +527,24 @@ class RunnerContainer(Runner): "kind": "collection", "portable_data_hash": "%s" % workflowcollection } + elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"): + workflowpath = "/var/lib/cwl/workflow.json#main" + record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries) + packed = yaml.safe_load(record["definition"]) + container_req["mounts"]["/var/lib/cwl/workflow.json"] = { + "kind": "json", + "content": packed + } + container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] else: - packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext) + packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info) workflowpath = "/var/lib/cwl/workflow.json#main" container_req["mounts"]["/var/lib/cwl/workflow.json"] = { "kind": "json", "content": packed } - if self.embedded_tool.tool.get("id", "").startswith("arvwf:"): - container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33] + + container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()}) properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties") if properties_req: @@ -583,6 +608,12 @@ class RunnerContainer(Runner): if runtimeContext.enable_preemptible is False: command.append("--disable-preemptible") + if runtimeContext.varying_url_params: + command.append("--varying-url-params="+runtimeContext.varying_url_params) + + if runtimeContext.prefer_cached_downloads: + command.append("--prefer-cached-downloads") + command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"]) container_req["command"] = command @@ -592,7 +623,7 @@ class RunnerContainer(Runner): def run(self, runtimeContext): runtimeContext.keepprefix = "keep:" - job_spec = self.arvados_job_spec(runtimeContext) + job_spec = self.arvados_job_spec(runtimeContext, self.git_info) if runtimeContext.project_uuid: job_spec["owner_uuid"] = runtimeContext.project_uuid