Take sum for outdir requirement in single container
author: jiayong2 <jiayong@math.mit.edu>
Mon, 26 Feb 2018 20:06:59 +0000 (20:06 +0000)
committer: jiayong2 <jiayong@math.mit.edu>
Mon, 26 Feb 2018 20:06:59 +0000 (20:06 +0000)
Arvados-DCO-1.1-Signed-off-by: Jiayong Li <jiayong@math.mit.edu>

sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/wf/echo_a.cwl
sdk/cwl/tests/wf/echo_b.cwl

index 15704dba66884e512d10afa5b87cfcfa2f40d78d..5aed871a12bc58d1a747efd2035dbf9d2a23b5a4 100644 (file)
@@ -27,6 +27,9 @@ from .perf import Perf
 logger = logging.getLogger('arvados.cwl-runner')
 metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
+max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
+sum_res_pars = ("outdirMin", "outdirMax")
+
 def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
                     submit_runner_ram=0, name=None, merged_map=None):
 
@@ -72,17 +75,19 @@ def dedup_reqs(reqs):
             dedup[r["class"]] = r
     return [dedup[r] for r in sorted(dedup.keys())]
 
-def get_max_res_req(res_reqs):
-    """Take the max of a list of ResourceRequirement."""
+def get_overall_res_req(res_reqs):
+    """Take the overall of a list of ResourceRequirement,
+    i.e., the max of coresMin, coresMax, ramMin, ramMax, tmpdirMin, tmpdirMax
+    and the sum of outdirMin, outdirMax."""
 
-    total_res_req = {}
+    all_res_req = {}
     exception_msgs = []
-    for a in ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax"):
-        total_res_req[a] = []
+    for a in max_res_pars + sum_res_pars:
+        all_res_req[a] = []
         for res_req in res_reqs:
             if a in res_req:
                 if isinstance(res_req[a], int): # integer check
-                    total_res_req[a].append(res_req[a])
+                    all_res_req[a].append(res_req[a])
                 else:
                     msg = SourceLine(res_req).makeError(
                     "Non-top-level ResourceRequirement in single container cannot have expressions")
@@ -90,13 +95,16 @@ def get_max_res_req(res_reqs):
     if exception_msgs:
         raise WorkflowException("\n".join(exception_msgs))
     else:
-        max_res_req = {}
-        for a in total_res_req:
-            if total_res_req[a]:
-                max_res_req[a] = max(total_res_req[a])
-        if max_res_req:
-            max_res_req["class"] = "ResourceRequirement"
-        return cmap(max_res_req)
+        overall_res_req = {}
+        for a in all_res_req:
+            if all_res_req[a]:
+                if a in max_res_pars:
+                    overall_res_req[a] = max(all_res_req[a])
+                elif a in sum_res_pars:
+                    overall_res_req[a] = sum(all_res_req[a])
+        if overall_res_req:
+            overall_res_req["class"] = "ResourceRequirement"
+        return cmap(overall_res_req)
 
 class ArvadosWorkflow(Workflow):
     """Wrap cwltool Workflow to override selected methods."""
@@ -134,8 +142,8 @@ class ArvadosWorkflow(Workflow):
 
                     builder = Builder()
                     builder.job = joborder
-                    builder.requirements = self.requirements
-                    builder.hints = self.hints
+                    builder.requirements = workflowobj["requirements"]
+                    builder.hints = workflowobj["hints"]
                     builder.resources = {}
 
                     res_reqs = {"requirements": [], "hints": []}
@@ -146,7 +154,7 @@ class ArvadosWorkflow(Workflow):
                                     for req in item[t]:
                                         if req["class"] == "ResourceRequirement":
                                             eval_req = {"class": "ResourceRequirement"}
-                                            for a in ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax", "outdirMin", "outdirMax"):
+                                            for a in max_res_pars + sum_res_pars:
                                                 if a in req:
                                                     eval_req[a] = builder.do_eval(req[a])
                                             res_reqs[t].append(eval_req)
@@ -154,16 +162,16 @@ class ArvadosWorkflow(Workflow):
                                     for req in item[t]:
                                         if req["class"] == "ResourceRequirement":
                                             res_reqs[t].append(req)
-                    max_res_req = {"requirements": get_max_res_req(res_reqs["requirements"]),
-                                   "hints": get_max_res_req(res_reqs["hints"])}
+                    overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]),
+                                       "hints": get_overall_res_req(res_reqs["hints"])}
 
                     new_spec = {"requirements": self.requirements, "hints": self.hints}
                     for t in ("requirements", "hints"):
                         for req in new_spec[t]:
                             if req["class"] == "ResourceRequirement":
                                 new_spec[t].remove(req)
-                        if max_res_req[t]:
-                            new_spec[t].append(max_res_req[t])
+                        if overall_res_req[t]:
+                            new_spec[t].append(overall_res_req[t])
 
                     upload_dependencies(self.arvrunner,
                                         kwargs.get("name", ""),
@@ -218,7 +226,7 @@ class ArvadosWorkflow(Workflow):
                 "inputs": self.tool["inputs"],
                 "outputs": self.tool["outputs"],
                 "stdout": "cwl.output.json",
-                "requirements": new_spec["requirements"]+[
+                "requirements": self.requirements+[
                     {
                     "class": "InitialWorkDirRequirement",
                     "listing": [{
@@ -232,7 +240,7 @@ class ArvadosWorkflow(Workflow):
                             "entry": json.dumps(joborder_keepmount, indent=2, sort_keys=True, separators=(',',': ')).replace("\\", "\\\\").replace('$(', '\$(').replace('${', '\${')
                         }]
                 }],
-                "hints": new_spec["hints"],
+                "hints": self.hints,
                 "arguments": ["--no-container", "--move-outputs", "--preserve-entire-environment", "workflow.cwl#main", "cwl.input.yml"],
                 "id": "#"
             })
index 85189869dba91173c5c5e4bc6f29dd59c06e890c..5a6e6dc7e306ca991a9c39f55b4c7b1545bd255f 100644 (file)
@@ -397,7 +397,7 @@ class TestWorkflow(unittest.TestCase):
     @mock.patch("arvados.collection.CollectionReader")
     @mock.patch("arvados.collection.Collection")
     @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
-    def test_max_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+    def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
         arvados_cwl.add_arv_hints()
 
         api = mock.MagicMock()
@@ -455,7 +455,7 @@ class TestWorkflow(unittest.TestCase):
                     'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                     'task.stdout': 'cwl.output.json'}]},
                 'runtime_constraints': {
-                    'min_scratch_mb_per_node': 2048,
+                    'min_scratch_mb_per_node': 4096,
                     'min_cores_per_node': 3,
                     'docker_image': 'arvados/jobs',
                     'min_ram_mb_per_node': 1024
index f16bb09ed59da2d2fabfe1739643006b7e0396b4..b7893e221104308771c334c984ea0ad303646eb6 100644 (file)
@@ -3,6 +3,7 @@ class: CommandLineTool
 requirements:
   ResourceRequirement:
     coresMin: 2
+    outdirMin: 1024
 inputs: []
 outputs: []
 baseCommand: echo
index 1b22157f13cf4f49e33b287c85805064fc08bdc9..4db11ccdf2eb9ff26dcd5305ee528b418cb065ce 100644 (file)
@@ -3,6 +3,7 @@ class: CommandLineTool
 requirements:
   ResourceRequirement:
     coresMin: 3
+    outdirMin: 2048
 inputs: []
 outputs: []
 baseCommand: echo