import arvados.collection
+import crunchstat_summary.summarizer
+import crunchstat_summary.reader
+
from .arvdocker import arv_docker_get_image
from . import done
from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
- ramMultiplier = [1]
+ ram_multiplier = [1]
oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
- if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
- ramMultiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+ if oom_retry_req:
+ if oom_retry_req.get('memoryRetryMultiplier'):
+ ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
+ elif oom_retry_req.get('memoryRetryMultipler'):
+ ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+ else:
+ ram_multiplier.append(2)
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
self.uuid = runtimeContext.submit_request_uuid
- for i in ramMultiplier:
+ for i in ram_multiplier:
runtime_constraints["ram"] = ram * i
if self.uuid:
break
if response["container_uuid"] is None:
- runtime_constraints["ram"] = ram * ramMultiplier[self.attempt_count]
+ runtime_constraints["ram"] = ram * ram_multiplier[self.attempt_count]
container_request["state"] = "Committed"
response = self.arvrunner.api.container_requests().update(
def done(self, record):
outputs = {}
retried = False
+ rcode = None
try:
container = self.arvrunner.api.containers().get(
uuid=record["container_uuid"]
processStatus = "permanentFail"
if processStatus == "permanentFail" and self.attempt_count == 1 and self.out_of_memory_retry(record, container):
- logger.info("%s Container failed with out of memory error, retrying with more RAM.",
+ logger.warning("%s Container failed with out of memory error, retrying with more RAM.",
self.arvrunner.label(self))
self.job_runtime.submit_request_uuid = None
self.uuid = None
return
if rcode == 137:
- logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin'.",
+ logger.warning("%s Container may have been killed for using too much RAM. Try resubmitting with a higher 'ramMin' or use the arv:OutOfMemoryRetry feature.",
self.arvrunner.label(self))
else:
processStatus = "permanentFail"
- if processStatus == "permanentFail" and record["log_uuid"]:
- logc = arvados.collection.CollectionReader(record["log_uuid"],
- api_client=self.arvrunner.api,
- keep_client=self.arvrunner.keep_client,
- num_retries=self.arvrunner.num_retries)
+ logc = None
+ if record["log_uuid"]:
+ logc = arvados.collection.Collection(record["log_uuid"],
+ api_client=self.arvrunner.api,
+ keep_client=self.arvrunner.keep_client,
+ num_retries=self.arvrunner.num_retries)
+
+ if processStatus == "permanentFail" and logc is not None:
label = self.arvrunner.label(self)
done.logtail(
logc, logger.error,
- "%s (%s) error log:" % (label, record["uuid"]), maxlen=40)
+ "%s (%s) error log:" % (label, record["uuid"]), maxlen=40, include_crunchrun=(rcode is None or rcode > 127))
if record["output_uuid"]:
if self.arvrunner.trash_intermediate or self.arvrunner.intermediate_output_ttl:
uuid=self.uuid,
body={"container_request": {"properties": properties}}
).execute(num_retries=self.arvrunner.num_retries)
+
+ if logc is not None and self.job_runtime.enable_usage_report is not False:
+ try:
+ summarizer = crunchstat_summary.summarizer.ContainerRequestSummarizer(
+ record,
+ collection_object=logc,
+ label=self.name,
+ arv=self.arvrunner.api)
+ summarizer.run()
+ with logc.open("usage_report.html", "wt") as mr:
+ mr.write(summarizer.html_report())
+ logc.save()
+
+ # Post warnings about nodes that are under-utilized.
+ for rc in summarizer._recommend_gen(lambda x: x):
+ self.usage_report_notes.append(rc)
+
+ except Exception as e:
+ logger.warning("%s unable to generate resource usage report",
+ self.arvrunner.label(self),
+ exc_info=(e if self.arvrunner.debug else False))
+
except WorkflowException as e:
# Only include a stack trace if in debug mode.
# A stack trace may obfuscate more useful output about the workflow.
}
self.job_order[param] = {"$include": mnt}
+ container_image = arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext)
+
+ workflow_runner_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+ if workflow_runner_req and workflow_runner_req.get("acrContainerImage"):
+ container_image = workflow_runner_req.get("acrContainerImage")
+
container_req = {
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
+ "container_image": container_image,
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
"API": True
},
- "use_existing": False, # Never reuse the runner container - see #15497.
+ "use_existing": self.reuse_runner,
"properties": {}
}
"content": packed
}
container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+ elif self.embedded_tool.tool.get("id", "").startswith("file:"):
+ raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
else:
main = self.loadingContext.loader.idx["_:main"]
if main.get("id") == "_:main":
if runtimeContext.prefer_cached_downloads:
command.append("--prefer-cached-downloads")
+ if runtimeContext.enable_usage_report is True:
+ command.append("--enable-usage-report")
+
+ if runtimeContext.enable_usage_report is False:
+ command.append("--disable-usage-report")
+
if self.fast_parser:
command.append("--fast-parser")
logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
- workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
- url = ""
if workbench2:
url = "{}processes/{}".format(workbench2, response["uuid"])
- elif workbench1:
- url = "{}container_requests/{}".format(workbench1, response["uuid"])
- if url:
logger.info("Monitor workflow progress at %s", url)