19975: Initial work to auto-retry on OOM
author Peter Amstutz <peter.amstutz@curii.com>
Sat, 4 Mar 2023 22:25:17 +0000 (17:25 -0500)
committer Peter Amstutz <peter.amstutz@curii.com>
Sat, 4 Mar 2023 22:25:17 +0000 (17:25 -0500)
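
When a container fails with exit code 137 or its log tail matches a known
out-of-memory message, resubmit the container request with a larger RAM
constraint instead of reporting the failure.  Container requests are now
created in the Uncommitted state so the RAM constraint can be adjusted
before the request is committed.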
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curii.com>

sdk/cwl/arvados_cwl/arvcontainer.py

index 742906c616514b9bcd2317cfeb4a11697f1c8522..4d0fde7440455f52e4a93f40ad9455af3d98a195 100644
@@ -15,6 +15,7 @@ import datetime
 import ciso8601
 import uuid
 import math
+import re
 
 import arvados_cwl.util
 import ruamel.yaml
@@ -56,6 +57,7 @@ class ArvadosContainer(JobBase):
         self.job_runtime = job_runtime
         self.running = False
         self.uuid = None
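+        # Number of times this container request has been submitted, used to
+        # scale up the RAM request when retrying after an out-of-memory
+        # failure.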
+        self.attempt_count = 0
 
     def update_pipeline_component(self, r):
         pass
@@ -88,7 +90,7 @@ class ArvadosContainer(JobBase):
         container_request["output_path"] = self.outdir
         container_request["cwd"] = self.outdir
         container_request["priority"] = runtimeContext.priority
-        container_request["state"] = "Committed"
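+        # Create the request in the Uncommitted state; it is committed below,
+        # once the RAM constraint has been settled.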
+        container_request["state"] = "Uncommitted"
         container_request.setdefault("properties", {})
 
         container_request["properties"]["cwl_input"] = self.joborder
@@ -375,20 +377,40 @@ class ArvadosContainer(JobBase):
         self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
 
         try:
-            if runtimeContext.submit_request_uuid:
-                response = self.arvrunner.api.container_requests().update(
-                    uuid=runtimeContext.submit_request_uuid,
-                    body=container_request,
-                    **extra_submit_params
-                ).execute(num_retries=self.arvrunner.num_retries)
-            else:
-                response = self.arvrunner.api.container_requests().create(
-                    body=container_request,
-                    **extra_submit_params
-                ).execute(num_retries=self.arvrunner.num_retries)
+            ram = runtime_constraints["ram"]
+
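+            # Submit the uncommitted request with 1x, 2x, then 3x the
+            # requested RAM, stopping as soon as a container is assigned.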
+            for i in range(1, 4):
+                runtime_constraints["ram"] = ram * i
+
+                if runtimeContext.submit_request_uuid:
+                    response = self.arvrunner.api.container_requests().update(
+                        uuid=runtimeContext.submit_request_uuid,
+                        body=container_request,
+                        **extra_submit_params
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                else:
+                    response = self.arvrunner.api.container_requests().create(
+                        body=container_request,
+                        **extra_submit_params
+                    ).execute(num_retries=self.arvrunner.num_retries)
+                    runtimeContext.submit_request_uuid = response["uuid"]
+
+                if response["container_uuid"] is not None:
+                    break
+
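+            # If no container was assigned, fall back to scaling the RAM
+            # request by one more than the number of previous attempts, so a
+            # retry after an out-of-memory failure asks for more memory.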
+            if response["container_uuid"] is None:
+                runtime_constraints["ram"] = ram * (self.attempt_count+1)
+
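+            # Commit the request with the final RAM constraint so it becomes
+            # eligible to be scheduled.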
+            container_request["state"] = "Committed"
+            response = self.arvrunner.api.container_requests().update(
+                uuid=runtimeContext.submit_request_uuid,
+                body=container_request,
+                **extra_submit_params
+            ).execute(num_retries=self.arvrunner.num_retries)
 
             self.uuid = response["uuid"]
             self.arvrunner.process_submitted(self)
+            self.attempt_count += 1
 
             if response["state"] == "Final":
                 logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
@@ -399,8 +421,35 @@ class ArvadosContainer(JobBase):
             logger.debug("Container request was %s", container_request)
             self.output_callback({}, "permanentFail")
 
+    def out_of_memory_retry(self, record, container):
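+        """Decide whether a failed container should be retried with more RAM."""
+
+        # Exit code 137 means the process was killed with SIGKILL, which is
+        # how the kernel OOM killer terminates processes.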
+        if container["exit_code"] == 137:
+            return True
+
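+        # Otherwise, scan the tail of the container log for messages that
+        # commonly indicate an out-of-memory condition.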
+        logc = arvados.collection.CollectionReader(record["log_uuid"],
+                                                   api_client=self.arvrunner.api,
+                                                   keep_client=self.arvrunner.keep_client,
+                                                   num_retries=self.arvrunner.num_retries)
+
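+        # done.logtail() passes the collected log text as the third argument
+        # to the callback; capture it for matching below.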
+        loglines = [""]
+        def callback(v1, v2, v3):
+            loglines[0] = v3
+
+        done.logtail(logc, callback, "", maxlen=200)
+
+        oom_matches = r'(bad_alloc|out ?of ?memory|Container using over 95% of memory)'
+
+        return re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE) is not None
+
     def done(self, record):
         outputs = {}
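+        # Set when the container is resubmitted after an out-of-memory
+        # failure, so the finally block does not report a result for this
+        # attempt.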
+        retried = False
         try:
             container = self.arvrunner.api.containers().get(
                 uuid=record["container_uuid"]
@@ -418,6 +467,14 @@ class ArvadosContainer(JobBase):
                 else:
                     processStatus = "permanentFail"
 
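+                # If the failure looks like an out-of-memory condition,
+                # resubmit with a larger RAM request (up to three attempts)
+                # instead of reporting the failure.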
+                if processStatus == "permanentFail" and self.attempt_count < 3 and self.out_of_memory_retry(record, container):
+                    logger.info("%s Container failed with an out-of-memory error, retrying with more RAM.",
+                                self.arvrunner.label(self))
+                    self.job_runtime.submit_request_uuid = None
+                    self.run(None)
+                    retried = True
+                    return
+
                 if rcode == 137:
                     logger.warning("%s Container may have been killed for using too much RAM.  Try resubmitting with a higher 'ramMin'.",
                                  self.arvrunner.label(self))
@@ -464,7 +521,8 @@ class ArvadosContainer(JobBase):
             logger.exception("%s while getting output object:", self.arvrunner.label(self))
             processStatus = "permanentFail"
         finally:
-            self.output_callback(outputs, processStatus)
+            if not retried:
+                self.output_callback(outputs, processStatus)
 
 
 class RunnerContainer(Runner):