Merge branch 'master' into 8654-arv-jobs-cwl-runner
author Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 21 Mar 2016 16:37:38 +0000 (12:37 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Mon, 21 Mar 2016 16:37:38 +0000 (12:37 -0400)
sdk/cwl/arvados_cwl/__init__.py

index 480d18ef81fb2bb4b10f368bb362edf50394e2e6,5ed83abb920a604f6a95d5d0d9162cd69985d31a..c0adb33550f10e2a95a26becbca3c961a4379ac3
@@@ -11,6 -11,7 +11,7 @@@ import cwltool.draft2tool
  import cwltool.workflow
  import cwltool.main
  from cwltool.process import shortname
+ from cwltool.errors import WorkflowException
  import threading
  import cwltool.docker
  import fnmatch
@@@ -152,6 -153,12 +153,12 @@@ class ArvadosJob(object)
          if docker_req and kwargs.get("use_container") is not False:
              runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
  
+         resources = self.builder.resources
+         if resources is not None:
+             runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
          try:
              response = self.arvrunner.api.jobs().create(body={
                  "owner_uuid": self.arvrunner.project_uuid,
                          g = keepre.match(l)
                          if g:
                              keepdir = g.group(1)
-                         if tmpdir and outdir and keepdir:
-                             break
+                         # It turns out that if the job fails and restarts, it
+                         # can come up on a different compute node, so we have
+                         # to read the log to the end to be sure we have the
+                         # final values, instead of taking the easy way out:
+                         #
+                         #if tmpdir and outdir and keepdir:
+                         #    break
  
                      self.builder.outdir = outdir
                      self.builder.pathmapper.keepdir = keepdir
                      outputs = self.collect_outputs("keep:" + record["output"])
+             except WorkflowException as e:
+                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                 processStatus = "permanentFail"
              except Exception as e:
-                 logger.exception("Got exception while collecting job outputs:")
+                 logger.exception("Got unknown exception while collecting job outputs:")
                  processStatus = "permanentFail"
  
              self.output_callback(outputs, processStatus)
  
  
  class ArvPathMapper(cwltool.pathmapper.PathMapper):
 -    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
 +    def __init__(self, arvrunner, referenced_files, basedir,
 +                 collection_pattern, file_pattern, **kwargs):
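 +        # collection_pattern formats references that are already in Keep
 +        # (one %s: the "keep:" path without its prefix); file_pattern
 +        # formats uploaded files (two %s: collection, file name).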
          self._pathmap = arvrunner.get_uploaded()
          uploadfiles = []
  
  
          for src in referenced_files:
              if isinstance(src, basestring) and pdh_path.match(src):
 -                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
 +                self._pathmap[src] = (src, collection_pattern % src[5:])
              if src not in self._pathmap:
                  ab = cwltool.pathmapper.abspath(src, basedir)
 -                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
 +                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                  if kwargs.get("conformance_test"):
                      self._pathmap[src] = (src, ab)
                  elif isinstance(st, arvados.commands.run.UploadFile):
                                               arvrunner.api,
                                               dry_run=kwargs.get("dry_run"),
                                               num_retries=3,
 -                                             fnPattern="$(task.keep)/%s/%s",
 +                                             fnPattern=file_pattern,
                                               project=arvrunner.project_uuid)
  
          for src, ab, st in uploadfiles:
@@@ -286,10 -301,7 +302,10 @@@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool)
          return ArvadosJob(self.arvrunner)
  
      def makePathMapper(self, reffiles, input_basedir, **kwargs):
 -        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
 +        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
 +                             "$(task.keep)/%s",
 +                             "$(task.keep)/%s/%s",
 +                             **kwargs)
  
  
  class ArvCwlRunner(object):
      def add_uploaded(self, src, pair):
          self.uploaded[src] = pair
  
 +    def upload_docker(self, tool):
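 +        # Stub: uploading the workflow's Docker image is not implemented yet.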
 +        pass
 +
 +    def submit(self, tool, job_order, input_basedir, args, **kwargs):
 +        files = set()
 +        def visitFiles(path):
 +            files.add(path)
 +
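 +        # Collect every file referenced by the tool document and job order.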
 +        adjustFiles(process.scandeps("", tool.tool,
 +                                     set(("run",)),
 +                                     set(("$schemas", "path"))),
 +                    visitFiles)
 +        adjustFiles(job_order, visitFiles)
 +
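 +        # Upload the collected files to Keep and map local paths to
 +        # $(task.keep) paths.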
 +        mapper = ArvPathMapper(self, files, "",
 +                               "$(task.keep)/%s",
 +                               "$(task.keep)/%s/%s",
 +                               **kwargs)
 +
 +        # adjustFiles updates paths in place; map each local path to its
 +        # Keep target (mapper.mapper returns a (source, target) pair).
 +        adjustFiles(job_order, lambda p: mapper.mapper(p)[1])
 +
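 +        # Submit a job that runs the cwl-runner script on the cluster inside
 +        # the arvados/jobs Docker image, reusing an existing job if allowed.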
 +        response = self.api.jobs().create(body={
 +            "script": "cwl-runner",
 +            "script_version": "8654-arv-jobs-cwl-runner",
 +            "repository": "arvados",
 +            "script_parameters": job_order,
 +            "runtime_constraints": {
 +                "docker_image": "arvados/jobs"
 +            }
 +        }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
 +        print response["uuid"]
 +        return None
 +
 +
      def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
 +        if args.submit:
 +            self.submit(tool, job_order, input_basedir, args, **kwargs)
 +            return
 +
          events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
  
+         self.debug = args.debug
          try:
              self.api.collections().get(uuid=crunchrunner_pdh).execute()
          except arvados.errors.ApiError as e:
                  if sys.exc_info()[0] is KeyboardInterrupt:
                      logger.error("Interrupted, marking pipeline as failed")
                  else:
-                     logger.exception("Caught unhandled exception, marking pipeline as failed")
+                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                  self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                       body={"state": "Failed"}).execute(num_retries=self.num_retries)
              finally:
@@@ -482,7 -458,6 +500,7 @@@ def main(args, stdout, stderr, api_client
                          default=True, dest="enable_reuse",
                          help="")
      parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 +    parser.add_argument("--submit", type=str, help="Submit job and print job uuid.")
  
      try:
          runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))