Merge branch '8815-crunchrunner-everywhere' into 8654-arv-jobs-cwl-runner
author Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 29 Mar 2016 17:34:40 +0000 (13:34 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Tue, 29 Mar 2016 17:34:40 +0000 (13:34 -0400)
Conflicts:
sdk/cwl/arvados_cwl/__init__.py

sdk/cwl/arvados_cwl/__init__.py

index 42e0da2e41fd57725b641585f2487971ce646275,e3fd1fccd372d75abf06445f32fe2a13f8a8e66c..5c7ec4de4efaafe87470a386b2c20db4e9fbc552
@@@ -19,20 -19,13 +19,16 @@@ import logging
  import re
  import os
  import sys
 +import functools
 +import json
 +import pkg_resources  # part of setuptools
  
 -from cwltool.process import get_feature
 +from cwltool.process import get_feature, adjustFiles, scandeps
  from arvados.api import OrderedJsonModel
  
  logger = logging.getLogger('arvados.cwl-runner')
  logger.setLevel(logging.INFO)
  
- crunchrunner_pdh = "ff6fc71e593081ef9733afacaeee15ea+140"
- crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
- certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
  tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
  outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
  keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
@@@ -155,6 -148,8 +151,8 @@@ class ArvadosJob(object):
          (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
          if docker_req and kwargs.get("use_container") is not False:
              runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+         else:
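+             # No DockerRequirement: fall back to the stock arvados/jobs image.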
+             runtime_constraints["docker_image"] = "arvados/jobs"
  
          resources = self.builder.resources
          if resources is not None:
                  "repository": "arvados",
                  "script_version": "master",
                  "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
-                 "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
+                 "script_parameters": {"tasks": [script_parameters]},
                  "runtime_constraints": runtime_constraints
              }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
  
              del self.arvrunner.jobs[record["uuid"]]
  
  
 +class RunnerJob(object):
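 +    """Submit the workflow itself to run as a single Arvados job.
 +
 +    Uploads the workflow and its inputs to Keep, then submits a job that
 +    runs arvados-cwl-runner inside the arvados/jobs image, so the workflow
 +    can run to completion unattended.
 +    """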
 +    def __init__(self, runner, tool, job_order, enable_reuse):
 +        self.arvrunner = runner
 +        self.tool = tool
 +        self.job_order = job_order
 +        self.running = False
 +        self.enable_reuse = enable_reuse
 +
 +    def update_pipeline_component(self, record):
 +        pass
 +
 +    def upload_docker(self, tool):
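 +        # Pre-fetch every Docker image the workflow references: check this
 +        # tool's DockerRequirement and recurse into each step of a Workflow.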
 +        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
 +            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
 +            if docker_req:
 +                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
 +        elif isinstance(tool, cwltool.workflow.Workflow):
 +            for s in tool.steps:
 +                self.upload_docker(s.embedded_tool)
 +
 +    def run(self, dry_run=False, pull_image=True, **kwargs):
 +        self.upload_docker(self.tool)
 +
 +        workflowfiles = set()
 +        jobfiles = set()
 +        workflowfiles.add(self.tool.tool["id"])
 +
 +        self.name = os.path.basename(self.tool.tool["id"])
 +
 +        def visitFiles(files, path):
 +            files.add(path)
 +            return path
 +
 +        document_loader, _, _ = cwltool.process.get_schema()
 +        def loadref(b, u):
 +            return document_loader.resolve_ref(u, base_url=b)[0]
 +
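 +        # scandeps walks the tool document: "$import" and "run" references
 +        # are resolved via loadref and recursed into, while "$include",
 +        # "$schemas" and "path" values are collected as file dependencies.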
 +        sc = scandeps("", self.tool.tool,
 +                      set(("$import", "run")),
 +                      set(("$include", "$schemas", "path")),
 +                      loadref)
 +        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
 +        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))
 +
 +        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
 +                                       "%s",
 +                                       "%s/%s",
 +                                       name=self.name,
 +                                       **kwargs)
 +
 +        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
 +                                  "%s",
 +                                  "%s/%s",
 +                                  name=os.path.basename(self.job_order.get("id", "#")),
 +                                  **kwargs)
 +
 +        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
 +
 +        if "id" in self.job_order:
 +            del self.job_order["id"]
 +
 +        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
 +
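 +        # Submit the runner job itself: the rewritten job order becomes the
 +        # script_parameters, with "cwl:tool" naming the copy of the workflow
 +        # now in Keep.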
 +        response = self.arvrunner.api.jobs().create(body={
 +            "script": "cwl-runner",
 +            "script_version": "8654-arv-jobs-cwl-runner",
 +            "repository": "arvados",
 +            "script_parameters": self.job_order,
 +            "runtime_constraints": {
 +                "docker_image": "arvados/jobs"
 +            }
 +        }, find_or_create=self.enable_reuse).execute(num_retries=self.arvrunner.num_retries)
 +
 +        self.arvrunner.jobs[response["uuid"]] = self
 +
 +        logger.info("Submitted job %s", response["uuid"])
 +
 +        if response["state"] in ("Complete", "Failed", "Cancelled"):
 +            self.done(response)
 +
 +    def done(self, record):
 +        if record["state"] == "Complete":
 +            processStatus = "success"
 +        else:
 +            processStatus = "permanentFail"
 +
 +        outputs = None
 +        try:
 +            try:
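 +                # The runner is expected to leave the CWL output object in
 +                # cwl.output.json inside the job's output collection.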
 +                outc = arvados.collection.Collection(record["output"])
 +                with outc.open("cwl.output.json") as f:
 +                    outputs = json.load(f)
 +            except Exception as e:
 +                logger.error("While getting final output object: %s", e)
 +            self.arvrunner.output_callback(outputs, processStatus)
 +        finally:
 +            del self.arvrunner.jobs[record["uuid"]]
 +
  class ArvPathMapper(cwltool.pathmapper.PathMapper):
 -    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
 +    def __init__(self, arvrunner, referenced_files, basedir,
 +                 collection_pattern, file_pattern, name=None, **kwargs):
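 +        # collection_pattern formats a bare "keep:PDH/path" reference and
 +        # file_pattern formats (collection, file) pairs: ArvadosCommandTool
 +        # passes "$(task.keep)/%s" and "$(task.keep)/%s/%s", while RunnerJob
 +        # passes "%s" and "%s/%s" to get plain Keep-relative paths.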
          self._pathmap = arvrunner.get_uploaded()
 -        uploadfiles = []
 +        uploadfiles = set()
  
          pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
  
          for src in referenced_files:
              if isinstance(src, basestring) and pdh_path.match(src):
 -                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
 +                self._pathmap[src] = (src, collection_pattern % src[5:])
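 +            # Map by the document's base URI: strip any "#fragment" part.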
 +            if "#" in src:
 +                src = src[:src.index("#")]
              if src not in self._pathmap:
                  ab = cwltool.pathmapper.abspath(src, basedir)
 -                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
 +                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                  if kwargs.get("conformance_test"):
                      self._pathmap[src] = (src, ab)
                  elif isinstance(st, arvados.commands.run.UploadFile):
 -                    uploadfiles.append((src, ab, st))
 +                    uploadfiles.add((src, ab, st))
                  elif isinstance(st, arvados.commands.run.ArvFile):
                      self._pathmap[src] = (ab, st.fn)
                  else:
                                               arvrunner.api,
                                               dry_run=kwargs.get("dry_run"),
                                               num_retries=3,
 -                                             fnPattern="$(task.keep)/%s/%s",
 +                                             fnPattern=file_pattern,
 +                                             name=name,
                                               project=arvrunner.project_uuid)
  
          for src, ab, st in uploadfiles:
@@@ -440,10 -333,7 +438,10 @@@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
          return ArvadosJob(self.arvrunner)
  
      def makePathMapper(self, reffiles, input_basedir, **kwargs):
 -        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
 +        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
 +                             "$(task.keep)/%s",
 +                             "$(task.keep)/%s/%s",
 +                             **kwargs)
  
  
  class ArvCwlRunner(object):
      def output_callback(self, out, processStatus):
          if processStatus == "success":
              logger.info("Overall job status is %s", processStatus)
 -            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
 -                                                 body={"state": "Complete"}).execute(num_retries=self.num_retries)
 +            if self.pipeline:
 +                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
 +                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)
  
          else:
              logger.warn("Overall job status is %s", processStatus)
 -            self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
 -                                                 body={"state": "Failed"}).execute(num_retries=self.num_retries)
 +            if self.pipeline:
 +                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
 +                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
          self.final_output = out
  
  
          self.uploaded[src] = pair
  
      def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
-         try:
-             self.api.collections().get(uuid=crunchrunner_pdh).execute()
-         except arvados.errors.ApiError as e:
-             import httplib2
-             h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
-             resp, content = h.request(crunchrunner_download, "GET")
-             resp2, content2 = h.request(certs_download, "GET")
-             with arvados.collection.Collection() as col:
-                 with col.open("crunchrunner", "w") as f:
-                     f.write(content)
-                 with col.open("ca-certificates.crt", "w") as f:
-                     f.write(content2)
-                 col.save_new("crunchrunner binary", ensure_unique_name=True)
 +        self.debug = args.debug
 +
 +        if args.quiet:
 +            logger.setLevel(logging.WARN)
 +            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 +
 +        useruuid = self.api.users().current().execute()["uuid"]
 +        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
 +        self.pipeline = None
 +
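 +        # --submit: run the workflow engine on the cluster as its own job;
 +        # with --no-wait, return right after submission instead of waiting
 +        # for completion.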
 +        if args.submit:
 +            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
 +            if not args.wait:
 +                runnerjob.run()
 +                return
 +
          events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
  
 -        self.debug = args.debug
          self.fs_access = CollectionFsAccess(input_basedir)
  
          kwargs["fs_access"] = self.fs_access
          kwargs["outdir"] = "$(task.outdir)"
          kwargs["tmpdir"] = "$(task.tmpdir)"
  
 -        useruuid = self.api.users().current().execute()["uuid"]
 -        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
 -
          if kwargs.get("conformance_test"):
              return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
          else:
 -            self.pipeline = self.api.pipeline_instances().create(
 -                body={
 -                    "owner_uuid": self.project_uuid,
 -                    "name": shortname(tool.tool["id"]),
 -                    "components": {},
 -                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 +            if args.submit:
 +                jobiter = iter((runnerjob,))
 +            else:
 +                components = {}
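 +                # If a "cwl_runner_job" was passed in, record it as a
 +                # component of the new pipeline instance.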
 +                if "cwl_runner_job" in kwargs:
 +                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}
  
 -            logger.info("Pipeline instance %s", self.pipeline["uuid"])
 +                self.pipeline = self.api.pipeline_instances().create(
 +                    body={
 +                        "owner_uuid": self.project_uuid,
 +                        "name": shortname(tool.tool["id"]),
 +                        "components": components,
 +                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
  
 -            jobiter = tool.job(job_order,
 -                               input_basedir,
 -                               self.output_callback,
 -                               docker_outdir="$(task.outdir)",
 -                               **kwargs)
 +                logger.info("Pipeline instance %s", self.pipeline["uuid"])
 +
 +                jobiter = tool.job(job_order,
 +                                   input_basedir,
 +                                   self.output_callback,
 +                                   docker_outdir="$(task.outdir)",
 +                                   **kwargs)
  
              try:
                  self.cond.acquire()
                      logger.error("Interrupted, marking pipeline as failed")
                  else:
                      logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
 -                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
 -                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
 +                if self.pipeline:
 +                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
 +                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
              finally:
                  self.cond.release()
  
              return self.final_output
  
 +def versionstring():
 +    cwlpkg = pkg_resources.require("cwltool")
 +    arvpkg = pkg_resources.require("arvados-python-client")
 +    arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
 +
 +    return "%s %s %s, %s %s, %s %s" % (sys.argv[0],
 +                                        "arvados-cwl-runner", arvcwlpkg[0].version,
 +                                        "arvados-python-client", arvpkg[0].version,
 +                                        "cwltool", cwlpkg[0].version)
  
  def main(args, stdout, stderr, api_client=None):
      args.insert(0, "--leave-outputs")
      parser = cwltool.main.arg_parser()
 +
      exgroup = parser.add_mutually_exclusive_group()
      exgroup.add_argument("--enable-reuse", action="store_true",
                          default=True, dest="enable_reuse",
      exgroup.add_argument("--disable-reuse", action="store_false",
                          default=True, dest="enable_reuse",
                          help="")
 +
      parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
  
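 +    # --submit/--local share dest "submit" (default True) and --wait/
 +    # --no-wait share dest "wait" (default True); each pair is mutually
 +    # exclusive, so argparse rejects combining its two flags.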
 +    exgroup = parser.add_mutually_exclusive_group()
 +    exgroup.add_argument("--submit", action="store_true", help="Submit runner job so workflow can run unattended.",
 +                        default=True, dest="submit")
 +    exgroup.add_argument("--local", action="store_false", help="Workflow runner runs on local host and submits jobs.",
 +                        default=True, dest="submit")
 +
 +    exgroup = parser.add_mutually_exclusive_group()
 +    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
 +                        default=True, dest="wait")
 +    exgroup.add_argument("--no-wait", action="store_false", help="Exit after submitting workflow runner job.",
 +                        default=True, dest="wait")
 +
      try:
 -        runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
 +        if api_client is None:
 +            api_client = arvados.api('v1', model=OrderedJsonModel())
 +        runner = ArvCwlRunner(api_client)
      except Exception as e:
          logger.error(e)
          return 1
  
 -    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
 +    return cwltool.main.main(args,
 +                             stdout=stdout,
 +                             stderr=stderr,
 +                             executor=runner.arvExecutor,
 +                             makeTool=runner.arvMakeTool,
 +                             parser=parser,
 +                             versionfunc=versionstring)
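
Note: a minimal sketch of how this entry point can be wired up (the
__main__ stanza below is an illustration, not part of this diff; main()'s
signature comes from the code above):

    import sys
    from arvados_cwl import main  # sdk/cwl/arvados_cwl/__init__.py

    if __name__ == "__main__":
        # main() prepends --leave-outputs and delegates to cwltool.main.main,
        # which returns an integer exit code.
        sys.exit(main(sys.argv[1:], sys.stdout, sys.stderr))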