Merge branch '10088-cwl-dedup-deps' refs #10088
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index f7792cd35d0b0c5e188eb6a6912542c8704f0bdb..7bfdba8b80e48d9cb24d7054933b428aa3729764 100644 (file)
@@ -17,16 +17,18 @@ import cwltool.main
 import cwltool.workflow
 
 import arvados
 import cwltool.workflow
 
 import arvados
-import arvados.events
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
 import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
 
 from cwltool.process import shortname, UnsupportedRequirement
 from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -49,6 +51,8 @@ class ArvCwlRunner(object):
         self.num_retries = 4
         self.uuid = None
         self.work_api = work_api
         self.num_retries = 4
         self.uuid = None
         self.work_api = work_api
+        self.stop_polling = threading.Event()
+        self.poll_api = None
 
         if self.work_api is None:
             # todo: autodetect API to use.
 
         if self.work_api is None:
             # todo: autodetect API to use.
@@ -59,7 +63,8 @@ class ArvCwlRunner(object):
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, work_api=self.work_api, **kwargs)
+            kwargs["work_api"] = self.work_api
+            return ArvadosCommandTool(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
@@ -93,24 +98,75 @@ class ArvCwlRunner(object):
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                         self.cond.acquire()
                         j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread.
+        """
+
+        while True:
+            self.stop_polling.wait(15)
+            if self.stop_polling.is_set():
+                break
+            with self.lock:
+                keys = self.processes.keys()
+            if not keys:
+                continue
+
+            if self.work_api == "containers":
+                table = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                table = self.poll_api.jobs()
+
+            try:
+                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.warn("Error checking states on API server: %s", e)
+                continue
+
+            for p in proc_states["items"]:
+                self.on_message({
+                    "object_uuid": p["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": p
+                    }
+                })
+
     def get_uploaded(self):
         return self.uploaded.copy()
 
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
     def get_uploaded(self):
         return self.uploaded.copy()
 
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def check_writable(self, obj):
+        if isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            for v in obj.itervalues():
+                self.check_writable(v)
+        if isinstance(obj, list):
+            for v in obj:
+                self.check_writable(v)
+
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
+        tool.visit(self.check_writable)
+
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
+
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
@@ -123,7 +179,9 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
-        self.debug = kwargs.get("debug")
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
 
         kwargs["make_fs_access"] = make_fs_access
@@ -137,6 +195,7 @@ class ArvCwlRunner(object):
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
             kwargs["outdir"] = "/var/spool/cwl"
             kwargs["docker_outdir"] = "/var/spool/cwl"
             kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
         elif self.work_api == "jobs":
             kwargs["outdir"] = "$(task.outdir)"
             kwargs["docker_outdir"] = "$(task.outdir)"
@@ -168,12 +227,9 @@ class ArvCwlRunner(object):
             runnerjob.run()
             return runnerjob.uuid
 
             runnerjob.run()
             return runnerjob.uuid
 
-        arvados.config.settings()["ARVADOS_DISABLE_WEBSOCKETS"] = "1"
-
-        if self.work_api == "containers":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
-        if self.work_api == "jobs":
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
 
         if runnerjob:
             jobiter = iter((runnerjob,))
 
         if runnerjob:
             jobiter = iter((runnerjob,))
@@ -192,7 +248,8 @@ class ArvCwlRunner(object):
 
             for runnable in jobiter:
                 if runnable:
 
             for runnable in jobiter:
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(logger, "run"):
+                        runnable.run(**kwargs)
                 else:
                     if self.processes:
                         self.cond.wait(1)
                 else:
                     if self.processes:
                         self.cond.wait(1)
@@ -203,7 +260,6 @@ class ArvCwlRunner(object):
             while self.processes:
                 self.cond.wait(1)
 
             while self.processes:
                 self.cond.wait(1)
 
-            events.close()
         except UnsupportedRequirement:
             raise
         except:
         except UnsupportedRequirement:
             raise
         except:
@@ -219,6 +275,8 @@ class ArvCwlRunner(object):
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
+            self.stop_polling.set()
+            self.polling_thread.join()
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
 
         if self.final_status == "UnsupportedRequirement":
             raise UnsupportedRequirement("Check log for details.")
@@ -230,17 +288,7 @@ class ArvCwlRunner(object):
             raise WorkflowException("Workflow did not return a result.")
 
         if kwargs.get("compute_checksum"):
             raise WorkflowException("Workflow did not return a result.")
 
         if kwargs.get("compute_checksum"):
-            def compute_checksums(fileobj):
-                if "checksum" not in fileobj:
-                    checksum = hashlib.sha1()
-                    with self.fs_access.open(fileobj["location"], "rb") as f:
-                        contents = f.read(1024*1024)
-                        while contents != "":
-                            checksum.update(contents)
-                            contents = f.read(1024*1024)
-                    fileobj["checksum"] = "sha1$%s" % checksum.hexdigest()
-
-            adjustFileObjs(self.final_output, compute_checksums)
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output
 
 
         return self.final_output
 
@@ -286,7 +334,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="enable_reuse",
                         help="")
 
                         default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -297,6 +345,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -323,7 +373,7 @@ def main(args, stdout, stderr, api_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if arvargs.create_template and not arvargs.job_order:
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     try:
         job_order_object = ({}, "")
 
     try: