8460: Merge branch 'master' into 8460-websocket-go
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index b3d47dd8d05e5981ae4f645fa9c968ee7707e747..31272c825ea0f14874a6a13609c9df746ecc8871 100644 (file)
@@ -21,6 +21,7 @@ import schema_salad
 
 import arvados
 import arvados.config
+from arvados.errors import ApiError
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
@@ -72,7 +73,9 @@ class ArvCwlRunner(object):
         else:
             self.keep_client = arvados.keep.KeepClient(api_client=self.api, num_retries=self.num_retries)
 
-        for api in ["jobs", "containers"]:
+        self.work_api = None
+        expected_api = ["jobs", "containers"]
+        for api in expected_api:
             try:
                 methods = self.api._rootDesc.get('resources')[api]['methods']
                 if ('httpMethod' in methods['create'] and
@@ -81,11 +84,12 @@ class ArvCwlRunner(object):
                     break
             except KeyError:
                 pass
+
         if not self.work_api:
             if work_api is None:
                 raise Exception("No supported APIs")
             else:
-                raise Exception("Unsupported API '%s'" % work_api)
+                raise Exception("Unsupported API '%s', expected one of %s" % (work_api, expected_api))
 
     def arv_make_tool(self, toolpath_object, **kwargs):
         kwargs["work_api"] = self.work_api
@@ -120,7 +124,7 @@ class ArvCwlRunner(object):
                         logger.info("Job %s (%s) is Running", j.name, uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
-                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
                     uuid = event["object_uuid"]
                     try:
                         self.cond.acquire()
@@ -139,34 +143,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
 
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
+                if self.work_api == "containers":
+                    table = self.poll_api.container_requests()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
 
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -268,6 +281,12 @@ class ArvCwlRunner(object):
         if self.work_api == "containers":
             try:
                 current = self.api.containers().current().execute(num_retries=self.num_retries)
+            except ApiError as e:
+                # Status code 404 just means we're not running in a container.
+                if e.resp.status != 404:
+                    logger.info("Getting current container: %s", e)
+                return
+            try:
                 self.api.containers().update(uuid=current['uuid'],
                                              body={
                                                  'output': self.final_output_collection.portable_data_hash(),
@@ -287,22 +306,30 @@ class ArvCwlRunner(object):
 
         tool.visit(self.check_writable)
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  api_client=self.api,
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        if kwargs.get("create_template"):
-            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
-            tmpl.save()
-            # cwltool.main will write our return value to stdout.
-            return tmpl.uuid
-
-        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
-            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
+            if self.work_api == "jobs":
+                tmpl = RunnerTemplate(self, tool, job_order,
+                                      kwargs.get("enable_reuse"),
+                                      uuid=existing_uuid,
+                                      submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                      name=kwargs.get("name"))
+                tmpl.save()
+                # cwltool.main will write our return value to stdout.
+                return tmpl.uuid
+            else:
+                return upload_workflow(self, tool, job_order,
+                                       self.project_uuid,
+                                       uuid=existing_uuid,
+                                       submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                       name=kwargs.get("name"))
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -333,16 +360,20 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
+                                                self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                                name=kwargs.get("name"))
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name,
+                                      self.output_tags, submit_runner_ram=kwargs.get("submit_runner_ram"),
+                                      name=kwargs.get("name"))
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
                     "owner_uuid": self.project_uuid,
-                    "name": shortname(tool.tool["id"]),
+                    "name": kwargs["name"] if kwargs.get("name") else shortname(tool.tool["id"]),
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
@@ -374,6 +405,10 @@ class ArvCwlRunner(object):
             loopperf.__enter__()
             for runnable in jobiter:
                 loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
                     with Perf(metrics, "run"):
                         runnable.run(**kwargs)
@@ -395,7 +430,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -488,9 +523,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="submit")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
-    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
-    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+                         dest="create_workflow")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -506,6 +542,14 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help="Compute checksum of contents while collecting outputs",
                         dest="compute_checksum")
 
+    parser.add_argument("--submit-runner-ram", type=int,
+                        help="RAM (in MiB) required for the workflow runner job (default 1024)",
+                        default=1024)
+
+    parser.add_argument("--name", type=str,
+                        help="Name to use for workflow execution instance.",
+                        default=None)
+
     parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
     parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
@@ -528,7 +572,25 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+
+    if arvargs.version:
+        print versionstring()
+        return
+
+    if arvargs.update_workflow:
+        if arvargs.update_workflow.find('-7fd4e-') == 5:
+            want_api = 'containers'
+        elif arvargs.update_workflow.find('-p5p6p-') == 5:
+            want_api = 'jobs'
+        else:
+            want_api = None
+        if want_api and arvargs.work_api and want_api != arvargs.work_api:
+            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+                arvargs.update_workflow, want_api, arvargs.work_api))
+            return 1
+        arvargs.work_api = want_api
+
+    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
@@ -554,6 +616,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     arvargs.conformance_test = None
     arvargs.use_container = True
+    arvargs.relax_path_checks = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,