import ciso8601
import uuid
import math
+import re
import arvados_cwl.util
import ruamel.yaml
self.job_runtime = job_runtime
self.running = False
self.uuid = None
+ self.attempt_count = 0
def update_pipeline_component(self, r):
pass
container_request["output_path"] = self.outdir
container_request["cwd"] = self.outdir
container_request["priority"] = runtimeContext.priority
- container_request["state"] = "Committed"
+ container_request["state"] = "Uncommitted"
container_request.setdefault("properties", {})
container_request["properties"]["cwl_input"] = self.joborder
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if runtimeContext.submit_request_uuid:
- response = self.arvrunner.api.container_requests().update(
- uuid=runtimeContext.submit_request_uuid,
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
- else:
- response = self.arvrunner.api.container_requests().create(
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
+ ram = runtime_constraints["ram"]
+
+ for i in range(1, 4):
+ runtime_constraints["ram"] = ram * i
+
+ if runtimeContext.submit_request_uuid:
+ response = self.arvrunner.api.container_requests().update(
+ uuid=runtimeContext.submit_request_uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ runtimeContext.submit_request_uuid = response["uuid"]
+
+ if response["container_uuid"] is not None:
+ break
+
+ if response["container_uuid"] is None:
+ runtime_constraints["ram"] = ram * (self.attempt_count+1)
+
+ container_request["state"] = "Committed"
+ response = self.arvrunner.api.container_requests().update(
+ uuid=runtimeContext.submit_request_uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
+ self.attempt_count += 1
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
logger.debug("Container request was %s", container_request)
self.output_callback({}, "permanentFail")
+ def out_of_memory_retry(self, record, container):
+ if container["exit_code"] == 137:
+ return True
+
+ logc = arvados.collection.CollectionReader(record["log_uuid"],
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+
+ loglines = [""]
+ def callback(v1, v2, v3):
+ loglines[0] = v3
+
+ done.logtail(logc, callback, "", maxlen=200)
+
+ oom_matches = r'(bad_alloc|out ?of ?memory|Container using over 95% of memory)'
+
+ print("Checking loglines", loglines[0])
+
+ print("Match", re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE))
+
+ if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
+ return True
+
+ return False
+
def done(self, record):
outputs = {}
+ retried = False
try:
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
else:
processStatus = "permanentFail"
+ if processStatus == "permanentFail" and self.out_of_memory_retry(record, container):
+ logger.info("%s Container failed with out of memory error, retrying with more RAM.",
+ self.arvrunner.label(self))
+ self.job_runtime.submit_request_uuid = None
+ self.run(None)
+ retried = True
+ return
+
if rcode == 137:
logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
self.arvrunner.label(self))
logger.exception("%s while getting output object:", self.arvrunner.label(self))
processStatus = "permanentFail"
finally:
- self.output_callback(outputs, processStatus)
+ if not retried:
+ self.output_callback(outputs, processStatus)
class RunnerContainer(Runner):