8815: Rely on system-provided crunchrunner. Also use arvados/jobs by default if...
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 0f69ff6bf593f3d242bbdba64d7c67daab94970b..e3fd1fccd372d75abf06445f32fe2a13f8a8e66c 100644
@@ -11,6 +11,7 @@ import cwltool.draft2tool
 import cwltool.workflow
 import cwltool.main
 from cwltool.process import shortname
+from cwltool.errors import WorkflowException
 import threading
 import cwltool.docker
 import fnmatch
@@ -25,16 +26,12 @@ from arvados.api import OrderedJsonModel
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
-crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
-certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
-
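+# Recover the $(task.tmpdir), $(task.outdir) and $(task.keep) substitutions
+# from crunchrunner's stderr log lines.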
 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
 
-def arv_docker_get_image(api_client, dockerRequirement, pull_image):
+def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
 
@@ -48,10 +45,10 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image):
 
     if not images:
         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
-        args = [image_name]
+        args = ["--project-uuid="+project_uuid, image_name]
         if image_tag:
             args.append(image_tag)
-        logger.info("Uploading Docker image %s", ":".join(args))
+        logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args)
 
     return dockerRequirement["dockerImageId"]
@@ -150,33 +147,24 @@ class ArvadosJob(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if docker_req and kwargs.get("use_container") is not False:
-            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
+            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
+        else:
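+            # Per the commit message, default to the stock arvados/jobs
+            # image when the tool has no DockerRequirement.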
+            runtime_constraints["docker_image"] = "arvados/jobs"
 
         resources = self.builder.resources
         if resources is not None:
-            keys = resources.keys()
-            if "coresMin" in keys:
-                try:
-                    runtime_constraints["min_cores_per_node"] = int(resources["coresMin"])
-                except:
-                    runtime_constraints["min_cores_per_node"] = None
-            if "ramMin" in keys:
-                try:
-                    runtime_constraints["min_ram_mb_per_node"] = int(resources["ramMin"])
-                except:
-                    runtime_constraints["min_ram_mb_per_node"] = None
-            if "tmpdirMin" in keys:
-                try:
-                    runtime_constraints["min_scratch_mb_per_node"] = int(resources["tmpdirMin"])
-                except:
-                    runtime_constraints["min_scratch_mb_per_node"] = None
+            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
+            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
+            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
 
         try:
             response = self.arvrunner.api.jobs().create(body={
+                "owner_uuid": self.arvrunner.project_uuid,
                 "script": "crunchrunner",
                 "repository": "arvados",
                 "script_version": "master",
-                "script_parameters": {"tasks": [script_parameters], "crunchrunner": crunchrunner_pdh+"/crunchrunner"},
+                "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                "script_parameters": {"tasks": [script_parameters]},
                 "runtime_constraints": runtime_constraints
             }, find_or_create=kwargs.get("enable_reuse", True)).execute(num_retries=self.arvrunner.num_retries)
 
@@ -223,7 +211,15 @@ class ArvadosJob(object):
                     tmpdir = None
                     outdir = None
                     keepdir = None
-                    for l in log.readlines():
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning, because if the node fails
+                        # and the job restarts on a different node these
+                        # values will differ between runs, and we need the
+                        # values from the final run that actually produced
+                        # output.
+
                         g = tmpdirre.match(l)
                         if g:
                             tmpdir = g.group(1)
@@ -233,14 +229,49 @@ class ArvadosJob(object):
                         g = keepre.match(l)
                         if g:
                             keepdir = g.group(1)
-                        if tmpdir and outdir and keepdir:
-                            break
+
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
+                        # First, get output record.
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
+                        ).execute(num_retries=self.arvrunner.num_retries)
+
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
 
                     self.builder.outdir = outdir
                     self.builder.pathmapper.keepdir = keepdir
                     outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
             except Exception as e:
-                logger.exception("Got exception while collecting job outputs:")
+                logger.exception("Got unknown exception while collecting job outputs:")
                 processStatus = "permanentFail"
 
             self.output_callback(outputs, processStatus)
@@ -275,7 +306,8 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
                                              arvrunner.api,
                                              dry_run=kwargs.get("dry_run"),
                                              num_retries=3,
-                                             fnPattern="$(task.keep)/%s/%s")
+                                             fnPattern="$(task.keep)/%s/%s",
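+                                             # upload into the target
+                                             # project, not the user's home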
+                                             project=arvrunner.project_uuid)
 
         for src, ab, st in uploadfiles:
             arvrunner.add_uploaded(src, (ab, st.fn))
@@ -285,9 +317,9 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
 
     def reversemap(self, target):
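+        # cwltool's PathMapper.reversemap returns a (local path, target)
+        # pair, so the overrides below return 2-tuples as well.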
         if target.startswith("keep:"):
-            return target
+            return (target, target)
         elif self.keepdir and target.startswith(self.keepdir):
-            return "keep:" + target[len(self.keepdir)+1:]
+            return (target, "keep:" + target[len(self.keepdir)+1:])
         else:
             return super(ArvPathMapper, self).reversemap(target)
 
@@ -335,24 +367,24 @@ class ArvCwlRunner(object):
 
     def on_message(self, event):
         if "object_uuid" in event:
-                if event["object_uuid"] in self.jobs and event["event_type"] == "update":
-                    if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
-                        uuid = event["object_uuid"]
-                        with self.lock:
-                            j = self.jobs[uuid]
-                            logger.info("Job %s (%s) is Running", j.name, uuid)
-                            j.running = True
-                            j.update_pipeline_component(event["properties"]["new_attributes"])
-                    elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
-                        uuid = event["object_uuid"]
-                        try:
-                            self.cond.acquire()
-                            j = self.jobs[uuid]
-                            logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
-                            j.done(event["properties"]["new_attributes"])
-                            self.cond.notify()
-                        finally:
-                            self.cond.release()
+            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
+                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
+                    uuid = event["object_uuid"]
+                    with self.lock:
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is Running", j.name, uuid)
+                        j.running = True
+                        j.update_pipeline_component(event["properties"]["new_attributes"])
+                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
+                    uuid = event["object_uuid"]
+                    try:
+                        self.cond.acquire()
+                        j = self.jobs[uuid]
+                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
+                        j.done(event["properties"]["new_attributes"])
+                        self.cond.notify()
+                    finally:
+                        self.cond.release()
 
     def get_uploaded(self):
         return self.uploaded.copy()
@@ -363,21 +395,7 @@ class ArvCwlRunner(object):
     def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
 
-        try:
-            self.api.collections().get(uuid=crunchrunner_pdh).execute()
-        except arvados.errors.ApiError as e:
-            import httplib2
-            h = httplib2.Http(ca_certs=arvados.util.ca_certs_path())
-            resp, content = h.request(crunchrunner_download, "GET")
-            resp2, content2 = h.request(certs_download, "GET")
-            with arvados.collection.Collection() as col:
-                with col.open("crunchrunner", "w") as f:
-                    f.write(content)
-                with col.open("ca-certificates.crt", "w") as f:
-                    f.write(content2)
-
-                col.save_new("crunchrunner binary", ensure_unique_name=True)
-
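+        # Remember --debug so the error handlers below can choose whether
+        # to log full tracebacks.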
+        self.debug = args.debug
         self.fs_access = CollectionFsAccess(input_basedir)
 
         kwargs["fs_access"] = self.fs_access
@@ -386,12 +404,20 @@ class ArvCwlRunner(object):
         kwargs["outdir"] = "$(task.outdir)"
         kwargs["tmpdir"] = "$(task.tmpdir)"
 
+        useruuid = self.api.users().current().execute()["uuid"]
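+        # Jobs, collections and the pipeline instance are owned by
+        # --project-uuid when given, otherwise by the user's home project.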
+        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
+
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
         else:
-            self.pipeline = self.api.pipeline_instances().create(body={"name": shortname(tool.tool["id"]),
-                                                                   "components": {},
-                                                                   "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+            self.pipeline = self.api.pipeline_instances().create(
+                body={
+                    "owner_uuid": self.project_uuid,
+                    "name": shortname(tool.tool["id"]),
+                    "components": {},
+                    "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
+
+            logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
             jobiter = tool.job(job_order,
                                input_basedir,
@@ -400,42 +426,39 @@ class ArvCwlRunner(object):
                                **kwargs)
 
             try:
+                self.cond.acquire()
+                # Hold the lock for the duration of this block, except
+                # while in cond.wait(), which releases it so on_message can
+                # update job state and fire output callbacks.
+
                 for runnable in jobiter:
                     if runnable:
-                        with self.lock:
-                            runnable.run(**kwargs)
+                        runnable.run(**kwargs)
                     else:
                         if self.jobs:
-                            try:
-                                self.cond.acquire()
-                                self.cond.wait(1)
-                            except RuntimeError:
-                                pass
-                            finally:
-                                self.cond.release()
+                            self.cond.wait(1)
                         else:
-                            logger.error("Workflow cannot make any more progress.")
+                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                             break
 
                 while self.jobs:
-                    try:
-                        self.cond.acquire()
-                        self.cond.wait(1)
-                    except RuntimeError:
-                        pass
-                    finally:
-                        self.cond.release()
+                    self.cond.wait(1)
 
                 events.close()
 
                 if self.final_output is None:
                     raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
 
+                # TODO: create final output collection
             except:
-                if sys.exc_info()[0] is not KeyboardInterrupt:
-                    logger.exception("Caught unhandled exception, marking pipeline as failed")
+                if sys.exc_info()[0] is KeyboardInterrupt:
+                    logger.error("Interrupted, marking pipeline as failed")
+                else:
+                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
+            finally:
+                self.cond.release()
 
             return self.final_output
 
@@ -445,11 +468,12 @@ def main(args, stdout, stderr, api_client=None):
     parser = cwltool.main.arg_parser()
     exgroup = parser.add_mutually_exclusive_group()
     exgroup.add_argument("--enable-reuse", action="store_true",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
     exgroup.add_argument("--disable-reuse", action="store_false",
-                        default=False, dest="enable_reuse",
+                        default=True, dest="enable_reuse",
                         help="")
+    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
 
     try:
         runner = ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))