Merge branch '8815-crunchrunner-everywhere' into 8654-arv-jobs-cwl-runner
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index e3fd1fccd372d75abf06445f32fe2a13f8a8e66c..5c7ec4de4efaafe87470a386b2c20db4e9fbc552 100644 (file)
@@ -19,8 +19,11 @@ import logging
 import re
 import os
 import sys
+import functools
+import json
+import pkg_resources  # part of setuptools
 
-from cwltool.process import get_feature
+from cwltool.process import get_feature, adjustFiles, scandeps
 from arvados.api import OrderedJsonModel
 
 logger = logging.getLogger('arvados.cwl-runner')
@@ -279,23 +282,124 @@ class ArvadosJob(object):
             del self.arvrunner.jobs[record["uuid"]]
 
 
+class RunnerJob(object):
+    def __init__(self, runner, tool, job_order, enable_reuse):
+        self.arvrunner = runner
+        self.tool = tool
+        self.job_order = job_order
+        self.running = False
+        self.enable_reuse = enable_reuse
+
+    def update_pipeline_component(self, record):
+        pass
+
+    def upload_docker(self, tool):
+        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+            if docker_req:
+                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+        elif isinstance(tool, cwltool.workflow.Workflow):
+            for s in tool.steps:
+                self.upload_docker(s.embedded_tool)
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        self.upload_docker(self.tool)
+
+        workflowfiles = set()
+        jobfiles = set()
+        workflowfiles.add(self.tool.tool["id"])
+
+        self.name = os.path.basename(self.tool.tool["id"])
+
+        def visitFiles(files, path):
+            files.add(path)
+            return path
+
+        document_loader, _, _ = cwltool.process.get_schema()
+        def loadref(b, u):
+            return document_loader.resolve_ref(u, base_url=b)[0]
+
+        sc = scandeps("", self.tool.tool,
+                      set(("$import", "run")),
+                      set(("$include", "$schemas", "path")),
+                      loadref)
+        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
+        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+
+        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                       "%s",
+                                       "%s/%s",
+                                       name=self.name,
+                                       **kwargs)
+
+        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                  "%s",
+                                  "%s/%s",
+                                  name=os.path.basename(self.job_order.get("id", "#")),
+                                  **kwargs)
+
+        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+
+        if "id" in self.job_order:
+            del self.job_order["id"]
+
+        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+
+        response = self.arvrunner.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "8654-arv-jobs-cwl-runner",
+            "repository": "arvados",
+            "script_parameters": self.job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+
+        self.arvrunner.jobs[response["uuid"]] = self
+
+        logger.info("Submitted job %s", response["uuid"])
+
+        if response["state"] in ("Complete", "Failed", "Cancelled"):
+            self.done(response)
+
+    def done(self, record):
+        if record["state"] == "Complete":
+            processStatus = "success"
+        else:
+            processStatus = "permanentFail"
+
+        outputs = None
+        try:
+            try:
+                outc = arvados.collection.Collection(record["output"])
+                with outc.open("cwl.output.json") as f:
+                    outputs = json.load(f)
+            except Exception as e:
+                logger.error("While getting final output object: %s", e)
+            self.arvrunner.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]
+
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+    def __init__(self, arvrunner, referenced_files, basedir,
+                 collection_pattern, file_pattern, name=None, **kwargs):
         self._pathmap = arvrunner.get_uploaded()
-        uploadfiles = []
+        uploadfiles = set()
 
         pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
 
         for src in referenced_files:
             if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+                self._pathmap[src] = (src, collection_pattern % src[5:])
+            if "#" in src:
+                src = src[:src.index("#")]
             if src not in self._pathmap:
                 ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
-                    uploadfiles.append((src, ab, st))
+                    uploadfiles.add((src, ab, st))
                 elif isinstance(st, arvados.commands.run.ArvFile):
                     self._pathmap[src] = (ab, st.fn)
                 else:
@@ -306,7 +410,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s",
+                                             fnPattern=file_pattern,
+                                             name=name,
                                              project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
@@ -333,7 +438,10 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
         return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                             "$(task.keep)/%s",
+                             "$(task.keep)/%s/%s",
+                             **kwargs)
 
 
 class ArvCwlRunner(object):
@@ -355,13 +463,15 @@ class ArvCwlRunner(object):
     def output_callback(self, out, processStatus):
         if processStatus == "success":
             logger.info("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
 
         else:
             logger.warn("Overall job status is %s", processStatus)
-            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            if self.pipeline:
+                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
         self.final_output = out
 
 
@@ -393,9 +503,24 @@ class ArvCwlRunner(object):
         self.uploaded[src] = pair
 
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+        self.debug = args.debug
+
+        if args.quiet:
+            logger.setLevel(logging.WARN)
+            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+
+        useruuid = self.api.users().current().execute()["uuid"]
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+        self.pipeline = None
+
+        if args.submit:
+            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
+            if not args.wait:
+                runnerjob.run()
+                return
+
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
-        self.debug = args.debug
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
@@ -404,26 +529,30 @@ class ArvCwlRunner(object):
         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"
 
-        useruuid = self.api.users().current().execute()["uuid"]
-        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
-
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            self.pipeline = self.api.pipeline_instances().create(
-                body={
-                    "owner_uuid": self.project_uuid,
-                    "name": shortname(tool.tool["id"]),
-                    "components": {},
-                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            if args.submit:
+                jobiter = iter((runnerjob,))
+            else:
+                components = {}
+                if "cwl_runner_job" in kwargs:
+                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
 
-            logger.info("Pipeline instance %s", self.pipeline["uuid"])
+                self.pipeline = self.api.pipeline_instances().create(
+                    body={
+                        "owner_uuid": self.project_uuid,
+                        "name": shortname(tool.tool["id"]),
+                        "components": components,
+                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
-            jobiter = tool.job(job_order,
-                               input_basedir,
-                               self.output_callback,
-                               docker_outdir="$(task.outdir)",
-                               **kwargs)
+                logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
+                jobiter = tool.job(job_order,
+                                   input_basedir,
+                                   self.output_callback,
+                                   docker_outdir="$(task.outdir)",
+                                   **kwargs)
 
             try:
                 self.cond.acquire()
@@ -455,17 +584,28 @@ class ArvCwlRunner(object):
                     logger.error("Interrupted, marking pipeline as failed")
                 else:
                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
-                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
+                if self.pipeline:
+                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
             finally:
                 self.cond.release()
 
             return self.final_output
 
+def versionstring():
+    cwlpkg = pkg_resources.require("cwltool")
+    arvpkg = pkg_resources.require("arvados-python-client")
+    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+
+    return "%s %s, %s %s, %s %s" % (sys.argv[0],
+                                        "arvados-cwl-runner", arvcwlpkg[0].version,
+                                        "arvados-python-client", cwlpkg[0].version,
+                                        "cwltool", arvpkg[0].version)
 
 def main(args, stdout, stderr, api_client=None):
     args.insert(0, "--leave-outputs")
     parser = cwltool.main.arg_parser()
+
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
                         default=True, dest="enable_reuse",
@@ -473,12 +613,33 @@ def main(args, stdout, stderr, api_client=None):
     exgroup.add_argument("--disable-reuse", action="store_false",
                         default=True, dest="enable_reuse",
                         help="")
+
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--submit", action="store_true", help="Submit runner job so workflow can run unattended.",
+                        default=True, dest="submit")
+    exgroup.add_argument("--local", action="store_false", help="Workflow runner runs on local host and submits jobs.",
+                        default=True, dest="submit")
+
+    exgroup = parser.add_mutually_exclusive_group()
+    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+                        default=True, dest="wait")
+    exgroup.add_argument("--no-wait", action="store_false", help="Exit after submitting workflow runner job.",
+                        default=True, dest="wait")
+
     try:
-        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+        if api_client is None:
+            api_client=arvados.api('v1', model=OrderedJsonModel())
+        runner = ArvCwlRunner(api_client)
     except Exception as e:
         logger.error(e)
         return 1
 
-    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+    return cwltool.main.main(args,
+                             stdout=stdout,
+                             stderr=stderr,
+                             executor=runner.arvExecutor,
+                             makeTool=runner.arvMakeTool,
+                             parser=parser,
+                             versionfunc=versionstring)