10576: Running jobs from keep: and arv: prefixes WIP. Tests passing, needs some...
[arvados.git] / sdk / cwl / arvados_cwl / arvcontainer.py
index fa32456c3f2c6b4a9e0a4f455a8662a8ea2fc42d..cea2b1091d400be70014cc512029db8d2b872955 100644 (file)
@@ -2,15 +2,19 @@ import logging
 import json
 import os
 
+import ruamel.yaml as yaml
+
 from cwltool.errors import WorkflowException
 from cwltool.process import get_feature, UnsupportedRequirement, shortname
 from cwltool.pathmapper import adjustFiles
+from cwltool.utils import aslist
 
 import arvados.collection
 
 from .arvdocker import arv_docker_get_image
 from . import done
-from .runner import Runner
+from .runner import Runner, arvados_jobs_image
+from .fsaccess import CollectionFetcher
 
 logger = logging.getLogger('arvados.cwl-runner')
 
@@ -41,6 +45,7 @@ class ArvadosContainer(object):
                 "kind": "tmp"
             }
         }
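+        # collect scheduling hints (e.g. cluster partitions) to attach to the container request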
+        scheduling_parameters = {}
 
         dirs = set()
         for f in self.pathmapper.files():
@@ -60,7 +65,7 @@ class ArvadosContainer(object):
                 }
 
         if self.generatefiles["listing"]:
-            raise UnsupportedRequirement("Generate files not supported")
+            raise UnsupportedRequirement("InitialWorkDirRequirement not supported with --api=containers")
 
         container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
         if self.environment:
@@ -78,7 +83,7 @@ class ArvadosContainer(object):
 
         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
         if not docker_req:
-            docker_req = {"dockerImageId": "arvados/jobs"}
+            docker_req = {"dockerImageId": arvados_jobs_image(self.arvrunner)}
 
         container_request["container_image"] = arv_docker_get_image(self.arvrunner.api,
                                                                      docker_req,
@@ -96,19 +101,26 @@ class ArvadosContainer(object):
 
         runtime_req, _ = get_feature(self, "http://arvados.org/cwl#RuntimeConstraints")
         if runtime_req:
-            logger.warn("RuntimeConstraints not yet supported by container API")
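+            # pass the keep_cache hint through to the container API's
+            # keep_cache_ram runtime constraint (value is used as given)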
+            if "keep_cache" in runtime_req:
+                runtime_constraints["keep_cache_ram"] = runtime_req["keep_cache"]
+
+        partition_req, _ = get_feature(self, "http://arvados.org/cwl#PartitionRequirement")
+        if partition_req:
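+            # aslist() accepts either a single partition name or a list of names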
+            scheduling_parameters["partitions"] = aslist(partition_req["partition"])
 
         container_request["mounts"] = mounts
         container_request["runtime_constraints"] = runtime_constraints
+        container_request["use_existing"] = kwargs.get("enable_reuse", True)
+        container_request["scheduling_parameters"] = scheduling_parameters
 
         try:
             response = self.arvrunner.api.container_requests().create(
                 body=container_request
             ).execute(num_retries=self.arvrunner.num_retries)
 
-            self.arvrunner.processes[response["container_uuid"]] = self
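+            # track in-flight processes by container request UUID; done() receives
+            # the request record and fetches the underlying container from it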
+            self.arvrunner.processes[response["uuid"]] = self
 
-            logger.info("Container %s (%s) request state is %s", self.name, response["uuid"], response["state"])
+            logger.info("Container request %s (%s) state is %s", self.name, response["uuid"], response["state"])
 
             if response["state"] == "Final":
                 self.done(response)
@@ -118,8 +130,11 @@ class ArvadosContainer(object):
 
     def done(self, record):
         try:
-            if record["state"] == "Complete":
-                rcode = record["exit_code"]
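+            # the exit code lives on the container record, not the container request,
+            # so fetch the container to inspect its state and exit_code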
+            container = self.arvrunner.api.containers().get(
+                uuid=record["container_uuid"]
+            ).execute(num_retries=self.arvrunner.num_retries)
+            if container["state"] == "Complete":
+                rcode = container["exit_code"]
                 if self.successCodes and rcode in self.successCodes:
                     processStatus = "success"
                 elif self.temporaryFailCodes and rcode in self.temporaryFailCodes:
@@ -133,17 +148,14 @@ class ArvadosContainer(object):
             else:
                 processStatus = "permanentFail"
 
-            try:
-                outputs = {}
-                if record["output"]:
-                    outputs = done.done(self, record, "/tmp", self.outdir, "/keep")
-            except WorkflowException as e:
-                logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
-                processStatus = "permanentFail"
-            except Exception as e:
-                logger.exception("Got unknown exception while collecting job outputs:")
-                processStatus = "permanentFail"
+            outputs = {}
 
+            if container["output"]:
+                try:
+                    outputs = done.done_outputs(self, container, "/tmp", self.outdir, "/keep")
+                except Exception as e:
+                    logger.error("Error while collecting container outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
+                    processStatus = "permanentFail"
             self.output_callback(outputs, processStatus)
         finally:
             del self.arvrunner.processes[record["uuid"]]
@@ -168,36 +180,17 @@ class RunnerContainer(Runner):
                 json.dump(self.job_order, f, sort_keys=True, indent=4)
             jobobj.save_new(owner_uuid=self.arvrunner.project_uuid)
 
-        workflowname = os.path.basename(self.tool.tool["id"])
-        workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
-        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
-        workflowcollection = workflowcollection[5:workflowcollection.index('/')]
         jobpath = "/var/lib/cwl/job/cwl.input.json"
 
-        container_image = arv_docker_get_image(self.arvrunner.api,
-                                               {"dockerImageId": "arvados/jobs"},
-                                               pull_image,
-                                               self.arvrunner.project_uuid)
-
-        command = ["arvados-cwl-runner", "--local", "--api=containers"]
-        if self.output_name:
-            command.append("--output-name=" + self.output_name)
-        command.extend([workflowpath, jobpath])
-
-        return {
-            "command": command,
+        container_req = {
             "owner_uuid": self.arvrunner.project_uuid,
             "name": self.name,
             "output_path": "/var/spool/cwl",
             "cwd": "/var/spool/cwl",
             "priority": 1,
             "state": "Committed",
-            "container_image": container_image,
+            "container_image": arvados_jobs_image(self.arvrunner),
             "mounts": {
-                "/var/lib/cwl/workflow": {
-                    "kind": "collection",
-                    "portable_data_hash": "%s" % workflowcollection
-                },
                 jobpath: {
                     "kind": "collection",
                     "portable_data_hash": "%s/cwl.input.json" % jobobj.portable_data_hash()
@@ -213,11 +206,50 @@ class RunnerContainer(Runner):
             },
             "runtime_constraints": {
                 "vcpus": 1,
-                "ram": 1024*1024*256,
+                "ram": 1024*1024 * self.submit_runner_ram,
                 "API": True
             }
         }
 
+        workflowcollection = workflowmapper.mapper(self.tool.tool["id"])[1]
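+        # a "keep:" reference mounts the packed workflow collection at /var/lib/cwl/workflow,
+        # while an "arvwf:" reference embeds the stored workflow definition as a JSON mount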
+        if workflowcollection.startswith("keep:"):
+            workflowcollection = workflowcollection[5:workflowcollection.index('/')]
+            workflowname = os.path.basename(self.tool.tool["id"])
+            workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
+            container_req["mounts"]["/var/lib/cwl/workflow"] = {
+                "kind": "collection",
+                "portable_data_hash": "%s" % workflowcollection
+                }
+        elif workflowcollection.startswith("arvwf:"):
+            workflowpath = "/var/lib/cwl/workflow.json#main"
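+            # CollectionFetcher fetches the stored workflow text for the arvwf: reference;
+            # the parsed document is embedded directly in the container request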
+            fetcher = CollectionFetcher({}, None,
+                                        api_client=self.arvrunner.api,
+                                        keep_client=self.arvrunner.keep_client)
+            wfobj = yaml.safe_load(fetcher.fetch_text(workflowcollection))
+            container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
+                "kind": "json",
+                "json": wfobj
+            }
+
+        command = ["arvados-cwl-runner", "--local", "--api=containers"]
+        if self.output_name:
+            command.append("--output-name=" + self.output_name)
+
+        if self.output_tags:
+            command.append("--output-tags=" + self.output_tags)
+
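+        # forward the reuse setting to the arvados-cwl-runner invocation inside the container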
+        if self.enable_reuse:
+            command.append("--enable-reuse")
+        else:
+            command.append("--disable-reuse")
+
+        command.extend([workflowpath, jobpath])
+
+        container_req["command"] = command
+
+        return container_req
+
     def run(self, *args, **kwargs):
         kwargs["keepprefix"] = "keep:"
         job_spec = self.arvados_job_spec(*args, **kwargs)
@@ -228,9 +260,23 @@ class RunnerContainer(Runner):
         ).execute(num_retries=self.arvrunner.num_retries)
 
         self.uuid = response["uuid"]
-        self.arvrunner.processes[response["container_uuid"]] = self
+        self.arvrunner.processes[response["uuid"]] = self
 
         logger.info("Submitted container %s", response["uuid"])
 
-        if response["state"] in ("Complete", "Failed", "Cancelled"):
+        if response["state"] == "Final":
             self.done(response)
+
+    def done(self, record):
+        try:
+            container = self.arvrunner.api.containers().get(
+                uuid=record["container_uuid"]
+            ).execute(num_retries=self.arvrunner.num_retries)
+        except Exception:
+            logger.exception("Error while getting runner container")
+            self.arvrunner.output_callback({}, "permanentFail")
+            # the finally clause below removes this entry from the process table
+        else:
+            super(RunnerContainer, self).done(container)
+        finally:
+            del self.arvrunner.processes[record["uuid"]]