Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index ae3c6688955301141f0af3405787c9f831fb58b7..6fcf366e02aeed8aca3bc25a56d8562f0ba812f7 100644 (file)
@@ -37,6 +37,9 @@ from ._version import __version__
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
+def cleanup_name_for_collection(name):
+    return " ".join(name.split("/"))  # every "/" in the name becomes a single space
+
 class ArvadosContainer(JobBase):
     """Submit and manage a Crunch container request for executing a CWL CommandLineTool."""
 
@@ -63,7 +66,7 @@ class ArvadosContainer(JobBase):
         env["TMPDIR"] = self.tmpdir
         return env
 
-    def run(self, runtimeContext):
+    def run(self, toplevelRuntimeContext):
         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
         # which calls makeJobRunner() to get a new ArvadosContainer
         # object.  The fields that define execution such as
@@ -88,6 +91,8 @@ class ArvadosContainer(JobBase):
         container_request["state"] = "Committed"
         container_request.setdefault("properties", {})
 
+        container_request["properties"]["cwl_input"] = self.joborder
+
         runtime_constraints = {}
 
         if runtimeContext.project_uuid:
@@ -146,6 +151,8 @@ class ArvadosContainer(JobBase):
                     mounts[targetdir]["path"] = path
             prevdir = targetdir + "/"
 
+        intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+
         with Perf(metrics, "generatefiles %s" % self.name):
             if self.generatefiles["listing"]:
                 vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
@@ -197,12 +204,11 @@ class ArvadosContainer(JobBase):
 
                 if not runtimeContext.current_container:
                     runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
-                info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
-                vwd.save_new(name=info["name"],
+                vwd.save_new(name=intermediate_collection_info["name"],
                              owner_uuid=runtimeContext.project_uuid,
                              ensure_unique_name=True,
-                             trash_at=info["trash_at"],
-                             properties=info["properties"])
+                             trash_at=intermediate_collection_info["trash_at"],
+                             properties=intermediate_collection_info["properties"])
 
                 prev = None
                 for f, p in sorteditems:
@@ -246,7 +252,9 @@ class ArvadosContainer(JobBase):
                                                                     runtimeContext.pull_image,
                                                                     runtimeContext.project_uuid,
                                                                     runtimeContext.force_docker_pull,
-                                                                    runtimeContext.tmp_outdir_prefix)
+                                                                    runtimeContext.tmp_outdir_prefix,
+                                                                    runtimeContext.match_local_docker,
+                                                                    runtimeContext.copy_deps)
 
         network_req, _ = self.get_requirement("NetworkAccess")
         if network_req:
@@ -259,7 +267,11 @@ class ArvadosContainer(JobBase):
         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
             if "keep_cache" in runtime_req:
-                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+                if self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0:
+                    # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
+                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+                else:
+                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
             if "outputDirType" in runtime_req:
                 if runtime_req["outputDirType"] == "local_output_dir":
                     # Currently the default behavior.
@@ -291,6 +303,25 @@ class ArvadosContainer(JobBase):
             else:
                 container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
 
+        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
+        if cuda_req:
+            runtime_constraints["cuda"] = {
+                "device_count": resources.get("cudaDeviceCount", 1),
+                "driver_version": cuda_req["cudaVersionMin"],
+                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
+            }
+
+        if runtimeContext.enable_preemptible is False:
+            scheduling_parameters["preemptible"] = False
+        else:
+            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
+            if preemptible_req:
+                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
+            elif runtimeContext.enable_preemptible is True:
+                scheduling_parameters["preemptible"] = True
+            elif runtimeContext.enable_preemptible is None:
+                pass
+
         if self.timelimit is not None and self.timelimit > 0:
             scheduling_parameters["max_run_time"] = self.timelimit
 
@@ -298,7 +329,7 @@ class ArvadosContainer(JobBase):
         if runtimeContext.submit_runner_cluster:
             extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
 
-        container_request["output_name"] = "Output for step %s" % (self.name)
+        container_request["output_name"] = cleanup_name_for_collection("Output from step %s" % (self.name))
         container_request["output_ttl"] = self.output_ttl
         container_request["mounts"] = mounts
         container_request["secret_mounts"] = secret_mounts
@@ -320,6 +351,16 @@ class ArvadosContainer(JobBase):
             for pr in properties_req["processProperties"]:
                 container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
 
+        output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+        if output_properties_req:
+            if self.arvrunner.api._rootDesc["revision"] >= "20220510":
+                container_request["output_properties"] = {}
+                for pr in output_properties_req["outputProperties"]:
+                    container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+            else:
+                logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
+                               self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
@@ -372,6 +413,10 @@ class ArvadosContainer(JobBase):
                     processStatus = "success"
                 else:
                     processStatus = "permanentFail"
+
+                if rcode == 137:
+                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
+                                 self.arvrunner.label(self))
             else:
                 processStatus = "permanentFail"
 
@@ -398,6 +443,13 @@ class ArvadosContainer(JobBase):
 
             if container["output"]:
                 outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+
+            properties = record["properties"].copy()
+            properties["cwl_output"] = outputs
+            self.arvrunner.api.container_requests().update(
+                uuid=self.uuid,
+                body={"container_request": {"properties": properties}}
+            ).execute(num_retries=self.arvrunner.num_retries)
         except WorkflowException as e:
             # Only include a stack trace if in debug mode.
             # A stack trace may obfuscate more useful output about the workflow.
@@ -414,7 +466,7 @@ class ArvadosContainer(JobBase):
 class RunnerContainer(Runner):
     """Submit and manage a container that runs arvados-cwl-runner."""
 
-    def arvados_job_spec(self, runtimeContext):
+    def arvados_job_spec(self, runtimeContext, git_info):
         """Create an Arvados container request for this workflow.
 
         The returned dict can be used to create a container passed as
@@ -441,7 +493,7 @@ class RunnerContainer(Runner):
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
@@ -475,15 +527,24 @@ class RunnerContainer(Runner):
                 "kind": "collection",
                 "portable_data_hash": "%s" % workflowcollection
             }
+        elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+            workflowpath = "/var/lib/cwl/workflow.json#main"
+            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
+            packed = yaml.safe_load(record["definition"])
+            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+                "kind": "json",
+                "content": packed
+            }
+            container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
         else:
-            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
-            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+
+        container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
 
         properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
         if properties_req:
@@ -526,21 +587,33 @@ class RunnerContainer(Runner):
         if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
             command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
 
-        if self.on_error:
+        if runtimeContext.on_error:
             command.append("--on-error=" + self.on_error)
 
-        if self.intermediate_output_ttl:
-            command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+        if runtimeContext.intermediate_output_ttl:
+            command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
 
-        if self.arvrunner.trash_intermediate:
+        if runtimeContext.trash_intermediate:
             command.append("--trash-intermediate")
 
-        if self.arvrunner.project_uuid:
-            command.append("--project-uuid="+self.arvrunner.project_uuid)
+        if runtimeContext.project_uuid:
+            command.append("--project-uuid="+runtimeContext.project_uuid)
 
         if self.enable_dev:
             command.append("--enable-dev")
 
+        if runtimeContext.enable_preemptible is True:
+            command.append("--enable-preemptible")
+
+        if runtimeContext.enable_preemptible is False:
+            command.append("--disable-preemptible")
+
+        if runtimeContext.varying_url_params:
+            command.append("--varying-url-params="+runtimeContext.varying_url_params)
+
+        if runtimeContext.prefer_cached_downloads:
+            command.append("--prefer-cached-downloads")
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
@@ -550,9 +623,9 @@ class RunnerContainer(Runner):
 
     def run(self, runtimeContext):
         runtimeContext.keepprefix = "keep:"
-        job_spec = self.arvados_job_spec(runtimeContext)
-        if self.arvrunner.project_uuid:
-            job_spec["owner_uuid"] = self.arvrunner.project_uuid
+        job_spec = self.arvados_job_spec(runtimeContext, self.git_info)
+        if runtimeContext.project_uuid:
+            job_spec["owner_uuid"] = runtimeContext.project_uuid
 
         extra_submit_params = {}
         if runtimeContext.submit_runner_cluster: