8442: CWL create crunch2 containers WIP
authorPeter Amstutz <peter.amstutz@curoverse.com>
Mon, 6 Jun 2016 19:24:50 +0000 (15:24 -0400)
committerPeter Amstutz <peter.amstutz@curoverse.com>
Wed, 15 Jun 2016 20:09:45 +0000 (16:09 -0400)
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py [new file with mode: 0644]

index 371dd4fb8c5252ac7f37bddb240176dd389405d4..bf7ccc0f1a5692c5a3ac8af1630041b0840a3c26 100644 (file)
@@ -61,6 +61,8 @@ def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid
         logger.info("Uploading Docker image %s", ":".join(args[1:]))
         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
 
+    # XXX return PDH instead
+
     return dockerRequirement["dockerImageId"]
 
 
@@ -591,7 +593,10 @@ class ArvadosCommandTool(CommandLineTool):
         self.arvrunner = arvrunner
 
     def makeJobRunner(self):
-        return ArvadosJob(self.arvrunner)
+        if kwargs.get("crunch2"):
+            return ArvadosContainer(self.arvrunner)
+        else:
+            return ArvadosJob(self.arvrunner)
 
     def makePathMapper(self, reffiles, **kwargs):
         return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
@@ -704,8 +709,12 @@ class ArvCwlRunner(object):
         kwargs["fs_access"] = self.fs_access
         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
 
-        kwargs["outdir"] = "$(task.outdir)"
-        kwargs["tmpdir"] = "$(task.tmpdir)"
+        if kwargs.get("crunch2"):
+            kwargs["outdir"] = "/var/spool/cwl"
+            kwargs["tmpdir"] = "/tmp"
+        else:
+            kwargs["outdir"] = "$(task.outdir)"
+            kwargs["tmpdir"] = "$(task.tmpdir)"
 
         if kwargs.get("conformance_test"):
             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py
new file mode 100644 (file)
index 0000000..fe0f7ca
--- /dev/null
@@ -0,0 +1,160 @@
+class ArvadosContainer(object):
+    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
+
+    def __init__(self, runner):
+        self.arvrunner = runner
+        self.running = False
+
+    def run(self, dry_run=False, pull_image=True, **kwargs):
+        container_request = {
+            "command": self.command_line
+            "owner_uuid": self.arvrunner.project_uuid,
+            "name": self.name,
+            "output_path", "/var/spool/cwl",
+            "cwd", "/var/spool/cwl",
+            "priority": 1
+        }
+        runtime_constraints = {}
+        mounts = {}
+
+        if self.generatefiles:
+            vwd = arvados.collection.Collection()
+            container_request["task.vwd"] = {}
+            for t in self.generatefiles:
+                if isinstance(self.generatefiles[t], dict):
+                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
+                    vwd.copy(rest, t, source_collection=src)
+                else:
+                    with vwd.open(t, "w") as f:
+                        f.write(self.generatefiles[t])
+            vwd.save_new()
+            # TODO
+            # for t in self.generatefiles:
+            #     container_request["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+
+        container_request["environment"] = {"TMPDIR": "/tmp"}
+        if self.environment:
+            container_request["environment"].update(self.environment)
+
+        # TODO, not supported
+        #if self.stdin:
+        #    container_request["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+
+        if self.stdout:
+            mounts["stdout"] = {"kind": "file",
+                                "path": self.stdout}
+
+        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
+        if not docker_req:
+            docker_req = "arvados/jobs"
+
+        container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
+                                                                     docker_req,
+                                                                     pull_image,
+                                                                     self.arvrunner.project_uuid)
+
+        resources = self.builder.resources
+        if resources is not None:
+            runtime_constraints["vcpus"] = resources.get("cores", 1)
+            runtime_constraints["ram"] = resources.get("ram") * 2**20
+            #runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
+
+        container_request["mounts"] = mounts
+        container_request["runtime_constraints"] = runtime_constraints
+
+        try:
+            response = self.arvrunner.api.container_requests().create(
+                body=container_request
+            ).execute(num_retries=self.arvrunner.num_retries)
+
+            self.arvrunner.jobs[response["uuid"]] = self
+
+            logger.info("Container %s (%s) is %s", self.name, response["uuid"], response["state"])
+
+            if response["state"] in ("Complete", "Cancelled"):
+                self.done(response)
+        except Exception as e:
+            logger.error("Got error %s" % str(e))
+            self.output_callback({}, "permanentFail")
+
+    def done(self, record):
+        try:
+            if record["state"] == "Complete":
+                processStatus = "success"
+            else:
+                processStatus = "permanentFail"
+
+            try:
+                outputs = {}
+                if record["output"]:
+                    logc = arvados.collection.Collection(record["log"])
+                    log = logc.open(logc.keys()[0])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    for l in log:
+                        # Determine the tmpdir, outdir and keepdir paths from
+                        # the job run.  Unfortunately, we can't take the first
+                        # values we find (which are expected to be near the
+                        # top) and stop scanning because if the node fails and
+                        # the job restarts on a different node these values
+                        # will different runs, and we need to know about the
+                        # final run that actually produced output.
+
+                        g = tmpdirre.match(l)
+                        if g:
+                            tmpdir = g.group(1)
+                        g = outdirre.match(l)
+                        if g:
+                            outdir = g.group(1)
+                        g = keepre.match(l)
+                        if g:
+                            keepdir = g.group(1)
+
+                    colname = "Output %s of %s" % (record["output"][0:7], self.name)
+
+                    # check if collection already exists with same owner, name and content
+                    collection_exists = self.arvrunner.api.collections().list(
+                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
+                                 ['portable_data_hash', '=', record["output"]],
+                                 ["name", "=", colname]]
+                    ).execute(num_retries=self.arvrunner.num_retries)
+
+                    if not collection_exists["items"]:
+                        # Create a collection located in the same project as the
+                        # pipeline with the contents of the output.
+                        # First, get output record.
+                        collections = self.arvrunner.api.collections().list(
+                            limit=1,
+                            filters=[['portable_data_hash', '=', record["output"]]],
+                            select=["manifest_text"]
+                        ).execute(num_retries=self.arvrunner.num_retries)
+
+                        if not collections["items"]:
+                            raise WorkflowException(
+                                "Job output '%s' cannot be found on API server" % (
+                                    record["output"]))
+
+                        # Create new collection in the parent project
+                        # with the output contents.
+                        self.arvrunner.api.collections().create(body={
+                            "owner_uuid": self.arvrunner.project_uuid,
+                            "name": colname,
+                            "portable_data_hash": record["output"],
+                            "manifest_text": collections["items"][0]["manifest_text"]
+                        }, ensure_unique_name=True).execute(
+                            num_retries=self.arvrunner.num_retries)
+
+                    self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
+                    outputs = self.collect_outputs("keep:" + record["output"])
+            except WorkflowException as e:
+                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                processStatus = "permanentFail"
+            except Exception as e:
+                logger.exception("Got unknown exception while collecting job outputs:")
+                processStatus = "permanentFail"
+
+            self.output_callback(outputs, processStatus)
+        finally:
+            del self.arvrunner.jobs[record["uuid"]]