14510: Update tests/docs for --collection-cache-size
author Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 27 Nov 2018 22:26:07 +0000 (17:26 -0500)
committer Peter Amstutz <pamstutz@veritasgenetics.com>
Tue, 27 Nov 2018 22:26:07 +0000 (17:26 -0500)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz@veritasgenetics.com>

doc/user/cwl/cwl-extensions.html.textile.liquid
sdk/cwl/arvados_cwl/arv-cwl-schema.yml
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/task_queue.py
sdk/cwl/tests/test_submit.py
sdk/cwl/tests/test_tq.py

index f2dd937d95976d3a770215dcd57faec1a0010e70..d62002237a7e7b1d43aa7c59f4ef1afa7bc38b84 100644 (file)
@@ -138,7 +138,8 @@ table(table table-bordered table-condensed).
 |_. Field |_. Type |_. Description |
 |ramMin|int|RAM, in mebibytes, to reserve for the arvados-cwl-runner process. Default 1 GiB|
 |coresMin|int|Number of cores to reserve to the arvados-cwl-runner process. Default 1 core.|
-|keep_cache|int|RAM, in mebibytes, to reserve for caching keep collection metadata. Default 256 MiB|
+|keep_cache|int|Size of collection metadata cache for the workflow runner, in MiB.  Default 256 MiB.  Will be added on to the RAM request when determining node size to request.|
+
 h2(#clustertarget). arv:ClusterTarget
 
 Specify which Arvados cluster should execute a container or subworkflow, and the parent project for the container request.
index 902b1ffba299240438c60c8a0a866db598b2a101..dce1bd4d0247d2f56af8902f844814633b739b25 100644 (file)
@@ -233,6 +233,13 @@ $graph:
       type: int?
       doc: Minimum cores allocated to cwl-runner
       jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+    keep_cache:
+      type: int?
+      doc: |
+        Size of collection metadata cache for the workflow runner, in
+        MiB.  Default 256 MiB.  Will be added on to the RAM request
+        when determining node size to request.
+      jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache"
 
 - name: ClusterTarget
   type: record
index 3589fad276c25fa4e666a8f51effcd485eaf8bf8..caf954fcb038108961c1cf296010ff0468a24fbc 100644 (file)
@@ -95,6 +95,7 @@ class ArvCwlExecutor(object):
             arvargs.output_name = None
             arvargs.output_tags = None
             arvargs.thread_count = 1
+            arvargs.collection_cache_size = None
 
         self.api = api_client
         self.processes = {}
index 018172b591b761b90825c9160b366b28a35d6b94..1c233fac0ad98f4b0421a4e0856b00fd19d1422f 100644 (file)
@@ -23,36 +23,36 @@ class TaskQueue(object):
             t.start()
 
     def task_queue_func(self):
+        while True:
+            task = self.task_queue.get()
+            if task is None:
+                return
+            try:
+                task()
+            except Exception as e:
+                logger.exception("Unhandled exception running task")
+                self.error = e
 
-            while True:
-                task = self.task_queue.get()
-                if task is None:
-                    return
-                try:
-                    task()
-                except Exception as e:
-                    logger.exception("Unhandled exception running task")
-                    self.error = e
-
-                with self.lock:
-                    self.in_flight -= 1
+            with self.lock:
+                self.in_flight -= 1
 
     def add(self, task, unlock, check_done):
-        with self.lock:
-            if self.thread_count > 1:
+        if self.thread_count > 1:
+            with self.lock:
                 self.in_flight += 1
-            else:
-                task()
-                return
+        else:
+            task()
+            return
 
         while True:
             try:
                 unlock.release()
+                if check_done.is_set():
+                    return
                 self.task_queue.put(task, block=True, timeout=3)
                 return
             except Queue.Full:
-                if check_done.is_set():
-                    return
+                pass
             finally:
                 unlock.acquire()
 
index bf2791d728286c7ccd66431766e34d9a77b66392..a7a21e709ef200341ed79aa31f917c7a990a87a3 100644 (file)
@@ -274,7 +274,7 @@ def stubs(func):
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
-                        '--enable-reuse', '--debug', '--on-error=continue',
+                        '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
             'container_image': '999999999999999999999999999999d3+99',
@@ -283,7 +283,7 @@ def stubs(func):
             'runtime_constraints': {
                 'API': True,
                 'vcpus': 1,
-                'ram': 1024*1024*1024
+                'ram': (1024+256)*1024*1024
             },
             'use_existing': True,
             'properties': {},
@@ -559,7 +559,8 @@ class TestSubmit(unittest.TestCase):
             'arvados-cwl-runner', '--local', '--api=containers',
             '--no-log-timestamps', '--disable-validate',
             '--eval-timeout=20', '--thread-count=4',
-            '--disable-reuse', '--debug', '--on-error=continue',
+            '--disable-reuse', "--collection-cache-size=256",
+            '--debug', '--on-error=continue',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
 
@@ -584,7 +585,7 @@ class TestSubmit(unittest.TestCase):
             'arvados-cwl-runner', '--local', '--api=containers',
             '--no-log-timestamps', '--disable-validate',
             '--eval-timeout=20', '--thread-count=4',
-            '--disable-reuse', '--debug', '--on-error=continue',
+            '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
         expect_container["name"] = "submit_wf_no_reuse.cwl"
@@ -621,7 +622,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=stop',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=stop',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -647,7 +649,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse',
+                                       '--enable-reuse', "--collection-cache-size=256",
                                        "--output-name="+output_name, '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["output_name"] = output_name
@@ -673,7 +675,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', "--debug",
+                                       '--enable-reuse', "--collection-cache-size=256", "--debug",
                                        "--storage-classes=foo", '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -740,7 +742,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256", '--debug',
+                                       '--on-error=continue',
                                        "--intermediate-output-ttl=3600",
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -765,7 +768,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=continue',
                                        "--trash-intermediate",
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -792,7 +796,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=4',
-                                       '--enable-reuse',
+                                       '--enable-reuse', "--collection-cache-size=256",
                                        "--output-tags="+output_tags, '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -814,7 +818,7 @@ class TestSubmit(unittest.TestCase):
             logging.exception("")
 
         expect_container = copy.deepcopy(stubs.expect_container_spec)
-        expect_container["runtime_constraints"]["ram"] = 2048*1024*1024
+        expect_container["runtime_constraints"]["ram"] = (2048+256)*1024*1024
 
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -877,13 +881,13 @@ class TestSubmit(unittest.TestCase):
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
-                        '--enable-reuse', '--debug', '--on-error=continue',
+                        '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,
                 'vcpus': 1,
-                'ram': 1073741824
+                'ram': 1342177280
             },
             'use_existing': True,
             'properties': {},
@@ -999,13 +1003,13 @@ class TestSubmit(unittest.TestCase):
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
                         '--eval-timeout=20', '--thread-count=4',
-                        '--enable-reuse', '--debug', '--on-error=continue',
+                        '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
             'runtime_constraints': {
                 'API': True,
                 'vcpus': 1,
-                'ram': 1073741824
+                'ram': 1342177280
             },
             'use_existing': True,
             'properties': {
@@ -1059,7 +1063,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        "--eval-timeout=20", "--thread-count=4",
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256", '--debug',
+                                       '--on-error=continue',
                                        '--project-uuid='+project_uuid,
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -1085,7 +1090,34 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=60.0', '--thread-count=4',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=continue',
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @stubs
+    def test_submit_container_collection_cache(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--collection-cache-size=500",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+                                       '--no-log-timestamps', '--disable-validate',
+                                       '--eval-timeout=60.0', '--thread-count=4',
+                                       '--enable-reuse', "--collection-cache-size=500",
+                                       '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -1111,7 +1143,8 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
                                        '--eval-timeout=20', '--thread-count=20',
-                                       '--enable-reuse', '--debug', '--on-error=continue',
+                                       '--enable-reuse', "--collection-cache-size=256",
+                                       '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
@@ -1197,7 +1230,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["runtime_constraints"] = {
             "API": True,
             "vcpus": 2,
-            "ram": 2000 * 2**20
+            "ram": (2000+512) * 2**20
         }
         expect_container["name"] = "submit_wf_runner_resources.cwl"
         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
@@ -1211,6 +1244,11 @@ class TestSubmit(unittest.TestCase):
         expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
             "arv": "http://arvados.org/cwl#",
         }
+        expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
+                        '--no-log-timestamps', '--disable-validate',
+                        '--eval-timeout=20', '--thread-count=4',
+                        '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
+                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -1280,6 +1318,7 @@ class TestSubmit(unittest.TestCase):
                 "--eval-timeout=20",
                 '--thread-count=4',
                 "--enable-reuse",
+                "--collection-cache-size=256",
                 '--debug',
                 "--on-error=continue",
                 "/var/lib/cwl/workflow.json#main",
@@ -1407,7 +1446,7 @@ class TestSubmit(unittest.TestCase):
             "properties": {},
             "runtime_constraints": {
                 "API": True,
-                "ram": 1073741824,
+                "ram": 1342177280,
                 "vcpus": 1
             },
             "secret_mounts": {
index 2afbe0cff25f3d26e63253e697f3238468680e0f..a094890650e1a3049f177e9f01ec2330df7c7451 100644 (file)
@@ -22,29 +22,37 @@ def fail_task():
 class TestTaskQueue(unittest.TestCase):
     def test_tq(self):
         tq = TaskQueue(threading.Lock(), 2)
+        try:
+            self.assertIsNone(tq.error)
 
-        self.assertIsNone(tq.error)
-
-        tq.add(success_task)
-        tq.add(success_task)
-        tq.add(success_task)
-        tq.add(success_task)
+            unlock = threading.Lock()
+            unlock.acquire()
+            check_done = threading.Event()
 
-        tq.join()
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+        finally:
+            tq.join()
 
         self.assertIsNone(tq.error)
 
 
     def test_tq_error(self):
         tq = TaskQueue(threading.Lock(), 2)
-
-        self.assertIsNone(tq.error)
-
-        tq.add(success_task)
-        tq.add(success_task)
-        tq.add(fail_task)
-        tq.add(success_task)
-
-        tq.join()
+        try:
+            self.assertIsNone(tq.error)
+
+            unlock = threading.Lock()
+            unlock.acquire()
+            check_done = threading.Event()
+
+            tq.add(success_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+            tq.add(fail_task, unlock, check_done)
+            tq.add(success_task, unlock, check_done)
+        finally:
+            tq.join()
 
         self.assertIsNotNone(tq.error)