Merge branch 'master' into 8654-arv-jobs-cwl-runner
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 480d18ef81fb2bb4b10f368bb362edf50394e2e6..c0adb33550f10e2a95a26becbca3c961a4379ac3 100644 (file)
@@ -11,6 +11,7 @@ import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
 from cwltool.process import shortname
+from cwltool.errors import WorkflowException
 import threading
 import cwltool.docker
 import fnmatch
@@ -152,6 +153,12 @@ class ArvadosJob(object):
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
 
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
         try:
             response = self.arvrunner.api.jobs().create(body={
                 "owner_uuid": self.arvrunner.project_uuid,
@@ -216,14 +223,23 @@ class ArvadosJob(object):
                         g = keepre.match(l)
                         if g:
                             keepdir = g.group(1)
-                        if tmpdir and outdir and keepdir:
-                            break
+
+                        # It turns out that if the job fails and restarts, it
+                        # can come up on a different compute node, so we have
+                        # to read the log all the way through to get the final
+                        # directories, rather than breaking out early:
+                        #
+                        #if tmpdir and outdir and keepdir:
+                        #    break
 
                     self.builder.outdir = outdir
                     self.builder.pathmapper.keepdir = keepdir
                     outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
             except Exception as e:
-                logger.exception("Got exception while collecting job outputs:")
+                logger.exception("Got unexpected exception while collecting job outputs:")
                 processStatus = "permanentFail"
 
             self.output_callback(outputs, processStatus)
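
For context, the loop this hunk modifies scans the finished job's log for the
crunchrunner work directories. Because a restarted job may run on a different
compute node, every line is read and the last match wins. A standalone sketch
of that scan, with hypothetical log lines and simplified regexes:

    import re

    # Illustrative log lines; the real ones come from the job log in Keep.
    loglines = [
        "crunchrunner: $(task.tmpdir)=/tmp/crunch-job/task-0.tmpdir",
        "crunchrunner: $(task.outdir)=/tmp/crunch-job/task-0.outdir",
        "crunchrunner: $(task.keep)=/keep",
    ]

    tmpdirre = re.compile(r".*crunchrunner: \$\(task\.tmpdir\)=(.*)")
    outdirre = re.compile(r".*crunchrunner: \$\(task\.outdir\)=(.*)")
    keepre = re.compile(r".*crunchrunner: \$\(task\.keep\)=(.*)")

    tmpdir = outdir = keepdir = None
    for l in loglines:
        # No early break: a later restart on another node would otherwise
        # be missed, leaving stale directory paths.
        g = tmpdirre.match(l)
        if g:
            tmpdir = g.group(1)
        g = outdirre.match(l)
        if g:
            outdir = g.group(1)
        g = keepre.match(l)
        if g:
            keepdir = g.group(1)
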
@@ -389,6 +405,8 @@ class ArvCwlRunner(object):
 
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
+        self.debug = args.debug
+
         try:
             self.api.collections().get(uuid=crunchrunner_pdh).execute()
         except arvados.errors.ApiError as e:
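
Recording args.debug on the runner is what lets the error handlers in this
change choose between a bare message and a full traceback. A small runnable
sketch of that conditional exc_info pattern with the standard logging module:

    import logging

    logging.basicConfig()
    logger = logging.getLogger("arvados.cwl-runner")
    debug = False  # stand-in for the --debug command line flag

    try:
        raise RuntimeError("simulated failure")
    except RuntimeError as e:
        # A false exc_info suppresses the traceback; passing the exception
        # (or any true value inside an except block) includes it.
        logger.error("Error was: %s", e, exc_info=(e if debug else False))
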
@@ -462,7 +480,7 @@ class ArvCwlRunner(object):
                 if sys.exc_info()[0] is KeyboardInterrupt:
                     logger.error("Interrupted, marking pipeline as failed")
                 else:
-                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
             finally:
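
The failure path ends by moving the pipeline instance to the Failed state
through the ordinary API client. A minimal sketch of that call; the uuid and
retry count are placeholders for the runner's own state:

    import arvados

    api = arvados.api("v1")
    api.pipeline_instances().update(
        uuid="zzzzz-d1hrv-xxxxxxxxxxxxxxx",
        body={"state": "Failed"},
    ).execute(num_retries=2)
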