import ciso8601
import uuid
import math
+import re
import arvados_cwl.util
-import ruamel.yaml as yaml
+import ruamel.yaml
from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
self.job_runtime = job_runtime
self.running = False
self.uuid = None
+ self.attempt_count = 0
def update_pipeline_component(self, r):
pass
container_request["output_path"] = self.outdir
container_request["cwd"] = self.outdir
container_request["priority"] = runtimeContext.priority
- container_request["state"] = "Committed"
+ container_request["state"] = "Uncommitted"
container_request.setdefault("properties", {})
container_request["properties"]["cwl_input"] = self.joborder
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
runtimeContext.pull_image,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ runtimeContext)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+ ram_multiplier = [1]
+
+ oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+ if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
+ ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
- self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+ if self.attempt_count == 0:
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if runtimeContext.submit_request_uuid:
- response = self.arvrunner.api.container_requests().update(
- uuid=runtimeContext.submit_request_uuid,
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
- else:
- response = self.arvrunner.api.container_requests().create(
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
+ ram = runtime_constraints["ram"]
+
+ self.uuid = runtimeContext.submit_request_uuid
+
+ for i in ram_multiplier:
+ runtime_constraints["ram"] = ram * i
+
+ if self.uuid:
+ response = self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ self.uuid = response["uuid"]
+
+ if response["container_uuid"] is not None:
+ break
+
+ if response["container_uuid"] is None:
+ runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
+
+ container_request["state"] = "Committed"
+ response = self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
- self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
+ self.attempt_count += 1
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
logger.debug("Container request was %s", container_request)
self.output_callback({}, "permanentFail")
+ def out_of_memory_retry(self, record, container):
+ oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+ if oom_retry_req is None:
+ return False
+
+ # Sometimes it gets killed with no warning
+ if container["exit_code"] == 137:
+ return True
+
+ logc = arvados.collection.CollectionReader(record["log_uuid"],
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+
+ loglines = [""]
+ def callback(v1, v2, v3):
+ loglines[0] = v3
+
+ done.logtail(logc, callback, "", maxlen=1000)
+
+ # Check allocation failure
+ oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
+ if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
+ return True
+
+ return False
+
def done(self, record):
outputs = {}
+ retried = False
+ rcode = None
try:
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
else:
processStatus = "permanentFail"
+ if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
+ logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
+ self.arvrunner.label(self))
+ self.job_runtime.submit_request_uuid = None
+ self.uuid = None
+ self.run(None)
+ retried = True
+ return
+
if rcode == 137:
- logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
+ logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
self.arvrunner.label(self))
else:
processStatus = "permanentFail"
label = self.arvrunner.label(self)
done.logtail(
logc, logger.error,
- "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+ "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
logger.exception("%s while getting output object:", self.arvrunner.label(self))
processStatus = "permanentFail"
finally:
- self.output_callback(outputs, processStatus)
+ if not retried:
+ self.output_callback(outputs, processStatus)
class RunnerContainer(Runner):
"ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
"API": True
},
- "use_existing": False, # Never reuse the runner container - see #15497.
+ "use_existing": self.reuse_runner,
"properties": {}
}
"portable_data_hash": "%s" % workflowcollection
}
elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
- packed = yaml.safe_load(record["definition"])
+ uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow.json#" + frg
+ packedtxt = self.loadingContext.loader.fetch_text(uuid)
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ packed = yaml.load(packedtxt)
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ elif self.embedded_tool.tool.get("id", "").startswith("file:"):
+ raise Exception("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+ main = self.loadingContext.loader.idx["_:main"]
+ if main.get("id") == "_:main":
+ del main["id"]
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "content": packed
+ "content": main
}
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})