refs #10524
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 3144592fc98f47083581e06fabcf900517d7ab01..9cabedf7f77a15532ad1379d638d5ca9a4169e44 100644 (file)
@@ -49,7 +49,7 @@ class ArvCwlRunner(object):
 
     """
 
-    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None):
+    def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None):
         self.api = api_client
         self.processes = {}
         self.lock = threading.Lock()
@@ -64,6 +64,7 @@ class ArvCwlRunner(object):
         self.pipeline = None
         self.final_output_collection = None
         self.output_name = output_name
+        self.output_tags = output_tags
         self.project_uuid = None
 
         if keep_client is not None:
@@ -138,34 +139,43 @@ class ArvCwlRunner(object):
         Runs in a separate thread.
         """
 
-        while True:
-            self.stop_polling.wait(15)
-            if self.stop_polling.is_set():
-                break
-            with self.lock:
-                keys = self.processes.keys()
-            if not keys:
-                continue
-
-            if self.work_api == "containers":
-                table = self.poll_api.containers()
-            elif self.work_api == "jobs":
-                table = self.poll_api.jobs()
-
-            try:
-                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
-            except Exception as e:
-                logger.warn("Error checking states on API server: %s", e)
-                continue
-
-            for p in proc_states["items"]:
-                self.on_message({
-                    "object_uuid": p["uuid"],
-                    "event_type": "update",
-                    "properties": {
-                        "new_attributes": p
-                    }
-                })
+        try:
+            while True:
+                self.stop_polling.wait(15)
+                if self.stop_polling.is_set():
+                    break
+                with self.lock:
+                    keys = self.processes.keys()
+                if not keys:
+                    continue
+
+                if self.work_api == "containers":
+                    table = self.poll_api.containers()
+                elif self.work_api == "jobs":
+                    table = self.poll_api.jobs()
+
+                try:
+                    proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    logger.warn("Error checking states on API server: %s", e)
+                    continue
+
+                for p in proc_states["items"]:
+                    self.on_message({
+                        "object_uuid": p["uuid"],
+                        "event_type": "update",
+                        "properties": {
+                            "new_attributes": p
+                        }
+                    })
+        except:
+            logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
+            self.cond.acquire()
+            self.processes.clear()
+            self.cond.notify()
+            self.cond.release()
+        finally:
+            self.stop_polling.set()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -183,7 +193,7 @@ class ArvCwlRunner(object):
             for v in obj:
                 self.check_writable(v)
 
-    def make_output_collection(self, name, outputObj):
+    def make_output_collection(self, name, tagsString, outputObj):
         outputObj = copy.deepcopy(outputObj)
 
         files = []
@@ -201,14 +211,28 @@ class ArvCwlRunner(object):
 
         srccollections = {}
         for k,v in generatemapper.items():
+            if k.startswith("_:"):
+                if v.type == "Directory":
+                    continue
+                if v.type == "CreateFile":
+                    with final.open(v.target, "wb") as f:
+                        f.write(v.resolved.encode("utf-8"))
+                    continue
+
+            if not k.startswith("keep:"):
+                raise Exception("Output source is not in keep or a literal")
             sp = k.split("/")
             srccollection = sp[0][5:]
             if srccollection not in srccollections:
-                srccollections[srccollection] = arvados.collection.CollectionReader(
-                    srccollection,
-                    api_client=self.api,
-                    keep_client=self.keep_client,
-                    num_retries=self.num_retries)
+                try:
+                    srccollections[srccollection] = arvados.collection.CollectionReader(
+                        srccollection,
+                        api_client=self.api,
+                        keep_client=self.keep_client,
+                        num_retries=self.num_retries)
+                except arvados.errors.ArgumentError as e:
+                    logger.error("Creating CollectionReader for '%s' '%s': %s", k, v, e)
+                    raise
             reader = srccollections[srccollection]
             try:
                 srcpath = "/".join(sp[1:]) if len(sp) > 1 else "."
@@ -218,7 +242,7 @@ class ArvCwlRunner(object):
 
         def rewrite(fileobj):
             fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
-            for k in ("basename", "size", "listing"):
+            for k in ("basename", "listing", "contents"):
                 if k in fileobj:
                     del fileobj[k]
 
@@ -234,7 +258,20 @@ class ArvCwlRunner(object):
                     final.api_response()["name"],
                     final.manifest_locator())
 
-        self.final_output_collection = final
+        final_uuid = final.manifest_locator()
+        tags = tagsString.split(',')
+        for tag in tags:
+             self.api.links().create(body={
+                "head_uuid": final_uuid, "link_class": "tag", "name": tag
+                }).execute(num_retries=self.num_retries)
+
+        def finalcollection(fileobj):
+            fileobj["location"] = "keep:%s/%s" % (final.portable_data_hash(), fileobj["location"])
+
+        adjustDirObjs(outputObj, finalcollection)
+        adjustFileObjs(outputObj, finalcollection)
+
+        return (outputObj, final)
 
     def set_crunch_output(self):
         if self.work_api == "containers":
@@ -259,22 +296,26 @@ class ArvCwlRunner(object):
 
         tool.visit(self.check_writable)
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
+        self.project_uuid = kwargs.get("project_uuid")
         self.pipeline = None
         make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess,
                                                                  api_client=self.api,
                                                                  keep_client=self.keep_client)
         self.fs_access = make_fs_access(kwargs["basedir"])
 
-        if kwargs.get("create_template"):
-            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
-            tmpl.save()
-            # cwltool.main will write our return value to stdout.
-            return tmpl.uuid
-
-        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
-            return upload_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+        existing_uuid = kwargs.get("update_workflow")
+        if existing_uuid or kwargs.get("create_workflow"):
+            if self.work_api == "jobs":
+                tmpl = RunnerTemplate(self, tool, job_order,
+                                      kwargs.get("enable_reuse"),
+                                      uuid=existing_uuid)
+                tmpl.save()
+                # cwltool.main will write our return value to stdout.
+                return tmpl.uuid
+            else:
+                return upload_workflow(self, tool, job_order,
+                                       self.project_uuid,
+                                       uuid=existing_uuid)
 
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
@@ -305,9 +346,9 @@ class ArvCwlRunner(object):
                                          self.output_callback,
                                          **kwargs).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
             else:
-                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name)
+                runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"), self.output_name, self.output_tags)
 
         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
@@ -346,6 +387,10 @@ class ArvCwlRunner(object):
             loopperf.__enter__()
             for runnable in jobiter:
                 loopperf.__exit__()
+
+                if self.stop_polling.is_set():
+                    break
+
                 if runnable:
                     with Perf(metrics, "run"):
                         runnable.run(**kwargs)
@@ -367,7 +412,7 @@ class ArvCwlRunner(object):
             if sys.exc_info()[0] is KeyboardInterrupt:
                 logger.error("Interrupted, marking pipeline as failed")
             else:
-                logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
+                logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
@@ -390,7 +435,9 @@ class ArvCwlRunner(object):
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
-            self.make_output_collection(self.output_name, self.final_output)
+            if self.output_tags is None:
+                self.output_tags = ""
+            self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, self.output_tags, self.final_output)
             self.set_crunch_output()
 
         if self.final_status != "success":
@@ -448,6 +495,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
 
     parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--output-name", type=str, help="Name to use for collection that stores the final output.", default=None)
+    parser.add_argument("--output-tags", type=str, help="Tags for the final output collection separated by commas, e.g., '--output-tags tag0,tag1,tag2'.", default=None)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -457,9 +505,10 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="submit")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
-    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
-    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
-    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
+    exgroup.add_argument("--create-template", action="store_true", help="(Deprecated) synonym for --create-workflow.",
+                         dest="create_workflow")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow (if using the 'containers' API) or pipeline template (if using the 'jobs' API). See --api.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update an existing Arvados workflow or pipeline template with the given UUID.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -497,7 +546,21 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
+
+    if arvargs.update_workflow:
+        if arvargs.update_workflow.find('-7fd4e-') == 5:
+            want_api = 'containers'
+        elif arvargs.update_workflow.find('-p5p6p-') == 5:
+            want_api = 'jobs'
+        else:
+            want_api = None
+        if want_api and arvargs.work_api and want_api != arvargs.work_api:
+            logger.error('--update-workflow arg {!r} uses {!r} API, but --api={!r} specified'.format(
+                arvargs.update_workflow, want_api, arvargs.work_api))
+            return 1
+        arvargs.work_api = want_api
+
+    if (arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     add_arv_hints()
@@ -505,7 +568,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client, output_name=arvargs.output_name, output_tags=arvargs.output_tags)
     except Exception as e:
         logger.error(e)
         return 1
@@ -523,6 +586,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None):
 
     arvargs.conformance_test = None
     arvargs.use_container = True
+    arvargs.relax_path_checks = True
 
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,