Merge branch '10088-cwl-dedup-deps' refs #10088
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index ba26816c219bc871dfb85e6bfc6290916390d395..7bfdba8b80e48d9cb24d7054933b428aa3729764 100644 (file)
@@ -1,12 +1,15 @@
 #!/usr/bin/env python
 
 #!/usr/bin/env python
 
-# Implement cwl-runner interface for submitting and running jobs on Arvados.
+# Implement cwl-runner interface for submitting and running work on Arvados, using
+# either the Crunch jobs API or Crunch containers API.
 
 import argparse
 import logging
 import os
 import sys
 import threading
 
 import argparse
 import logging
 import os
 import sys
 import threading
+import hashlib
+from functools import partial
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
 import pkg_resources  # part of setuptools
 
 from cwltool.errors import WorkflowException
@@ -14,61 +17,78 @@ import cwltool.main
 import cwltool.workflow
 
 import arvados
 import cwltool.workflow
 
 import arvados
-import arvados.events
+import arvados.config
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
 
 from .arvcontainer import ArvadosContainer, RunnerContainer
 from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
 from .arvtool import ArvadosCommandTool
 from .fsaccess import CollectionFsAccess
+from .arvworkflow import make_workflow
+from .perf import Perf
 
 from cwltool.process import shortname, UnsupportedRequirement
 
 from cwltool.process import shortname, UnsupportedRequirement
+from cwltool.pathmapper import adjustFileObjs
+from cwltool.draft2tool import compute_checksums
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
 class ArvCwlRunner(object):
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
 class ArvCwlRunner(object):
-    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
-    complete, and report output."""
+    """Execute a CWL tool or workflow, submit work (using either jobs or
+    containers API), wait for them to complete, and report output.
 
 
-    def __init__(self, api_client, crunch2):
+    """
+
+    def __init__(self, api_client, work_api=None):
         self.api = api_client
         self.api = api_client
-        self.jobs = {}
+        self.processes = {}
         self.lock = threading.Lock()
         self.cond = threading.Condition(self.lock)
         self.final_output = None
         self.lock = threading.Lock()
         self.cond = threading.Condition(self.lock)
         self.final_output = None
+        self.final_status = None
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
         self.uploaded = {}
         self.num_retries = 4
         self.uuid = None
-        self.crunch2 = crunch2
+        self.work_api = work_api
+        self.stop_polling = threading.Event()
+        self.poll_api = None
+
+        if self.work_api is None:
+            # todo: autodetect API to use.
+            self.work_api = "jobs"
+
+        if self.work_api not in ("containers", "jobs"):
+            raise Exception("Unsupported API '%s'" % self.work_api)
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
 
     def arvMakeTool(self, toolpath_object, **kwargs):
         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
-            return ArvadosCommandTool(self, toolpath_object, crunch2=self.crunch2, **kwargs)
+            kwargs["work_api"] = self.work_api
+            return ArvadosCommandTool(self, toolpath_object, **kwargs)
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
         if processStatus == "success":
         else:
             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
 
     def output_callback(self, out, processStatus):
         if processStatus == "success":
-            logger.info("Overall job status is %s", processStatus)
+            logger.info("Overall process status is %s", processStatus)
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
-
         else:
         else:
-            logger.warn("Overall job status is %s", processStatus)
+            logger.warn("Overall process status is %s", processStatus)
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+        self.final_status = processStatus
         self.final_output = out
 
     def on_message(self, event):
         if "object_uuid" in event:
         self.final_output = out
 
     def on_message(self, event):
         if "object_uuid" in event:
-            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+            if event["object_uuid"] in self.processes and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
                     uuid = event["object_uuid"]
                     with self.lock:
                     uuid = event["object_uuid"]
                     with self.lock:
-                        j = self.jobs[uuid]
+                        j = self.processes[uuid]
                         logger.info("Job %s (%s) is Running", j.name, uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
                         logger.info("Job %s (%s) is Running", j.name, uuid)
                         j.running = True
                         j.update_pipeline_component(event["properties"]["new_attributes"])
@@ -76,29 +96,82 @@ class ArvCwlRunner(object):
                     uuid = event["object_uuid"]
                     try:
                         self.cond.acquire()
                     uuid = event["object_uuid"]
                     try:
                         self.cond.acquire()
-                        j = self.jobs[uuid]
+                        j = self.processes[uuid]
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                        j.done(event["properties"]["new_attributes"])
+                        with Perf(logger, "done %s" % j.name):
+                            j.done(event["properties"]["new_attributes"])
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
                         self.cond.notify()
                     finally:
                         self.cond.release()
 
+    def poll_states(self):
+        """Poll status of jobs or containers listed in the processes dict.
+
+        Runs in a separate thread.
+        """
+
+        while True:
+            self.stop_polling.wait(15)
+            if self.stop_polling.is_set():
+                break
+            with self.lock:
+                keys = self.processes.keys()
+            if not keys:
+                continue
+
+            if self.work_api == "containers":
+                table = self.poll_api.containers()
+            elif self.work_api == "jobs":
+                table = self.poll_api.jobs()
+
+            try:
+                proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
+            except Exception as e:
+                logger.warn("Error checking states on API server: %s", e)
+                continue
+
+            for p in proc_states["items"]:
+                self.on_message({
+                    "object_uuid": p["uuid"],
+                    "event_type": "update",
+                    "properties": {
+                        "new_attributes": p
+                    }
+                })
+
     def get_uploaded(self):
         return self.uploaded.copy()
 
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
     def get_uploaded(self):
         return self.uploaded.copy()
 
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def check_writable(self, obj):
+        if isinstance(obj, dict):
+            if obj.get("writable"):
+                raise UnsupportedRequirement("InitialWorkDir feature 'writable: true' not supported")
+            for v in obj.itervalues():
+                self.check_writable(v)
+        if isinstance(obj, list):
+            for v in obj:
+                self.check_writable(v)
+
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
     def arvExecutor(self, tool, job_order, **kwargs):
         self.debug = kwargs.get("debug")
 
+        tool.visit(self.check_writable)
+
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
         if kwargs.get("quiet"):
             logger.setLevel(logging.WARN)
             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
+        if self.debug:
+            logger.setLevel(logging.DEBUG)
+
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
         useruuid = self.api.users().current().execute()["uuid"]
         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
         self.pipeline = None
+        make_fs_access = kwargs.get("make_fs_access") or partial(CollectionFsAccess, api_client=self.api)
+        self.fs_access = make_fs_access(kwargs["basedir"])
 
         if kwargs.get("create_template"):
             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
 
         if kwargs.get("create_template"):
             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
@@ -106,13 +179,41 @@ class ArvCwlRunner(object):
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
             # cwltool.main will write our return value to stdout.
             return tmpl.uuid
 
+        if kwargs.get("create_workflow") or kwargs.get("update_workflow"):
+            return make_workflow(self, tool, job_order, self.project_uuid, kwargs.get("update_workflow"))
+
+        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
+
+        kwargs["make_fs_access"] = make_fs_access
+        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+        kwargs["use_container"] = True
+        kwargs["tmpdir_prefix"] = "tmp"
+        kwargs["on_error"] = "continue"
+        kwargs["compute_checksum"] = kwargs.get("compute_checksum")
+
+        if self.work_api == "containers":
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["docker_outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+            kwargs["docker_tmpdir"] = "/tmp"
+        elif self.work_api == "jobs":
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["docker_outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
+
+        runnerjob = None
         if kwargs.get("submit"):
         if kwargs.get("submit"):
-            if self.crunch2:
-                runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
+            if self.work_api == "containers":
+                if tool.tool["class"] == "CommandLineTool":
+                    runnerjob = tool.job(job_order,
+                                         self.output_callback,
+                                         **kwargs).next()
+                else:
+                    runnerjob = RunnerContainer(self, tool, job_order, kwargs.get("enable_reuse"))
             else:
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
             else:
                 runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
 
-        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.crunch2:
+        if not kwargs.get("submit") and "cwl_runner_job" not in kwargs and not self.work_api == "containers":
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
             # Create pipeline for local run
             self.pipeline = self.api.pipeline_instances().create(
                 body={
@@ -122,37 +223,21 @@ class ArvCwlRunner(object):
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if kwargs.get("submit") and not kwargs.get("wait"):
-                runnerjob.run()
-                return runnerjob.uuid
+        if runnerjob and not kwargs.get("wait"):
+            runnerjob.run()
+            return runnerjob.uuid
 
 
-        if self.crunch2:
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#container"]], self.on_message)
-        else:
-            events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
-
-        self.debug = kwargs.get("debug")
-        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
-        self.fs_access = CollectionFsAccess(kwargs["basedir"])
-
-        kwargs["fs_access"] = self.fs_access
-        kwargs["enable_reuse"] = kwargs.get("enable_reuse")
-
-        if self.crunch2:
-            kwargs["outdir"] = "/var/spool/cwl"
-            kwargs["tmpdir"] = "/tmp"
-        else:
-            kwargs["outdir"] = "$(task.outdir)"
-            kwargs["tmpdir"] = "$(task.tmpdir)"
+        self.poll_api = arvados.api('v1')
+        self.polling_thread = threading.Thread(target=self.poll_states)
+        self.polling_thread.start()
 
 
-        if kwargs.get("submit"):
+        if runnerjob:
             jobiter = iter((runnerjob,))
         else:
             if "cwl_runner_job" in kwargs:
                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
             jobiter = iter((runnerjob,))
         else:
             if "cwl_runner_job" in kwargs:
                 self.uuid = kwargs.get("cwl_runner_job").get('uuid')
             jobiter = tool.job(job_order,
                                self.output_callback,
-                               docker_outdir="$(task.outdir)",
                                **kwargs)
 
         try:
                                **kwargs)
 
         try:
@@ -163,18 +248,18 @@ class ArvCwlRunner(object):
 
             for runnable in jobiter:
                 if runnable:
 
             for runnable in jobiter:
                 if runnable:
-                    runnable.run(**kwargs)
+                    with Perf(logger, "run"):
+                        runnable.run(**kwargs)
                 else:
                 else:
-                    if self.jobs:
+                    if self.processes:
                         self.cond.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
 
                         self.cond.wait(1)
                     else:
                         logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                         break
 
-            while self.jobs:
+            while self.processes:
                 self.cond.wait(1)
 
                 self.cond.wait(1)
 
-            events.close()
         except UnsupportedRequirement:
             raise
         except:
         except UnsupportedRequirement:
             raise
         except:
@@ -185,14 +270,25 @@ class ArvCwlRunner(object):
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and self.crunch2:
+            if runnerjob and runnerjob.uuid and self.work_api == "containers":
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
                 self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.cond.release()
+            self.stop_polling.set()
+            self.polling_thread.join()
+
+        if self.final_status == "UnsupportedRequirement":
+            raise UnsupportedRequirement("Check log for details.")
+
+        if self.final_status != "success":
+            raise WorkflowException("Workflow failed.")
 
         if self.final_output is None:
 
         if self.final_output is None:
-            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
+            raise WorkflowException("Workflow did not return a result.")
+
+        if kwargs.get("compute_checksum"):
+            adjustFileObjs(self.final_output, partial(compute_checksums, self.fs_access))
 
         return self.final_output
 
 
         return self.final_output
 
@@ -212,7 +308,6 @@ def versionstring():
 def arg_parser():  # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
 
 def arg_parser():  # type: () -> argparse.ArgumentParser
     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
 
-    parser.add_argument("--conformance-test", action="store_true")
     parser.add_argument("--basedir", type=str,
                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
     parser.add_argument("--basedir", type=str,
                         help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
@@ -239,7 +334,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         default=True, dest="enable_reuse",
                         help="")
 
                         default=True, dest="enable_reuse",
                         help="")
 
-    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs, if not provided, will go to home project.")
+    parser.add_argument("--project-uuid", type=str, metavar="UUID", help="Project that will own the workflow jobs, if not provided, will go to home project.")
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
                         default=False)
@@ -250,6 +345,8 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                         default=True, dest="submit")
     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
+    exgroup.add_argument("--create-workflow", action="store_true", help="Create an Arvados workflow.")
+    exgroup.add_argument("--update-workflow", type=str, metavar="UUID", help="Update existing Arvados workflow with uuid.")
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
 
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
@@ -257,17 +354,16 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                         default=True, dest="wait")
 
-    exgroup = parser.add_mutually_exclusive_group()
-    exgroup.add_argument("--crunch1", action="store_false",
-                        default=False, dest="crunch2",
-                        help="Use Crunch v1 Jobs API")
+    parser.add_argument("--api", type=str,
+                        default=None, dest="work_api",
+                        help="Select work submission API, one of 'jobs' or 'containers'.")
 
 
-    exgroup.add_argument("--crunch2", action="store_true",
-                        default=False, dest="crunch2",
-                        help="Use Crunch v2 Containers API")
+    parser.add_argument("--compute-checksum", action="store_true", default=False,
+                        help="Compute checksum of contents while collecting outputs",
+                        dest="compute_checksum")
 
 
-    parser.add_argument("workflow", type=str, nargs="?", default=None)
-    parser.add_argument("job_order", nargs=argparse.REMAINDER)
+    parser.add_argument("workflow", type=str, nargs="?", default=None, help="The workflow to execute")
+    parser.add_argument("job_order", nargs=argparse.REMAINDER, help="The input object to the workflow.")
 
     return parser
 
 
     return parser
 
@@ -277,21 +373,25 @@ def main(args, stdout, stderr, api_client=None):
 
     job_order_object = None
     arvargs = parser.parse_args(args)
 
     job_order_object = None
     arvargs = parser.parse_args(args)
-    if arvargs.create_template and not arvargs.job_order:
+    if (arvargs.create_template or arvargs.create_workflow or arvargs.update_workflow) and not arvargs.job_order:
         job_order_object = ({}, "")
 
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
         job_order_object = ({}, "")
 
     try:
         if api_client is None:
             api_client=arvados.api('v1', model=OrderedJsonModel())
-        runner = ArvCwlRunner(api_client, crunch2=arvargs.crunch2)
+        runner = ArvCwlRunner(api_client, work_api=arvargs.work_api)
     except Exception as e:
         logger.error(e)
         return 1
 
     except Exception as e:
         logger.error(e)
         return 1
 
+    arvargs.conformance_test = None
+    arvargs.use_container = True
+
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
                              executor=runner.arvExecutor,
                              makeTool=runner.arvMakeTool,
                              versionfunc=versionstring,
     return cwltool.main.main(args=arvargs,
                              stdout=stdout,
                              stderr=stderr,
                              executor=runner.arvExecutor,
                              makeTool=runner.arvMakeTool,
                              versionfunc=versionstring,
-                             job_order_object=job_order_object)
+                             job_order_object=job_order_object,
+                             make_fs_access=partial(CollectionFsAccess, api_client=api_client))