import ciso8601
import uuid
import math
+import re
import arvados_cwl.util
-import ruamel.yaml as yaml
+import ruamel.yaml
from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
self.job_runtime = job_runtime
self.running = False
self.uuid = None
+ self.attempt_count = 0
def update_pipeline_component(self, r):
    """Deliberate no-op: the argument ``r`` is accepted and ignored.

    Always returns ``None``.
    """
    return None
container_request["output_path"] = self.outdir
container_request["cwd"] = self.outdir
container_request["priority"] = runtimeContext.priority
- container_request["state"] = "Committed"
+ container_request["state"] = "Uncommitted"
container_request.setdefault("properties", {})
container_request["properties"]["cwl_input"] = self.joborder
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
runtimeContext.pull_image,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ runtimeContext)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if runtimeContext.submit_request_uuid:
- response = self.arvrunner.api.container_requests().update(
- uuid=runtimeContext.submit_request_uuid,
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
- else:
- response = self.arvrunner.api.container_requests().create(
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
+ ram = runtime_constraints["ram"]
+
+ for i in range(1, 4):
+ runtime_constraints["ram"] = ram * i
+
+ if runtimeContext.submit_request_uuid:
+ response = self.arvrunner.api.container_requests().update(
+ uuid=runtimeContext.submit_request_uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ runtimeContext.submit_request_uuid = response["uuid"]
+
+ if response["container_uuid"] is not None:
+ break
+
+ if response["container_uuid"] is None:
+ runtime_constraints["ram"] = ram * (self.attempt_count+1)
+
+ container_request["state"] = "Committed"
+ response = self.arvrunner.api.container_requests().update(
+ uuid=runtimeContext.submit_request_uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
+ self.attempt_count += 1
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
logger.debug("Container request was %s", container_request)
self.output_callback({}, "permanentFail")
def out_of_memory_retry(self, record, container):
    """Decide whether a failed container looks like it ran out of memory.

    Args:
        record: mapping whose ``log_uuid`` entry is opened as a log
            collection.
        container: mapping whose ``exit_code`` entry is inspected.

    Returns:
        True when the exit code is 137, or when the tail of the log
        matches one of the out-of-memory patterns; False otherwise.
    """
    # Exit code 137 is taken as an out-of-memory kill on its own, so the
    # log collection is not read at all in that case.
    if container["exit_code"] == 137:
        return True

    logc = arvados.collection.CollectionReader(record["log_uuid"],
                                               api_client=self.arvrunner.api,
                                               keep_client=self.arvrunner.keep_client,
                                               num_retries=self.arvrunner.num_retries)

    # logtail hands its result to a callback; only the third argument is
    # kept.  A one-element list lets the closure write to it.
    loglines = [""]
    def callback(v1, v2, v3):
        loglines[0] = v3

    done.logtail(logc, callback, "", maxlen=200)

    oom_matches = r'(bad_alloc|out ?of ?memory|Container using over 95% of memory)'

    # Search once and reuse the result for both the diagnostic and the
    # return value.
    found = re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE)

    # Diagnostics go through the module logger instead of print(), which
    # would write the log tail to the process's stdout.
    logger.debug("%s out-of-memory check on log tail: match=%s",
                 self.arvrunner.label(self), found)

    return found is not None
+
def done(self, record):
outputs = {}
+ retried = False
try:
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
else:
processStatus = "permanentFail"
+ if processStatus == "permanentFail" and self.out_of_memory_retry(record, container):
+ logger.info("%s Container failed with out of memory error, retrying with more RAM.",
+ self.arvrunner.label(self))
+ self.job_runtime.submit_request_uuid = None
+ self.run(None)
+ retried = True
+ return
+
if rcode == 137:
logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
self.arvrunner.label(self))
logger.exception("%s while getting output object:", self.arvrunner.label(self))
processStatus = "permanentFail"
finally:
- self.output_callback(outputs, processStatus)
+ if not retried:
+ self.output_callback(outputs, processStatus)
class RunnerContainer(Runner):
"portable_data_hash": "%s" % workflowcollection
}
elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
- packed = yaml.safe_load(record["definition"])
+ uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow.json#" + frg
+ packedtxt = self.loadingContext.loader.fetch_text(uuid)
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ packed = yaml.load(packedtxt)
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+ main = self.loadingContext.loader.idx["_:main"]
+ if main.get("id") == "_:main":
+ del main["id"]
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "content": packed
+ "content": main
}
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
if runtimeContext.prefer_cached_downloads:
command.append("--prefer-cached-downloads")
+ if self.fast_parser:
+ command.append("--fast-parser")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command