import ciso8601
import uuid
import math
+import re
import arvados_cwl.util
-import ruamel.yaml as yaml
+import ruamel.yaml
from cwltool.errors import WorkflowException
from cwltool.process import UnsupportedRequirement, shortname
self.job_runtime = job_runtime
self.running = False
self.uuid = None
+ self.attempt_count = 0
def update_pipeline_component(self, r):
pass
container_request["output_path"] = self.outdir
container_request["cwd"] = self.outdir
container_request["priority"] = runtimeContext.priority
- container_request["state"] = "Committed"
+ container_request["state"] = "Uncommitted"
container_request.setdefault("properties", {})
container_request["properties"]["cwl_input"] = self.joborder
container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
docker_req,
runtimeContext.pull_image,
- runtimeContext.project_uuid,
- runtimeContext.force_docker_pull,
- runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker,
- runtimeContext.copy_deps)
+ runtimeContext)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
if api_req:
runtime_constraints["API"] = True
- use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheDisk", 0) > 0)
+ use_disk_cache = (self.arvrunner.api.config()["Containers"].get("DefaultKeepCacheRAM", 0) == 0)
+
+ keep_cache_type_req, _ = self.get_requirement("http://arvados.org/cwl#KeepCacheTypeRequirement")
+ if keep_cache_type_req:
+ if "keepCacheType" in keep_cache_type_req:
+ if keep_cache_type_req["keepCacheType"] == "ram_cache":
+ use_disk_cache = False
runtime_req, _ = self.get_requirement("http://arvados.org/cwl#RuntimeConstraints")
if runtime_req:
- if "keepCacheType" in runtime_req:
- if cache_type == "ram_cache":
- use_disk_cache = False
if "keep_cache" in runtime_req:
if use_disk_cache:
- # If DefaultKeepCacheDisk is non-zero it means we should use disk cache.
+ # If DefaultKeepCacheRAM is zero it means we should use disk cache.
runtime_constraints["keep_cache_disk"] = math.ceil(runtime_req["keep_cache"] * 2**20)
else:
runtime_constraints["keep_cache_ram"] = math.ceil(runtime_req["keep_cache"] * 2**20)
"writable": True
}
- if use_disk_cache and "keep_cache_disk" not in runtime_constraints:
- # Cache size wasn't explicitly set so calculate a default
- # based on 2x RAM request or 1 GB per core, whichever is
- # smaller. This is to avoid requesting 100s of GB of disk
- # cache when requesting a node with a huge amount of RAM.
- runtime_constraints["keep_cache_disk"] = min(runtime_constraints["ram"] * 2, runtime_constraints["vcpus"] * (1024*1024*1024))
-
partition_req, _ = self.get_requirement("http://arvados.org/cwl#PartitionRequirement")
if partition_req:
scheduling_parameters["partitions"] = aslist(partition_req["partition"])
logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+ ram_multiplier = [1]
+
+ oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+ if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
+ ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
- self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+ if self.attempt_count == 0:
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- if runtimeContext.submit_request_uuid:
- response = self.arvrunner.api.container_requests().update(
- uuid=runtimeContext.submit_request_uuid,
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
- else:
- response = self.arvrunner.api.container_requests().create(
- body=container_request,
- **extra_submit_params
- ).execute(num_retries=self.arvrunner.num_retries)
+ ram = runtime_constraints["ram"]
+
+ self.uuid = runtimeContext.submit_request_uuid
+
+ for i in ram_multiplier:
+ runtime_constraints["ram"] = ram * i
+
+ if self.uuid:
+ response = self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
+ self.uuid = response["uuid"]
+
+ if response["container_uuid"] is not None:
+ break
+
+ if response["container_uuid"] is None:
+ runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
+
+ container_request["state"] = "Committed"
+ response = self.arvrunner.api.container_requests().update(
+ uuid=self.uuid,
+ body=container_request,
+ **extra_submit_params
+ ).execute(num_retries=self.arvrunner.num_retries)
- self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
+ self.attempt_count += 1
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
logger.debug("Container request was %s", container_request)
self.output_callback({}, "permanentFail")
+ def out_of_memory_retry(self, record, container):
+ oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
+ if oom_retry_req is None:
+ return False
+
+ # Sometimes it gets killed with no warning
+ if container["exit_code"] == 137:
+ return True
+
+ logc = arvados.collection.CollectionReader(record["log_uuid"],
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+
+ loglines = [""]
+ def callback(v1, v2, v3):
+ loglines[0] = v3
+
+ done.logtail(logc, callback, "", maxlen=1000)
+
+ # Check allocation failure
+ oom_matches = oom_retry_req.get('memoryErrorRegex') or r'(bad_alloc|out ?of ?memory|memory ?error|container using over 9.% of memory)'
+ if re.search(oom_matches, loglines[0], re.IGNORECASE | re.MULTILINE):
+ return True
+
+ return False
+
def done(self, record):
outputs = {}
+ retried = False
+ rcode = None
try:
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
else:
processStatus = "permanentFail"
+ if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
+ logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
+ self.arvrunner.label(self))
+ self.job_runtime.submit_request_uuid = None
+ self.uuid = None
+ self.run(None)
+ retried = True
+ return
+
if rcode == 137:
- logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
+ logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
self.arvrunner.label(self))
else:
processStatus = "permanentFail"
label = self.arvrunner.label(self)
done.logtail(
logc, logger.error,
- "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+ "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
logger.exception("%s while getting output object:", self.arvrunner.label(self))
processStatus = "permanentFail"
finally:
- self.output_callback(outputs, processStatus)
+ if not retried:
+ self.output_callback(outputs, processStatus)
class RunnerContainer(Runner):
}
self.job_order[param] = {"$include": mnt}
+ container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
+
+ workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+ container_image = workflow_runner_req.get("acrContainerImage")
+
container_req = {
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+ "container_image": container_image,
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"portable_data_hash": "%s" % workflowcollection
}
elif self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
- workflowpath = "/var/lib/cwl/workflow.json#main"
- record = self.arvrunner.api.workflows().get(uuid=self.embedded_tool.tool["id"][6:33]).execute(num_retries=self.arvrunner.num_retries)
- packed = yaml.safe_load(record["definition"])
+ uuid, frg = urllib.parse.urldefrag(self.embedded_tool.tool["id"])
+ workflowpath = "/var/lib/cwl/workflow.json#" + frg
+ packedtxt = self.loadingContext.loader.fetch_text(uuid)
+ yaml = ruamel.yaml.YAML(typ='safe', pure=True)
+ packed = yaml.load(packedtxt)
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
"content": packed
}
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext, git_info)
+ main = self.loadingContext.loader.idx["_:main"]
+ if main.get("id") == "_:main":
+ del main["id"]
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
- "content": packed
+ "content": main
}
container_req["properties"].update({k.replace("http://arvados.org/cwl#", "arv:"): v for k, v in git_info.items()})
if runtimeContext.prefer_cached_downloads:
command.append("--prefer-cached-downloads")
+ if self.fast_parser:
+ command.append("--fast-parser")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command