Merge branch '18947-githttpd'
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index 99d82f3398332d10e0ac0cbf95decb0d5870bd5a..f75bde81e6cebd655a8378fbd382f18b3bf18d2f 100644 (file)
@@ -21,15 +21,14 @@ import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
 from cwltool.process import UnsupportedRequirement, shortname
-from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
-from cwltool.utils import aslist
+from cwltool.utils import aslist, adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.job import JobBase
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
 from . import done
-from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
+from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
 from .fsaccess import CollectionFetcher
 from .pathmapper import NoFollowPathMapper, trim_listing
 from .perf import Perf
@@ -58,7 +57,13 @@ class ArvadosContainer(JobBase):
     def update_pipeline_component(self, r):
         pass
 
-    def run(self, runtimeContext):
+    def _required_env(self):
+        env = {}
+        env["HOME"] = self.outdir
+        env["TMPDIR"] = self.tmpdir
+        return env
+
+    def run(self, toplevelRuntimeContext):
         # ArvadosCommandTool subclasses from cwltool.CommandLineTool,
         # which calls makeJobRunner() to get a new ArvadosContainer
         # object.  The fields that define execution such as
@@ -68,15 +73,21 @@ class ArvadosContainer(JobBase):
 
         runtimeContext = self.job_runtime
 
-        container_request = {
-            "command": self.command_line,
-            "name": self.name,
-            "output_path": self.outdir,
-            "cwd": self.outdir,
-            "priority": runtimeContext.priority,
-            "state": "Committed",
-            "properties": {},
-        }
+        if runtimeContext.submit_request_uuid:
+            container_request = self.arvrunner.api.container_requests().get(
+                uuid=runtimeContext.submit_request_uuid
+            ).execute(num_retries=self.arvrunner.num_retries)
+        else:
+            container_request = {}
+
+        container_request["command"] = self.command_line
+        container_request["name"] = self.name
+        container_request["output_path"] = self.outdir
+        container_request["cwd"] = self.outdir
+        container_request["priority"] = runtimeContext.priority
+        container_request["state"] = "Committed"
+        container_request.setdefault("properties", {})
+
         runtime_constraints = {}
 
         if runtimeContext.project_uuid:
@@ -229,13 +240,14 @@ class ArvadosContainer(JobBase):
                                 "path": "%s/%s" % (self.outdir, self.stdout)}
 
         (docker_req, docker_is_req) = self.get_requirement("DockerRequirement")
-        if not docker_req:
-            docker_req = {"dockerImageId": "arvados/jobs:"+__version__}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     runtimeContext.pull_image,
-                                                                    runtimeContext.project_uuid)
+                                                                    runtimeContext.project_uuid,
+                                                                    runtimeContext.force_docker_pull,
+                                                                    runtimeContext.tmp_outdir_prefix,
+                                                                    runtimeContext.match_local_docker)
 
         network_req, _ = self.get_requirement("NetworkAccess")
         if network_req:
@@ -272,6 +284,33 @@ class ArvadosContainer(JobBase):
         if self.output_ttl < 0:
             raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
 
+
+        if self.arvrunner.api._rootDesc["revision"] >= "20210628":
+            storage_class_req, _ = self.get_requirement("http://arvados.org/cwl#OutputStorageClass")
+            if storage_class_req and storage_class_req.get("intermediateStorageClass"):
+                container_request["output_storage_classes"] = aslist(storage_class_req["intermediateStorageClass"])
+            else:
+                container_request["output_storage_classes"] = runtimeContext.intermediate_storage_classes.strip().split(",")
+
+        cuda_req, _ = self.get_requirement("http://commonwl.org/cwltool#CUDARequirement")
+        if cuda_req:
+            runtime_constraints["cuda"] = {
+                "device_count": resources.get("cudaDeviceCount", 1),
+                "driver_version": cuda_req["cudaVersionMin"],
+                "hardware_capability": aslist(cuda_req["cudaComputeCapability"])[0]
+            }
+
+        if runtimeContext.enable_preemptible is False:
+            scheduling_parameters["preemptible"] = False
+        else:
+            preemptible_req, _ = self.get_requirement("http://arvados.org/cwl#UsePreemptible")
+            if preemptible_req:
+                scheduling_parameters["preemptible"] = preemptible_req["usePreemptible"]
+            elif runtimeContext.enable_preemptible is True:
+                scheduling_parameters["preemptible"] = True
+            elif runtimeContext.enable_preemptible is None:
+                pass
+
         if self.timelimit is not None and self.timelimit > 0:
             scheduling_parameters["max_run_time"] = self.timelimit
 
@@ -296,6 +335,11 @@ class ArvadosContainer(JobBase):
                 enable_reuse = reuse_req["enableReuse"]
         container_request["use_existing"] = enable_reuse
 
+        properties_req, _ = self.get_requirement("http://arvados.org/cwl#ProcessProperties")
+        if properties_req:
+            for pr in properties_req["processProperties"]:
+                container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
@@ -348,6 +392,10 @@ class ArvadosContainer(JobBase):
                     processStatus = "success"
                 else:
                     processStatus = "permanentFail"
+
+                if rcode == 137:
+                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
+                                 self.arvrunner.label(self))
             else:
                 processStatus = "permanentFail"
 
@@ -461,6 +509,11 @@ class RunnerContainer(Runner):
             if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
                 container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
 
+        properties_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ProcessProperties")
+        if properties_req:
+            builder = make_builder(self.job_order, self.embedded_tool.hints, self.embedded_tool.requirements, runtimeContext, self.embedded_tool.metadata)
+            for pr in properties_req["processProperties"]:
+                container_req["properties"][pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
 
         # --local means execute the workflow instead of submitting a container request
         # --api=containers means use the containers API
@@ -491,9 +544,12 @@ class RunnerContainer(Runner):
         if runtimeContext.debug:
             command.append("--debug")
 
-        if runtimeContext.storage_classes != "default":
+        if runtimeContext.storage_classes != "default" and runtimeContext.storage_classes:
             command.append("--storage-classes=" + runtimeContext.storage_classes)
 
+        if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
+            command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
+
         if self.on_error:
             command.append("--on-error=" + self.on_error)
 
@@ -509,6 +565,12 @@ class RunnerContainer(Runner):
         if self.enable_dev:
             command.append("--enable-dev")
 
+        if runtimeContext.enable_preemptible is True:
+            command.append("--enable-preemptible")
+
+        if runtimeContext.enable_preemptible is False:
+            command.append("--disable-preemptible")
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
@@ -546,6 +608,17 @@ class RunnerContainer(Runner):
 
         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
 
+        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
+        workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
+        url = ""
+        if workbench2:
+            url = "{}processes/{}".format(workbench2, response["uuid"])
+        elif workbench1:
+            url = "{}container_requests/{}".format(workbench1, response["uuid"])
+        if url:
+            logger.info("Monitor workflow progress at %s", url)
+
+
     def done(self, record):
         try:
             container = self.arvrunner.api.containers().get(