Merge branch 'master' into 8857-cwl-job-reuse
author Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 1 Apr 2016 14:01:39 +0000 (10:01 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Fri, 1 Apr 2016 14:01:39 +0000 (10:01 -0400)
Conflicts:
sdk/cwl/arvados_cwl/__init__.py

1  2 
sdk/cwl/arvados_cwl/__init__.py

index 339b91ce594bc5747120c55106394b07c909af32,511f173eb0008995a60e55652a9c3af6be40bb06..ab8d725bd775d34d3588d17ba8004327231cea80
@@@ -1,5 -1,7 +1,7 @@@
  #!/usr/bin/env python
  
+ # Implement cwl-runner interface for submitting and running jobs on Arvados.
  import argparse
  import arvados
  import arvados.events
@@@ -19,8 -21,11 +21,11 @@@ import loggin
  import re
  import os
  import sys
+ import functools
+ import json
+ import pkg_resources  # part of setuptools
  
- from cwltool.process import get_feature
+ from cwltool.process import get_feature, adjustFiles, scandeps
  from arvados.api import OrderedJsonModel
  
  logger = logging.getLogger('arvados.cwl-runner')
@@@ -32,6 -37,8 +37,8 @@@ keepre = re.compile(r"^\S+ \S+ \d+ \d+ 
  
  
  def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
+     """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
      if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
          dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
  
          if image_tag:
              args.append(image_tag)
          logger.info("Uploading Docker image %s", ":".join(args[1:]))
-         arvados.commands.keepdocker.main(args)
+         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
  
      return dockerRequirement["dockerImageId"]
  
  
  class CollectionFsAccess(cwltool.process.StdFsAccess):
+     """Implement the cwltool FsAccess interface for Arvados Collections."""
      def __init__(self, basedir):
          self.collections = {}
          self.basedir = basedir
              return os.path.exists(self._abs(fn))
  
  class ArvadosJob(object):
+     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
      def __init__(self, runner):
          self.arvrunner = runner
          self.running = False
              runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
              runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
  
 +        filters = [["repository", "=", "arvados"],
 +                   ["script", "=", "crunchrunner"],
 +                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
 +        if not self.arvrunner.ignore_docker_for_reuse:
 +            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
 +
          try:
 -            response = self.arvrunner.api.jobs().create(body={
 -                "owner_uuid": self.arvrunner.project_uuid,
 -                "script": "crunchrunner",
 -                "repository": "arvados",
 -                "script_version": "master",
 -                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
 -                "script_parameters": {"tasks": [script_parameters]},
 -                "runtime_constraints": runtime_constraints
 -            }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
 +            response = self.arvrunner.api.jobs().create(
 +                body={
 +                    "owner_uuid": self.arvrunner.project_uuid,
 +                    "script": "crunchrunner",
 +                    "repository": "arvados",
 +                    "script_version": "master",
 +                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
 +                    "script_parameters": {"tasks": [script_parameters]},
 +                    "runtime_constraints": runtime_constraints
 +                },
 +                filters=filters,
 +                find_or_create=kwargs.get("enable_reuse", True)
 +            ).execute(num_retries=self.arvrunner.num_retries)
  
              self.arvrunner.jobs[response["uuid"]] = self
  
              del self.arvrunner.jobs[record["uuid"]]
  
  
+ class RunnerJob(object):
+     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
+     def __init__(self, runner, tool, job_order, enable_reuse):
+         self.arvrunner = runner
+         self.tool = tool
+         self.job_order = job_order
+         self.running = False
+         self.enable_reuse = enable_reuse
+     def update_pipeline_component(self, record):
+         pass
+     def upload_docker(self, tool):
+         if isinstance(tool, cwltool.draft2tool.CommandLineTool):
+             (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
+             if docker_req:
+                 arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
+         elif isinstance(tool, cwltool.workflow.Workflow):
+             for s in tool.steps:
+                 self.upload_docker(s.embedded_tool)
+     def run(self, dry_run=False, pull_image=True, **kwargs):
+         self.upload_docker(self.tool)
+         workflowfiles = set()
+         jobfiles = set()
+         workflowfiles.add(self.tool.tool["id"])
+         self.name = os.path.basename(self.tool.tool["id"])
+         def visitFiles(files, path):
+             files.add(path)
+             return path
+         document_loader, _, _ = cwltool.process.get_schema()
+         def loadref(b, u):
+             return document_loader.resolve_ref(u, base_url=b)[0]
+         sc = scandeps("", self.tool.tool,
+                       set(("$import", "run")),
+                       set(("$include", "$schemas", "path")),
+                       loadref)
+         adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
+         adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
+         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
+                                        "%s",
+                                        "%s/%s",
+                                        name=self.name,
+                                        **kwargs)
+         jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
+                                   "%s",
+                                   "%s/%s",
+                                   name=os.path.basename(self.job_order.get("id", "#")),
+                                   **kwargs)
+         adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+         if "id" in self.job_order:
+             del self.job_order["id"]
+         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
+         response = self.arvrunner.api.jobs().create(body={
+             "script": "cwl-runner",
+             "script_version": "master",
+             "repository": "arvados",
+             "script_parameters": self.job_order,
+             "runtime_constraints": {
+                 "docker_image": "arvados/jobs"
+             }
+         }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
+         self.arvrunner.jobs[response["uuid"]] = self
+         logger.info("Submitted job %s", response["uuid"])
+         if response["state"] in ("Complete", "Failed", "Cancelled"):
+             self.done(response)
+     def done(self, record):
+         if record["state"] == "Complete":
+             processStatus = "success"
+         else:
+             processStatus = "permanentFail"
+         outputs = None
+         try:
+             try:
+                 outc = arvados.collection.Collection(record["output"])
+                 with outc.open("cwl.output.json") as f:
+                     outputs = json.load(f)
+             except Exception as e:
+                 logger.error("While getting final output object: %s", e)
+             self.arvrunner.output_callback(outputs, processStatus)
+         finally:
+             del self.arvrunner.jobs[record["uuid"]]
  class ArvPathMapper(cwltool.pathmapper.PathMapper):
-     def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+     """Convert container-local paths to and from Keep collection ids."""
+     def __init__(self, arvrunner, referenced_files, basedir,
+                  collection_pattern, file_pattern, name=None, **kwargs):
          self._pathmap = arvrunner.get_uploaded()
-         uploadfiles = []
+         uploadfiles = set()
  
          pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
  
          for src in referenced_files:
              if isinstance(src, basestring) and pdh_path.match(src):
-                 self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+                 self._pathmap[src] = (src, collection_pattern % src[5:])
+             if "#" in src:
+                 src = src[:src.index("#")]
              if src not in self._pathmap:
                  ab = cwltool.pathmapper.abspath(src, basedir)
-                 st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+                 st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                  if kwargs.get("conformance_test"):
                      self._pathmap[src] = (src, ab)
                  elif isinstance(st, arvados.commands.run.UploadFile):
-                     uploadfiles.append((src, ab, st))
+                     uploadfiles.add((src, ab, st))
                  elif isinstance(st, arvados.commands.run.ArvFile):
                      self._pathmap[src] = (ab, st.fn)
                  else:
                                               arvrunner.api,
                                               dry_run=kwargs.get("dry_run"),
                                               num_retries=3,
-                                              fnPattern="$(task.keep)/%s/%s",
+                                              fnPattern=file_pattern,
+                                              name=name,
                                               project=arvrunner.project_uuid)
  
          for src, ab, st in uploadfiles:
  
  
  class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
+     """Wrap cwltool CommandLineTool to override selected methods."""
      def __init__(self, arvrunner, toolpath_object, **kwargs):
          super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
          self.arvrunner = arvrunner
          return ArvadosJob(self.arvrunner)
  
      def makePathMapper(self, reffiles, input_basedir, **kwargs):
-         return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+         return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                              "$(task.keep)/%s",
+                              "$(task.keep)/%s/%s",
+                              **kwargs)
  
  
  class ArvCwlRunner(object):
+     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
+     complete, and report output."""
      def __init__(self, api_client):
          self.api = api_client
          self.jobs = {}
      def output_callback(self, out, processStatus):
          if processStatus == "success":
              logger.info("Overall job status is %s", processStatus)
-             self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                  body={"state": "Complete"}).execute(num_retries=self.num_retries)
+             if self.pipeline:
+                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
  
          else:
              logger.warn("Overall job status is %s", processStatus)
-             self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                  body={"state": "Failed"}).execute(num_retries=self.num_retries)
+             if self.pipeline:
+                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
          self.final_output = out
  
  
          self.uploaded[src] = pair
  
      def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+         self.debug = args.debug
+         if args.quiet:
+             logger.setLevel(logging.WARN)
+             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
+         useruuid = self.api.users().current().execute()["uuid"]
+         self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+         self.pipeline = None
+         if args.submit:
+             runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
+             if not args.wait:
+                 runnerjob.run()
+                 return
          events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
  
 +        self.debug = args.debug
 +        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
          self.fs_access = CollectionFsAccess(input_basedir)
  
          kwargs["fs_access"] = self.fs_access
          kwargs["outdir"] = "$(task.outdir)"
          kwargs["tmpdir"] = "$(task.tmpdir)"
  
-         useruuid = self.api.users().current().execute()["uuid"]
-         self.project_uuid = args.project_uuid if args.project_uuid else useruuid
          if kwargs.get("conformance_test"):
              return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
          else:
-             self.pipeline = self.api.pipeline_instances().create(
-                 body={
-                     "owner_uuid": self.project_uuid,
-                     "name": shortname(tool.tool["id"]),
-                     "components": {},
-                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+             if args.submit:
+                 jobiter = iter((runnerjob,))
+             else:
+                 components = {}
+                 if "cwl_runner_job" in kwargs:
+                     components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
+                 self.pipeline = self.api.pipeline_instances().create(
+                     body={
+                         "owner_uuid": self.project_uuid,
+                         "name": shortname(tool.tool["id"]),
+                         "components": components,
+                         "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
  
-             logger.info("Pipeline instance %s", self.pipeline["uuid"])
+                 logger.info("Pipeline instance %s", self.pipeline["uuid"])
  
-             jobiter = tool.job(job_order,
-                                input_basedir,
-                                self.output_callback,
-                                docker_outdir="$(task.outdir)",
-                                **kwargs)
+                 jobiter = tool.job(job_order,
+                                    input_basedir,
+                                    self.output_callback,
+                                    docker_outdir="$(task.outdir)",
+                                    **kwargs)
  
              try:
                  self.cond.acquire()
                      logger.error("Interrupted, marking pipeline as failed")
                  else:
                      logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
-                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
-                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+                 if self.pipeline:
+                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
              finally:
                  self.cond.release()
  
              return self.final_output
  
+ def versionstring():
+     """Print version string of key packages for provenance and debugging."""
+     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
+     arvpkg = pkg_resources.require("arvados-python-client")
+     cwlpkg = pkg_resources.require("cwltool")
+     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
+                                     "arvados-python-client", arvpkg[0].version,
+                                     "cwltool", cwlpkg[0].version)
  
  def main(args, stdout, stderr, api_client=None):
      args.insert(0, "--leave-outputs")
      parser = cwltool.main.arg_parser()
      exgroup = parser.add_mutually_exclusive_group()
      exgroup.add_argument("--enable-reuse", action="store_true",
                          default=True, dest="enable_reuse",
      exgroup.add_argument("--disable-reuse", action="store_false",
                          default=True, dest="enable_reuse",
                          help="")
      parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 +    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
 +                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
 +                        default=False)
  
+     exgroup = parser.add_mutually_exclusive_group()
+     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
+                         default=True, dest="submit")
+     exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
+                         default=True, dest="submit")
+     exgroup = parser.add_mutually_exclusive_group()
+     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
+                         default=True, dest="wait")
+     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
+                         default=True, dest="wait")
      try:
-         runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
+         if api_client is None:
+             api_client=arvados.api('v1', model=OrderedJsonModel())
+         runner = ArvCwlRunner(api_client)
      except Exception as e:
          logger.error(e)
          return 1
  
-     return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
+     return cwltool.main.main(args,
+                              stdout=stdout,
+                              stderr=stderr,
+                              executor=runner.arvExecutor,
+                              makeTool=runner.arvMakeTool,
+                              parser=parser,
+                              versionfunc=versionstring)