19744: Extend test_done() to check for a usage report
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index ea7c9f7a33eee04849ae338388e1f998f5a1918b..63e04a157e29d4edb39844495e1c24e326251eb8 100644 (file)
@@ -27,6 +27,9 @@ from cwltool.job import JobBase
 
 import arvados.collection
 
+import crunchstat_summary.summarizer
+import crunchstat_summary.reader
+
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields, make_builder
@@ -370,8 +373,13 @@ class ArvadosContainer(JobBase):
         ram_multiplier = [1]
 
         oom_retry_req, _ = self.get_requirement("http://arvados.org/cwl#OutOfMemoryRetry")
-        if oom_retry_req and oom_retry_req.get('memoryRetryMultipler'):
-            ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+        if oom_retry_req:
+            if oom_retry_req.get('memoryRetryMultiplier'):
+                ram_multiplier.append(oom_retry_req.get('memoryRetryMultiplier'))
+            elif oom_retry_req.get('memoryRetryMultipler'):
+                ram_multiplier.append(oom_retry_req.get('memoryRetryMultipler'))
+            else:
+                ram_multiplier.append(2)
 
         if runtimeContext.runnerjob.startswith("arvwf:"):
             wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
@@ -492,11 +500,14 @@ class ArvadosContainer(JobBase):
             else:
                 processStatus = "permanentFail"
 
-            if processStatus == "permanentFail" and record["log_uuid"]:
-                logc = arvados.collection.CollectionReader(record["log_uuid"],
-                                                           api_client=self.arvrunner.api,
-                                                           keep_client=self.arvrunner.keep_client,
-                                                           num_retries=self.arvrunner.num_retries)
+            logc = None
+            if record["log_uuid"]:
+                logc = arvados.collection.Collection(record["log_uuid"],
+                                                     api_client=self.arvrunner.api,
+                                                     keep_client=self.arvrunner.keep_client,
+                                                     num_retries=self.arvrunner.num_retries)
+
+            if processStatus == "permanentFail" and logc is not None:
                 label = self.arvrunner.label(self)
                 done.logtail(
                     logc, logger.error,
@@ -522,6 +533,20 @@ class ArvadosContainer(JobBase):
                 uuid=self.uuid,
                 body={"container_request": {"properties": properties}}
             ).execute(num_retries=self.arvrunner.num_retries)
+
+            if logc is not None:
+                try:
+                    summerizer = crunchstat_summary.summarizer.Summarizer(crunchstat_summary.reader.CollectionReader(logc.manifest_locator(), collection_object=logc),
+                                                                          label=self.name, arv=self.arvrunner.api)
+                    summerizer.run()
+                    with logc.open("usage_report.html", "wt") as mr:
+                        mr.write(summerizer.html_report())
+                    logc.save()
+                except Exception as e:
+                    logger.warning("%s unable to generate resource usage report",
+                                 self.arvrunner.label(self),
+                                 exc_info=(e if self.arvrunner.debug else False))
+
         except WorkflowException as e:
             # Only include a stack trace if in debug mode.
             # A stack trace may obfuscate more useful output about the workflow.
@@ -593,7 +618,7 @@ class RunnerContainer(Runner):
                 "ram": 1024*1024 * (math.ceil(self.submit_runner_ram) + math.ceil(self.collection_cache_size)),
                 "API": True
             },
-            "use_existing": False, # Never reuse the runner container - see #15497.
+            "use_existing": self.reuse_runner,
             "properties": {}
         }
 
@@ -617,6 +642,8 @@ class RunnerContainer(Runner):
                 "content": packed
             }
             container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
+        elif self.embedded_tool.tool.get("id", "").startswith("file:"):
+            raise WorkflowException("Tool id '%s' is a local file but expected keep: or arvwf:" % self.embedded_tool.tool.get("id"))
         else:
             main = self.loadingContext.loader.idx["_:main"]
             if main.get("id") == "_:main":
@@ -737,14 +764,9 @@ class RunnerContainer(Runner):
 
         logger.info("%s submitted container_request %s", self.arvrunner.label(self), response["uuid"])
 
-        workbench1 = self.arvrunner.api.config()["Services"]["Workbench1"]["ExternalURL"]
         workbench2 = self.arvrunner.api.config()["Services"]["Workbench2"]["ExternalURL"]
-        url = ""
         if workbench2:
             url = "{}processes/{}".format(workbench2, response["uuid"])
-        elif workbench1:
-            url = "{}container_requests/{}".format(workbench1, response["uuid"])
-        if url:
             logger.info("Monitor workflow progress at %s", url)