8558: add min_cores_per_node and min_ram_mb_per_node to runtime_constraints from...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
index 105d754aa424901201189741cf4d354f35c92c01..ca807951bd8634db11fcc70c95f1899952e62b14 100644 (file)
@@ -29,9 +29,9 @@ crunchrunner_pdh = "83db29f08544e1c319572a6bd971088a+140"
 crunchrunner_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/crunchrunner"
 certs_download = "https://cloud.curoverse.com/collections/download/qr1hi-4zz18-n3m1yxd0vx78jic/1i1u2qtq66k1atziv4ocfgsg5nu5tj11n4r6e0bhvjg03rix4m/ca-certificates.crt"
 
-tmpdirre = re.compile(r"\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
-outdirre = re.compile(r"\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
-keepre = re.compile(r"\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
+tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
+outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
+keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
 
 
 def arv_docker_get_image(api_client, dockerRequirement, pull_image):
@@ -152,6 +152,19 @@ class ArvadosJob(object):
         if docker_req and kwargs.get("use_container") is not False:
             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
 
+        resources = self.builder.resources
+        if resources is not None:
+            if "coresMin" in resources.keys():
+                try:
+                    runtime_constraints["min_cores_per_node"] = int(resources["coresMin"])
+                except:
+                    runtime_constraints["min_cores_per_node"] = None
+            if "ramMin" in resources.keys():
+                try:
+                    runtime_constraints["min_ram_mb_per_node"] = int(resources["ramMin"])
+                except:
+                    runtime_constraints["min_ram_mb_per_node"] = None
+
         try:
             response = self.arvrunner.api.jobs().create(body={
                 "script": "crunchrunner",
@@ -218,6 +231,7 @@ class ArvadosJob(object):
                             break
 
                     self.builder.outdir = outdir
+                    self.builder.pathmapper.keepdir = keepdir
                     outputs = self.collect_outputs("keep:" + record["output"])
             except Exception as e:
                 logger.exception("Got exception while collecting job outputs:")
@@ -261,6 +275,15 @@ class ArvPathMapper(cwltool.pathmapper.PathMapper):
             arvrunner.add_uploaded(src, (ab, st.fn))
             self._pathmap[src] = (ab, st.fn)
 
+        self.keepdir = None
+
+    def reversemap(self, target):
+        if target.startswith("keep:"):
+            return target
+        elif self.keepdir and target.startswith(self.keepdir):
+            return "keep:" + target[len(self.keepdir)+1:]
+        else:
+            return super(ArvPathMapper, self).reversemap(target)
 
 
 class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
@@ -367,6 +390,7 @@ class ArvCwlRunner(object):
             jobiter = tool.job(job_order,
                                input_basedir,
                                self.output_callback,
+                               docker_outdir="$(task.outdir)",
                                **kwargs)
 
             try: