8460: Move loggedDuration from keepstore to sdk pkg as stats.Duration.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index b3d47dd8d05e5981ae4f645fa9c968ee7707e747..9cabedf7f77a15532ad1379d638d5ca9a4169e44 100644 (file)
@@ -139,34 +139,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
 
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
 
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -287,22 +296,26 @@ class ArvCwlRunner(object):
 
         tool.visit(self.check_writable)
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  api_client=self.api,
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        if kwargs.get("create_template"):
-            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
-            tmpl.save()
-            # cwltool.main will write our return value to stdout.
-            return tmpl.uuid
-
-        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
-            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
+            if self.work_api == "jobs":
+                tmpl = RunnerTemplate(self, tool, job_order,
+                                      kwargs.get("enable_reuse"),
+                                      uuid=existing_uuid)
+                tmpl.save()
+                # cwltool.main will write our return value to stdout.
+                return tmpl.uuid
+            else:
+                return upload_workflow(self, tool, job_order,
+                                       self.project_uuid,
+                                       uuid=existing_uuid)
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -374,6 +387,10 @@ class ArvCwlRunner(object):
             loopperf.__enter__()
             for runnable in jobiter:
                 loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
                     with Perf(metrics, "run"):
                         runnable.run(**kwargs)
@@ -395,7 +412,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -488,9 +505,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="submit")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
-    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
-    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+                         dest="create_workflow")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -528,7 +546,21 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+
+    if arvargs.update_workflow:
+        if arvargs.update_workflow.find('-7fd4e-') == 5:
+            want_api = 'containers'
+        elif arvargs.update_workflow.find('-p5p6p-') == 5:
+            want_api = 'jobs'
+        else:
+            want_api = None
+        if want_api and arvargs.work_api and want_api != arvargs.work_api:
+            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+                arvargs.update_workflow, want_api, arvargs.work_api))
+            return 1
+        arvargs.work_api = want_api
+
+    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
@@ -554,6 +586,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     arvargs.conformance_test = None
     arvargs.use_container = True
+    arvargs.relax_path_checks = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,