Merge branch '20825-cwl-separate-runner' refs #20825
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index f8715f7e7b805f8191e79cd68b4ae4324d04bc59..6e3e42975e75385fe1bf1a2e1d5b5070773d4e8c 100644 (file)
@@ -15,9 +15,10 @@ import datetime
 import ciso8601
 import uuid
 import math
+import re
 
 import arvados_cwl.util
-import ruamel.yaml as yaml
+import ruamel.yaml
 
 from cwltool.errors import WorkflowException
 from cwltool.process import UnsupportedRequirement, shortname
@@ -56,6 +57,7 @@ class ArvadosContainer(JobBase):
         self.job_runtime = job_runtime
         self.running = False
         self.uuid = None
+        self.attempt_count = 0
 
     def update_pipeline_component(self, r):
         pass
@@ -88,7 +90,7 @@ class ArvadosContainer(JobBase):
         container_request["output_path"] = self.outdir
         container_request["cwd"] = self.outdir
         container_request["priority"] = runtimeContext.priority
-        container_request["state"] = "Committed"
+        container_request["state"] = "Uncommitted"
         container_request.setdefault("properties", {})
 
         container_request["properties"]["cwl_input"] = self.joborder
@@ -250,11 +252,7 @@ class ArvadosContainer(JobBase):
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                     docker_req,
                                                                     runtimeContext.pull_image,
-                                                                    runtimeContext.project_uuid,
-                                                                    runtimeContext.force_docker_pull,
-                                                                    runtimeContext.tmp_outdir_prefix,
-                                                                    runtimeContext.match_local_docker,
-                                                                    runtimeContext.copy_deps)
+                                                                    runtimeContext)
 
         network_req, _ = self.get_requirement("NetworkAccess")
         if network_req:
@@ -264,10 +262,22 @@ class ArvadosContainer(JobBase):
         if api_req:
             runtime_constraints["API"] = True
 
+        use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)
+
+        keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
+        if keep_cache_type_req:
+            if "keepCacheType" in keep_cache_type_req:
+                if keep_cache_type_req["keepCacheType"] == "ram_cache":
+                    use_disk_cache = False
+
         runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
             if "keep_cache" in runtime_req:
-                runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+                if use_disk_cache:
+                    # Disk cache applies when DefaultKeepCacheRAM is zero (or unset) and the step did not request ram_cache.
+                    runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
+                else:
+                    runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
             if "outputDirType" in runtime_req:
                 if runtime_req["outputDirType"] == "local_output_dir":
                     # Currently the default behavior.
@@ -357,6 +367,12 @@ class ArvadosContainer(JobBase):
                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
 
+        ram_multiplier = [1]
+
+        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+        if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
+            ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
@@ -364,23 +380,45 @@ class ArvadosContainer(JobBase):
                 container_request["name"] = wfrecord["name"]
             container_request["properties"]["template_uuid"] = wfuuid
 
-        self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+        if self.attempt_count == 0:
+            self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
 
         try:
-            if runtimeContext.submit_request_uuid:
-                response = self.arvrunner.api.container_requests().update(
-                    uuid=runtimeContext.submit_request_uuid,
-                    body=container_request,
-                    **extra_submit_params
-                ).execute(num_retries=self.arvrunner.num_retries)
-            else:
-                response = self.arvrunner.api.container_requests().create(
-                    body=container_request,
-                    **extra_submit_params
-                ).execute(num_retries=self.arvrunner.num_retries)
+            ram = runtime_constraints["ram"]
+
+            self.uuid = runtimeContext.submit_request_uuid
+
+            for i in ram_multiplier:
+                runtime_constraints["ram"] = ram * i
+
+                if self.uuid:
+                    response = self.arvrunner.api.container_requests().update(
+                        uuid=self.uuid,
+                        body=container_request,
+                        **extra_submit_params
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                else:
+                    response = self.arvrunner.api.container_requests().create(
+                        body=container_request,
+                        **extra_submit_params
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                    self.uuid = response["uuid"]
+
+                if response["container_uuid"] is not None:
+                    break
+
+            if response["container_uuid"] is None:
+                runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
+
+            container_request["state"] = "Committed"
+            response = self.arvrunner.api.container_requests().update(
+                uuid=self.uuid,
+                body=container_request,
+                **extra_submit_params
+            ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.uuid = response["uuid"]
             self.arvrunner.process_submitted(self)
+            self.attempt_count += 1
 
             if response["state"] == "Final":
                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
@@ -391,8 +429,37 @@ class ArvadosContainer(JobBase):
             logger.debug("Container request was %s", container_request)
             self.output_callback({}, "permanentFail")
 
+    def out_of_memory_retry(self, record, container):  # True when a failed run should be treated as an out-of-memory failure
+        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+        if oom_retry_req is None:
+            return False  # no OutOfMemoryRetry requirement on this step: never retry
+
+        # Exit code 137 counts as out-of-memory without consulting the log: sometimes the container gets killed with no warning
+        if container["exit_code"] == 137:
+            return True
+        # Otherwise open the log collection named by record["log_uuid"] and scan it for memory errors.
+        logc = arvados.collection.CollectionReader(record["log_uuid"],
+                                                   api_client=self.arvrunner.api,
+                                                   keep_client=self.arvrunner.keep_client,
+                                                   num_retries=self.arvrunner.num_retries)
+        # One-element list so the nested callback can hand its value back to this scope.
+        loglines = [""]
+        def callback(v1, v2, v3):
+            loglines[0] = v3  # keep only the third argument; presumably the log tail text (TODO confirm against done.logtail)
+
+        done.logtail(logc, callback, "", maxlen=1000)
+
+        # Check for an allocation failure: search with the requirement's memoryErrorRegex if set, else the default pattern
+        oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
+        if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
+            return True  # case-insensitive match found in the captured log text
+
+        return False
+
     def done(self, record):
         outputs = {}
+        retried = False
+        rcode = None
         try:
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
@@ -410,8 +477,17 @@ class ArvadosContainer(JobBase):
                 else:
                     processStatus = "permanentFail"
 
+                if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
+                    logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
+                                 self.arvrunner.label(self))
+                    self.job_runtime.submit_request_uuid = None
+                    self.uuid = None
+                    self.run(None)
+                    retried = True
+                    return
+
                 if rcode == 137:
-                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
+                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
                                  self.arvrunner.label(self))
             else:
                 processStatus = "permanentFail"
@@ -424,7 +500,7 @@ class ArvadosContainer(JobBase):
                 label = self.arvrunner.label(self)
                 done.logtail(
                     logc, logger.error,
-                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
 
             if record["output_uuid"]:
                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
@@ -456,7 +532,8 @@ class ArvadosContainer(JobBase):
             logger.exception("%s while getting output object:", self.arvrunner.label(self))
             processStatus = "permanentFail"
         finally:
-            self.output_callback(outputs, processStatus)
+            if not retried:
+                self.output_callback(outputs, processStatus)
 
 
 class RunnerContainer(Runner):
@@ -483,13 +560,19 @@ class RunnerContainer(Runner):
                 }
                 self.job_order[param] = {"$include": mnt}
 
+        container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
+
+        workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+            container_image = workflow_runner_req.get("acrContainerImage")
+
         container_req = {
             "name": self.name,
             "output_path": "/var/spool/cwl",
             "cwd": "/var/spool/cwl",
             "priority": self.priority,
             "state": "Committed",
-            "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+            "container_image": container_image,
             "mounts": {
                 "/var/lib/cwl/cwl.input.json": {
                     "kind": "json",
@@ -510,7 +593,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": False, # Never reuse the runner container - see #15497.
+            "use_existing": self.reuse_runner,
             "properties": {}
         }
 
@@ -524,20 +607,26 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
-            workflowpath = "/var/lib/cwl/workflow.json#main"
-            record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
-            packed = yaml.safe_load(record["definition"])
+            uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
+            workflowpath = "/var/lib/cwl/workflow.json#" + frg
+            packedtxt = self.loadingContext.loader.fetch_text(uuid)
+            yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+            packed = yaml.load(packedtxt)
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+        elif self.embedded_tool.tool.get("id", "").startswith("file:"):
+            raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
         else:
-            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+            main = self.loadingContext.loader.idx["_:main"]
+            if main.get("id") == "_:main":
+                del main["id"]
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
-                "content": packed
+                "content": main
             }
 
         container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
@@ -610,6 +699,9 @@ class RunnerContainer(Runner):
         if runtimeContext.prefer_cached_downloads:
             command.append("--prefer-cached-downloads")
 
+        if self.fast_parser:
+            command.append("--fast-parser")
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command