8558: add min_cores_per_node and min_ram_mb_per_node to runtime_constraints from...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 6cfdd0b2ab565d01b69678f36f32cd1189afca12..ca807951bd8634db11fcc70c95f1899952e62b14 100644 (file)
@@ -20,14 +20,20 @@ import os
 import sys
 
 from cwltool.process import get_feature
+from arvados.api import OrderedJsonModel  # used for the API client built in main()
 
 logger = logging.getLogger('arvados.cwl-runner')
 logger.setLevel(logging.INFO)
 
-crunchrunner_pdh = "721abe848fd8e6e6d1c99b920e6b7a2c+140"
+crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
 crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
 certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
 
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")  # task.tmpdir logged by crunchrunner
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")  # task.outdir logged by crunchrunner
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")  # task.keep logged by crunchrunner
+
+
 def arv_docker_get_image(api_client, dockerRequirement, pull_image):
     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
@@ -146,6 +152,19 @@ class ArvadosJob(object):
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
 
+        resources = self.builder.resources
+        if resources is not None:
+            # Copy CWL core/RAM minimums into runtime_constraints; a value that
+            # int() rejects is stored as None. NOTE(review): confirm the key
+            # names -- cwltool's evalResources may report "cores"/"ram" instead.
+            for cwl_key, arv_key in (("coresMin", "min_cores_per_node"),
+                                     ("ramMin", "min_ram_mb_per_node")):
+                if cwl_key in resources:
+                    try:
+                        runtime_constraints[arv_key] = int(resources[cwl_key])
+                    except (TypeError, ValueError, OverflowError):
+                        runtime_constraints[arv_key] = None
+
         try:
             response = self.arvrunner.api.jobs().create(body={
                 "script": "crunchrunner",
@@ -193,8 +212,27 @@ class ArvadosJob(object):
             try:
                 outputs = {}
                 if record["output"]:
-                    self.builder.outdir = "keep:" + record["output"]
-                    outputs = self.collect_outputs(self.builder.outdir)
+                    logc = arvados.collection.Collection(record["log"])
+                    tmpdir = None
+                    outdir = None
+                    keepdir = None
+                    # Recover the task directories crunchrunner reported in its log.
+                    with logc.open(list(logc.keys())[0]) as log:
+                        for l in log.readlines():
+                            g = tmpdirre.match(l)
+                            if g:
+                                tmpdir = g.group(1)
+                            g = outdirre.match(l)
+                            if g:
+                                outdir = g.group(1)
+                            g = keepre.match(l)
+                            if g:
+                                keepdir = g.group(1)
+                            if tmpdir and outdir and keepdir:
+                                break
+                    self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
+                    outputs = self.collect_outputs("keep:" + record["output"])
             except Exception as e:
                 logger.exception("Got exception while collecting job outputs:")
                 processStatus = "permanentFail"
@@ -237,6 +275,15 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
             arvrunner.add_uploaded(src, (ab, st.fn))
             self._pathmap[src] = (ab, st.fn)
 
+        self.keepdir = None  # set after the job runs, from its $(task.keep) log line
+
+    def reversemap(self, target):
+        """Map a job-side path back to its source; Keep paths become "keep:" refs."""
+        if target.startswith("keep:"):
+            return target
+        if self.keepdir and target.startswith(self.keepdir.rstrip("/") + "/"):
+            return "keep:" + target[len(self.keepdir.rstrip("/")) + 1:]
+        return super(ArvPathMapper, self).reversemap(target)
 
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
@@ -343,6 +390,7 @@ class ArvCwlRunner(object):
             jobiter = tool.job(job_order,
                                input_basedir,
                                self.output_callback,
+                               docker_outdir="$(task.outdir)",  # NOTE(review): TypeError if kwargs already holds docker_outdir
                                **kwargs)
 
             try:
@@ -398,7 +446,7 @@ def main(args, stdout, stderr, api_client=None):
                         help="")
 
     try:
-        runner = ArvCwlRunner(api_client=arvados.api('v1'))
+        runner = ArvCwlRunner(api_client=api_client if api_client is not None else arvados.api('v1', model=OrderedJsonModel()))  # honor caller's client
     except Exception as e:
         logger.error(e)
         return 1