refs #10524
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 5515694024ccb86dd343543099013db945a1f515..9cabedf7f77a15532ad1379d638d5ca9a4169e44 100644 (file)
@@ -139,34 +139,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
 
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
 
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -294,19 +303,19 @@ class ArvCwlRunner(object):
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        update_uuid = kwargs.get("update_workflow")
-        if update_uuid or kwargs.get("create_workflow"):
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
             if self.work_api == "jobs":
                 tmpl = RunnerTemplate(self, tool, job_order,
                                       kwargs.get("enable_reuse"),
-                                      uuid=update_uuid)
+                                      uuid=existing_uuid)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return tmpl.uuid
             else:
                 return upload_workflow(self, tool, job_order,
                                        self.project_uuid,
-                                       uuid=update_uuid)
+                                       uuid=existing_uuid)
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -378,6 +387,10 @@ class ArvCwlRunner(object):
             loopperf.__enter__()
             for runnable in jobiter:
                 loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
                     with Perf(metrics, "run"):
                         runnable.run(**kwargs)
@@ -399,7 +412,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -494,7 +507,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
                          dest="create_workflow")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow or pipeline template (depending on selected API, see --api).")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
     exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
@@ -573,6 +586,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     arvargs.conformance_test = None
     arvargs.use_container = True
+    arvargs.relax_path_checks = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,