Merge branch 'master' into 8654-arv-jobs-cwl-runner
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index cc826d4d4bb2b04794259a4d8b754e1a76619add..c0adb33550f10e2a95a26becbca3c961a4379ac3 100644 (file)
@@ -11,6 +11,7 @@ import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
 from cwltool.process import shortname
+from cwltool.errors import WorkflowException
 import threading
 import cwltool.docker
 import fnmatch
@@ -48,10 +49,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
 
     if not images:
         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = ["--project-uuid", project_uuid, image_name]
+        args = ["--project-uuid="+project_uuid, image_name]
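+        # --project-uuid is passed as a single "--project-uuid=UUID" token so that
+        # args[1:] is just the image name and optional tag, suitable for the
+        # "name:tag" log message below.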
         if image_tag:
             args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args))
+        logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args)
 
     return dockerRequirement["dockerImageId"]
@@ -150,7 +151,13 @@ class ArvadosJob(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvruner.project_uuid)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+
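+        # Translate the evaluated CWL ResourceRequirement (builder.resources) into
+        # Arvados per-node runtime constraints: minimum cores, RAM, and enough
+        # scratch space for both the temporary and output directories.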
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
 
         try:
             response = self.arvrunner.api.jobs().create(body={
@@ -216,14 +223,23 @@ class ArvadosJob(object):
                         g = keepre.match(l)
                         if g:
                             keepdir = g.group(1)
-                        if tmpdir and outdir and keepdir:
-                            break
+
+                        # If the job fails and restarts, it can come up on a
+                        # different compute node, so read the log all the way to
+                        # the end to pick up the final tmpdir/outdir/keepdir
+                        # values rather than stopping at the first match:
+                        #
+                        #if tmpdir and outdir and keepdir:
+                        #    break
 
                     self.builder.outdir = outdir
                     self.builder.pathmapper.keepdir = keepdir
                     outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
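+                # WorkflowException is an expected CWL-level failure: log its message
+                # (with a traceback only in --debug mode) rather than the full stack
+                # trace reserved for unknown errors below.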
             except Exception as e:
-                logger.exception("Got exception while collecting job outputs:")
+                logger.exception("Got unknown exception while collecting job outputs:")
                 processStatus = "permanentFail"
 
             self.output_callback(outputs, processStatus)
@@ -232,7 +248,8 @@ class ArvadosJob(object):
 
 
 class ArvPathMapper(cwltool.pathmapper.PathMapper):
-    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
+    def __init__(self, arvrunner, referenced_files, basedir,
+                 collection_pattern, file_pattern, **kwargs):
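+        # collection_pattern formats references that are already in Keep (one %s:
+        # the "keep:" reference minus its prefix); file_pattern formats local files
+        # after upload (two %s: collection pdh, filename). ArvadosCommandTool and
+        # submit() pass "$(task.keep)/%s" and "$(task.keep)/%s/%s".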
         self._pathmap = arvrunner.get_uploaded()
         uploadfiles = []
 
@@ -240,10 +257,10 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
         for src in referenced_files:
             if isinstance(src, basestring) and pdh_path.match(src):
-                self._pathmap[src] = (src, "$(task.keep)/%s" % src[5:])
+                self._pathmap[src] = (src, collection_pattern % src[5:])
             if src not in self._pathmap:
                 ab = cwltool.pathmapper.abspath(src, basedir)
-                st = arvados.commands.run.statfile("", ab, fnPattern="$(task.keep)/%s/%s")
+                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                 if kwargs.get("conformance_test"):
                     self._pathmap[src] = (src, ab)
                 elif isinstance(st, arvados.commands.run.UploadFile):
@@ -258,7 +275,7 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s",
+                                             fnPattern=file_pattern,
                                              project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
@@ -285,7 +302,10 @@ class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
         return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, input_basedir, **kwargs):
-        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
+        return ArvPathMapper(self.arvrunner, reffiles, input_basedir,
+                             "$(task.keep)/%s",
+                             "$(task.keep)/%s/%s",
+                             **kwargs)
 
 
 class ArvCwlRunner(object):
@@ -319,24 +339,24 @@ class ArvCwlRunner(object):
 
     def on_message(self, event):
         if "object_uuid" in event:
-                if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                    if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                        uuid = event["object_uuid"]
-                        with self.lock:
-                            j = self.jobs[uuid]
-                            logger.info("Job %s (%s) is Running", j.name, uuid)
-                            j.running = True
-                            j.update_pipeline_component(event["properties"]["new_attributes"])
-                    elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                        uuid = event["object_uuid"]
-                        try:
-                            self.cond.acquire()
-                            j = self.jobs[uuid]
-                            logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                            j.done(event["properties"]["new_attributes"])
-                            self.cond.notify()
-                        finally:
-                            self.cond.release()
+            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+                    uuid = event["object_uuid"]
+                    with self.lock:
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                    uuid = event["object_uuid"]
+                    try:
+                        self.cond.acquire()
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        j.done(event["properties"]["new_attributes"])
+                        self.cond.notify()
+                    finally:
+                        self.cond.release()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -344,9 +364,49 @@ class ArvCwlRunner(object):
     def add_uploaded(self, src, pair):
         self.uploaded[src] = pair
 
+    def upload_docker(self, tool):
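+        # Placeholder: presumably intended to upload Docker images referenced by
+        # the tool before submission; not implemented yet.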
+        pass
+
+    def submit(self, tool, job_order, input_basedir, args, **kwargs):
+        files = set()
+        def visitFiles(path):
+            files.add(path)
+
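+        # Collect every file referenced by the workflow document (following "run"
+        # references, "$schemas" and "path" fields) and by the job order.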
+        adjustFiles(process.scandeps("", tool.tool,
+                                     set(("run",)),
+                                     set(("$schemas", "path"))),
+                    visitFiles)
+        adjustFiles(job_order, visitFiles)
+
+        mapper = ArvPathMapper(self, files, "",
+                               "$(task.keep)/%s",
+                               "$(task.keep)/%s/%s",
+                               **kwargs)
+
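+        # Rewrite each file path in the job order to its "$(task.keep)/..."
+        # location so the submitted job can read it from the Keep mount.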
+        adjustFiles(job_order, lambda p: mapper.mapper(p)[1])
+
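+        # Queue a single Arvados job that runs cwl-runner on the cluster with the
+        # Keep-mapped job order as its script_parameters; --enable-reuse /
+        # --disable-reuse is passed through as find_or_create.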
+        response = self.api.jobs().create(body={
+            "script": "cwl-runner",
+            "script_version": "8654-arv-jobs-cwl-runner",
+            "repository": "arvados",
+            "script_parameters": job_order,
+            "runtime_constraints": {
+                "docker_image": "arvados/jobs"
+            }
+        }, find_or_create=args.enable_reuse).execute(num_retries=self.num_retries)
+        print response["uuid"]
+        return None
+
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
+        if args.submit:
+            self.submit(tool, job_order, input_basedir, args, **kwargs)
+            return
+
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
+        self.debug = args.debug
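+        # Error handlers below use self.debug to decide whether to log full tracebacks.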
+
         try:
             self.api.collections().get(uuid=crunchrunner_pdh).execute()
         except arvados.errors.ApiError as e:
@@ -383,6 +443,8 @@ class ArvCwlRunner(object):
                     "components": {},
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
 
+            logger.info("Pipeline instance %s", self.pipeline["uuid"])
+
             jobiter = tool.job(job_order,
                                input_basedir,
                                self.output_callback,
@@ -390,42 +452,39 @@ class ArvCwlRunner(object):
                                **kwargs)
 
             try:
+                self.cond.acquire()
+                # Hold the lock for the duration of this loop; it is released only
+                # inside cond.wait(), which is when on_message can update job state
+                # and run the output callbacks.
+
                 for runnable in jobiter:
                     if runnable:
-                        with self.lock:
-                            runnable.run(**kwargs)
+                        runnable.run(**kwargs)
                     else:
                         if self.jobs:
-                            try:
-                                self.cond.acquire()
-                                self.cond.wait(1)
-                            except RuntimeError:
-                                pass
-                            finally:
-                                self.cond.release()
+                            self.cond.wait(1)
                         else:
-                            logger.error("Workflow cannot make any more progress.")
+                            logger.error("Workflow is deadlocked: no jobs are runnable and no jobs are pending.")
                             break
 
                 while self.jobs:
-                    try:
-                        self.cond.acquire()
-                        self.cond.wait(1)
-                    except RuntimeError:
-                        pass
-                    finally:
-                        self.cond.release()
+                    self.cond.wait(1)
 
                 events.close()
 
                 if self.final_output is None:
                     raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
 
+                # TODO: create the final output collection
             except:
-                if sys.exc_info()[0] is not KeyboardInterrupt:
-                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                if sys.exc_info()[0] is KeyboardInterrupt:
+                    logger.error("Interrupted, marking pipeline as failed")
+                else:
+                    logger.error("Caught unhandled exception, marking pipeline as failed. Error was: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            finally:
+                self.cond.release()
 
             return self.final_output
 
@@ -435,12 +494,13 @@ def main(args, stdout, stderr, api_client=None):
     parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
     exgroup.add_argument("--disable-reuse", action="store_false",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
+    parser.add_argument("--submit", action="store_true", help="Submit job and print job uuid.")
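+    # When --submit is given, arvExecutor hands the workflow to submit(), which
+    # queues it as a cwl-runner job on the cluster instead of running it locally.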
 
     try:
         runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))