From 42cb6a6d7679c5dc90adc14da57bb5691930e0f0 Mon Sep 17 00:00:00 2001 From: Peter Amstutz Date: Tue, 27 Nov 2018 17:26:07 -0500 Subject: [PATCH] 14510: Update tests/docs for --collection-cache-size Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- .../cwl/cwl-extensions.html.textile.liquid | 3 +- sdk/cwl/arvados_cwl/arv-cwl-schema.yml | 7 ++ sdk/cwl/arvados_cwl/executor.py | 1 + sdk/cwl/arvados_cwl/task_queue.py | 38 ++++----- sdk/cwl/tests/test_submit.py | 79 ++++++++++++++----- sdk/cwl/tests/test_tq.py | 40 ++++++---- 6 files changed, 112 insertions(+), 56 deletions(-) diff --git a/doc/user/cwl/cwl-extensions.html.textile.liquid b/doc/user/cwl/cwl-extensions.html.textile.liquid index f2dd937d95..d62002237a 100644 --- a/doc/user/cwl/cwl-extensions.html.textile.liquid +++ b/doc/user/cwl/cwl-extensions.html.textile.liquid @@ -138,7 +138,8 @@ table(table table-bordered table-condensed). |_. Field |_. Type |_. Description | |ramMin|int|RAM, in mebibytes, to reserve for the arvados-cwl-runner process. Default 1 GiB| |coresMin|int|Number of cores to reserve to the arvados-cwl-runner process. Default 1 core.| -|keep_cache|int|RAM, in mebibytes, to reserve for caching keep collection metadata. Default 256 MiB| +|keep_cache|int|Size of collection metadata cache for the workflow runner, in MiB. Default 256 MiB. Will be added on to the RAM request when determining node size to request.| + h2(#clustertarget). arv:ClusterTarget Specify which Arvados cluster should execute a container or subworkflow, and the parent project for the container request. diff --git a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml index 902b1ffba2..dce1bd4d02 100644 --- a/sdk/cwl/arvados_cwl/arv-cwl-schema.yml +++ b/sdk/cwl/arvados_cwl/arv-cwl-schema.yml @@ -233,6 +233,13 @@ $graph: type: int? doc: Minimum cores allocated to cwl-runner jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin" + keep_cache: + type: int? + doc: | + Size of collection metadata cache for the workflow runner, in + MiB. Default 256 MiB. Will be added on to the RAM request + when determining node size to request. + jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache" - name: ClusterTarget type: record diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py index 3589fad276..caf954fcb0 100644 --- a/sdk/cwl/arvados_cwl/executor.py +++ b/sdk/cwl/arvados_cwl/executor.py @@ -95,6 +95,7 @@ class ArvCwlExecutor(object): arvargs.output_name = None arvargs.output_tags = None arvargs.thread_count = 1 + arvargs.collection_cache_size = None self.api = api_client self.processes = {} diff --git a/sdk/cwl/arvados_cwl/task_queue.py b/sdk/cwl/arvados_cwl/task_queue.py index 018172b591..1c233fac0a 100644 --- a/sdk/cwl/arvados_cwl/task_queue.py +++ b/sdk/cwl/arvados_cwl/task_queue.py @@ -23,36 +23,36 @@ class TaskQueue(object): t.start() def task_queue_func(self): + while True: + task = self.task_queue.get() + if task is None: + return + try: + task() + except Exception as e: + logger.exception("Unhandled exception running task") + self.error = e - while True: - task = self.task_queue.get() - if task is None: - return - try: - task() - except Exception as e: - logger.exception("Unhandled exception running task") - self.error = e - - with self.lock: - self.in_flight -= 1 + with self.lock: + self.in_flight -= 1 def add(self, task, unlock, check_done): - with self.lock: - if self.thread_count > 1: + if self.thread_count > 1: + with self.lock: self.in_flight += 1 - else: - task() - return + else: + task() + return while True: try: unlock.release() + if check_done.is_set(): + return self.task_queue.put(task, block=True, timeout=3) return except Queue.Full: - if check_done.is_set(): - return + pass finally: unlock.acquire() diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py index bf2791d728..a7a21e709e 100644 --- a/sdk/cwl/tests/test_submit.py +++ b/sdk/cwl/tests/test_submit.py @@ -274,7 +274,7 @@ def stubs(func): 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'], 'name': 'submit_wf.cwl', 'container_image': '999999999999999999999999999999d3+99', @@ -283,7 +283,7 @@ def stubs(func): 'runtime_constraints': { 'API': True, 'vcpus': 1, - 'ram': 1024*1024*1024 + 'ram': (1024+256)*1024*1024 }, 'use_existing': True, 'properties': {}, @@ -559,7 +559,8 @@ class TestSubmit(unittest.TestCase): 'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--disable-reuse', '--debug', '--on-error=continue', + '--disable-reuse', "--collection-cache-size=256", + '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] expect_container["use_existing"] = False @@ -584,7 +585,7 @@ class TestSubmit(unittest.TestCase): 'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--disable-reuse', '--debug', '--on-error=continue', + '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] expect_container["use_existing"] = False expect_container["name"] = "submit_wf_no_reuse.cwl" @@ -621,7 +622,8 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=stop', + '--enable-reuse', "--collection-cache-size=256", + '--debug', '--on-error=stop', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] stubs.api.container_requests().create.assert_called_with( @@ -647,7 +649,7 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', + '--enable-reuse', "--collection-cache-size=256", "--output-name="+output_name, '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] expect_container["output_name"] = output_name @@ -673,7 +675,7 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', "--debug", + '--enable-reuse', "--collection-cache-size=256", "--debug", "--storage-classes=foo", '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] @@ -740,7 +742,8 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", '--debug', + '--on-error=continue', "--intermediate-output-ttl=3600", '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] @@ -765,7 +768,8 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", + '--debug', '--on-error=continue', "--trash-intermediate", '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] @@ -792,7 +796,7 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', + '--enable-reuse', "--collection-cache-size=256", "--output-tags="+output_tags, '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] @@ -814,7 +818,7 @@ class TestSubmit(unittest.TestCase): logging.exception("") expect_container = copy.deepcopy(stubs.expect_container_spec) - expect_container["runtime_constraints"]["ram"] = 2048*1024*1024 + expect_container["runtime_constraints"]["ram"] = (2048+256)*1024*1024 stubs.api.container_requests().create.assert_called_with( body=JsonDiffMatcher(expect_container)) @@ -877,13 +881,13 @@ class TestSubmit(unittest.TestCase): 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue', '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'], 'cwd': '/var/spool/cwl', 'runtime_constraints': { 'API': True, 'vcpus': 1, - 'ram': 1073741824 + 'ram': 1342177280 }, 'use_existing': True, 'properties': {}, @@ -999,13 +1003,13 @@ class TestSubmit(unittest.TestCase): 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'], 'cwd': '/var/spool/cwl', 'runtime_constraints': { 'API': True, 'vcpus': 1, - 'ram': 1073741824 + 'ram': 1342177280 }, 'use_existing': True, 'properties': { @@ -1059,7 +1063,8 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', "--eval-timeout=20", "--thread-count=4", - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", '--debug', + '--on-error=continue', '--project-uuid='+project_uuid, '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] @@ -1085,7 +1090,34 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=60.0', '--thread-count=4', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", + '--debug', '--on-error=continue', + '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] + + stubs.api.container_requests().create.assert_called_with( + body=JsonDiffMatcher(expect_container)) + self.assertEqual(capture_stdout.getvalue(), + stubs.expect_container_request_uuid + '\n') + + @stubs + def test_submit_container_collection_cache(self, stubs): + project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz' + capture_stdout = cStringIO.StringIO() + try: + exited = arvados_cwl.main( + ["--submit", "--no-wait", "--api=containers", "--debug", "--collection-cache-size=500", + "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"], + capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client) + self.assertEqual(exited, 0) + except: + logging.exception("") + + expect_container = copy.deepcopy(stubs.expect_container_spec) + expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', + '--no-log-timestamps', '--disable-validate', + '--eval-timeout=60.0', '--thread-count=4', + '--enable-reuse', "--collection-cache-size=500", + '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] stubs.api.container_requests().create.assert_called_with( @@ -1111,7 +1143,8 @@ class TestSubmit(unittest.TestCase): expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps', '--disable-validate', '--eval-timeout=20', '--thread-count=20', - '--enable-reuse', '--debug', '--on-error=continue', + '--enable-reuse', "--collection-cache-size=256", + '--debug', '--on-error=continue', '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] stubs.api.container_requests().create.assert_called_with( @@ -1197,7 +1230,7 @@ class TestSubmit(unittest.TestCase): expect_container["runtime_constraints"] = { "API": True, "vcpus": 2, - "ram": 2000 * 2**20 + "ram": (2000+512) * 2**20 } expect_container["name"] = "submit_wf_runner_resources.cwl" expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [ @@ -1211,6 +1244,11 @@ class TestSubmit(unittest.TestCase): expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = { "arv": "http://arvados.org/cwl#", } + expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers', + '--no-log-timestamps', '--disable-validate', + '--eval-timeout=20', '--thread-count=4', + '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue', + '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'] stubs.api.container_requests().create.assert_called_with( body=JsonDiffMatcher(expect_container)) @@ -1280,6 +1318,7 @@ class TestSubmit(unittest.TestCase): "--eval-timeout=20", '--thread-count=4', "--enable-reuse", + "--collection-cache-size=256", '--debug', "--on-error=continue", "/var/lib/cwl/workflow.json#main", @@ -1407,7 +1446,7 @@ class TestSubmit(unittest.TestCase): "properties": {}, "runtime_constraints": { "API": True, - "ram": 1073741824, + "ram": 1342177280, "vcpus": 1 }, "secret_mounts": { diff --git a/sdk/cwl/tests/test_tq.py b/sdk/cwl/tests/test_tq.py index 2afbe0cff2..a094890650 100644 --- a/sdk/cwl/tests/test_tq.py +++ b/sdk/cwl/tests/test_tq.py @@ -22,29 +22,37 @@ def fail_task(): class TestTaskQueue(unittest.TestCase): def test_tq(self): tq = TaskQueue(threading.Lock(), 2) + try: + self.assertIsNone(tq.error) - self.assertIsNone(tq.error) - - tq.add(success_task) - tq.add(success_task) - tq.add(success_task) - tq.add(success_task) + unlock = threading.Lock() + unlock.acquire() + check_done = threading.Event() - tq.join() + tq.add(success_task, unlock, check_done) + tq.add(success_task, unlock, check_done) + tq.add(success_task, unlock, check_done) + tq.add(success_task, unlock, check_done) + finally: + tq.join() self.assertIsNone(tq.error) def test_tq_error(self): tq = TaskQueue(threading.Lock(), 2) - - self.assertIsNone(tq.error) - - tq.add(success_task) - tq.add(success_task) - tq.add(fail_task) - tq.add(success_task) - - tq.join() + try: + self.assertIsNone(tq.error) + + unlock = threading.Lock() + unlock.acquire() + check_done = threading.Event() + + tq.add(success_task, unlock, check_done) + tq.add(success_task, unlock, check_done) + tq.add(fail_task, unlock, check_done) + tq.add(success_task, unlock, check_done) + finally: + tq.join() self.assertIsNotNone(tq.error) -- 2.30.2