20825: Make SeparateRunner reusable
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
index e828b16d3091772d72e25b270f7d8269ff9916c6..aa9fa1e90358f733bbd21932fa38ba90b908aa93 100644
@@ -367,6 +367,12 @@ class ArvadosContainer(JobBase):
                 logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
                                self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
 
+        ram_multiplier = [1]
+
+        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+        if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
+            ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
             wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
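
The new ram_multiplier list determines how much RAM each submission attempt requests. A minimal standalone sketch of that logic (not part of this commit), using a made-up requirement dict in place of what get_requirement() returns:

    oom_retry_req = {"memoryRetryMultipler": 2}      # hypothetical arv:OutOfMemoryRetry hint
    ram_multiplier = [1]
    if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
        ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))

    ram = 4 * 1024**3                                # e.g. a tool asking for 4 GiB
    for attempt, m in enumerate(ram_multiplier):
        print("attempt", attempt + 1, "requests", ram * m // 1024**3, "GiB")
    # attempt 1 requests 4 GiB
    # attempt 2 requests 8 GiB
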
@@ -380,12 +386,14 @@ class ArvadosContainer(JobBase):
         try:
             ram = runtime_constraints["ram"]
 
-            for i in range(1, 4):
+            self.uuid = runtimeContext.submit_request_uuid
+
+            for i in ram_multiplier:
                 runtime_constraints["ram"] = ram * i
 
-                if runtimeContext.submit_request_uuid:
+                if self.uuid:
                     response = self.arvrunner.api.container_requests().update(
-                        uuid=runtimeContext.submit_request_uuid,
+                        uuid=self.uuid,
                         body=container_request,
                         **extra_submit_params
                     ).execute(num_retries=self.arvrunner.num_retries)
@@ -394,22 +402,21 @@ class ArvadosContainer(JobBase):
                         body=container_request,
                         **extra_submit_params
                     ).execute(num_retries=self.arvrunner.num_retries)
-                    runtimeContext.submit_request_uuid = response["uuid"]
+                    self.uuid = response["uuid"]
 
                 if response["container_uuid"] is not None:
                     break
 
             if response["container_uuid"] is None:
-                runtime_constraints["ram"] = ram * (self.attempt_count+1)
+                runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
 
             container_request["state"] = "Committed"
             response = self.arvrunner.api.container_requests().update(
-                uuid=runtimeContext.submit_request_uuid,
+                uuid=self.uuid,
                 body=container_request,
                 **extra_submit_params
             ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.uuid = response["uuid"]
             self.arvrunner.process_submitted(self)
             self.attempt_count += 1
 
@@ -423,6 +430,14 @@ class ArvadosContainer(JobBase):
             self.output_callback({}, "permanentFail")
 
     def out_of_memory_retry(self, record, container):
+        oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+        if oom_retry_req is None:
+            return False
+
+        # Sometimes the container is OOM-killed (exit code 137) without any message in the log
+        if container["exit_code"] == 137:
+            return True
+
         logc = arvados.collection.CollectionReader(record["log_uuid"],
                                                    api_client=self.arvrunner.api,
                                                    keep_client=self.arvrunner.keep_client,
@@ -434,14 +449,9 @@ class ArvadosContainer(JobBase):
 
         done.logtail(logc, callback, "", maxlen=1000)
 
-        # Check OOM killed
-        oom_matches = r'container using over 9.% of memory'
-        if container["exit_code"] == 137 and re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
-            return True
-
         # Check allocation failure
-        bad_alloc_matches = r'(bad_alloc|out ?of ?memory)'
-        if re.search(bad_alloc_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
+        oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
+        if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
             return True
 
         return False
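
After this change, an exit code of 137 returns True before the log is read, and the remaining check is a single regex that the hint can override through its memoryErrorRegex field. A small illustration of the default pattern against a made-up log line:

    import re

    oom_matches = r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
    sample_log = "2024-01-01T00:00:00Z stderr container using over 95% of memory"
    print(bool(re.search(oom_matches, sample_log, re.IGNORECASE | re.MULTILINE)))   # True
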
@@ -449,6 +459,7 @@ class ArvadosContainer(JobBase):
     def done(self, record):
         outputs = {}
         retried = False
+        rcode = None
         try:
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
@@ -466,8 +477,8 @@ class ArvadosContainer(JobBase):
                 else:
                     processStatus = "permanentFail"
 
-                if processStatus == "permanentFail" and self.out_of_memory_retry(record, container):
-                    logger.info("%s Container failed with out of memory error, retrying with more RAM.",
+                if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
+                    logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
                                  self.arvrunner.label(self))
                     self.job_runtime.submit_request_uuid = None
                     self.uuid = None
@@ -476,7 +487,7 @@ class ArvadosContainer(JobBase):
                     return
 
                 if rcode == 137:
-                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
+                    logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
                                  self.arvrunner.label(self))
             else:
                 processStatus = "permanentFail"
@@ -489,7 +500,7 @@ class ArvadosContainer(JobBase):
                 label = self.arvrunner.label(self)
                 done.logtail(
                     logc, logger.error,
-                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+                    "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
 
             if record["output_uuid"]:
                 if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
@@ -576,7 +587,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": False, # Never reuse the runner container - see #15497.
+            "use_existing": self.reuse_runner,
             "properties": {}
         }
 
@@ -600,6 +611,8 @@ class RunnerContainer(Runner):
                 "content": packed
             }
             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+        elif self.embedded_tool.tool.get("id", "").startswith("file:"):
+            raise Exception("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
         else:
             main = self.loadingContext.loader.idx["_:main"]
             if main.get("id") == "_:main":
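
The new elif raises early when the tool id still points at a local file rather than a keep: or arvwf: reference. A quick illustration of the prefixes involved (the ids below are invented):

    for tool_id in ("arvwf:zzzzz-7fd4e-0123456789abcde#main",
                    "keep:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa+123/workflow.cwl",
                    "file:///home/user/workflow.cwl"):
        if tool_id.startswith("file:"):
            print("rejected, would raise:", tool_id)
        else:
            print("accepted:", tool_id)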