From 58b12d248ed05f8b75b16cee33b0e153e7be71f6 Mon Sep 17 00:00:00 2001
From: Peter Amstutz
Date: Tue, 12 Mar 2019 16:39:12 -0400
Subject: [PATCH] 14322: Tests for edge cases

Report unknown uuids and mismatches between current collection PDH
(from API server lookup) and location PDH.

Arvados-DCO-1.1-Signed-off-by: Peter Amstutz
---
 sdk/cwl/arvados_cwl/__init__.py |  2 +-
 sdk/cwl/arvados_cwl/executor.py |  4 +-
 sdk/cwl/arvados_cwl/runner.py   | 59 +++++++++++++++++------
 sdk/cwl/tests/test_submit.py    | 83 ++++++++++++++++++++++++++++-----
 4 files changed, 119 insertions(+), 29 deletions(-)

diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index 834ca195fd..95711762c9 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -293,7 +293,7 @@ def main(args, stdout, stderr, api_client=None, keep_client=None,
         logger.exception("Error creating the Arvados CWL Executor")
         return 1
 
-    # Note that unless in debug mode, some stack traces related to user 
+    # Note that unless in debug mode, some stack traces related to user
     # workflow errors may be suppressed. See ArvadosJob.done().
     if arvargs.debug:
         logger.setLevel(logging.DEBUG)
diff --git a/sdk/cwl/arvados_cwl/executor.py b/sdk/cwl/arvados_cwl/executor.py
index 319e8a8871..c358426166 100644
--- a/sdk/cwl/arvados_cwl/executor.py
+++ b/sdk/cwl/arvados_cwl/executor.py
@@ -87,7 +87,7 @@ class RuntimeStatusLoggingHandler(logging.Handler):
                 )
             finally:
                 self.updatingRuntimeStatus = False
-            
+
 
 class ArvCwlExecutor(object):
     """Execute a CWL tool or workflow, submit work (using either jobs or
@@ -475,7 +475,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         with final.open("cwl.output.json", "w") as f:
             res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
-            f.write(res)            
+            f.write(res)
 
         final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
                        ensure_unique_name=True)
diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py
index 39620a55f3..d30445ab33 100644
--- a/sdk/cwl/arvados_cwl/runner.py
+++ b/sdk/cwl/arvados_cwl/runner.py
@@ -31,6 +31,7 @@ from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
 from cwltool.builder import substitute
 from cwltool.pack import pack
+import schema_salad.validate as validate
 
 import arvados.collection
 from .util import collectionUUID
@@ -90,6 +91,7 @@ def discover_secondary_files(inputs, job_order, discovered=None):
         setSecondary(t, job_order[shortname(t["id"])], discovered)
 
 collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
+collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
 
 def upload_dependencies(arvrunner, name, document_loader,
                         workflowobj, uri, loadref_run,
@@ -139,8 +141,21 @@ def upload_dependencies(arvrunner, name, document_loader,
                         loadref, urljoin=document_loader.fetcher.urljoin)
 
     sc = []
-    uuids = []
-    def dependencies_needing_transformation(obj):
+    uuids = {}
+
+    def collect_uuids(obj):
+        loc = obj.get("location", "")
+        sp = loc.split(":")
+        if sp[0] == "keep":
+            # Collect collection uuids that need to be resolved to
+            # portable data hashes
+            gp = collection_uuid_pattern.match(loc)
+            if gp:
+                uuids[gp.groups()[0]] = obj
+            if collectionUUID in obj:
+                uuids[obj[collectionUUID]] = obj
+
+    def collect_uploads(obj):
         loc = obj.get("location", "")
         sp = loc.split(":")
         if len(sp) < 1:
@@ -149,19 +164,18 @@ def upload_dependencies(arvrunner, name, document_loader,
             # Record local files than need to be uploaded,
             # don't include file literals, keep references, etc.
             sc.append(obj)
-        elif sp[0] == "keep":
-            # Collect collection uuids that need to be resolved to
-            # portable data hashes
-            gp = collection_uuid_pattern.match(loc)
-            if gp:
-                uuids.append(gp.groups()[0])
+        collect_uuids(obj)
 
-    visit_class(sc_result, ("File", "Directory"), dependencies_needing_transformation)
+    visit_class(workflowobj, ("File", "Directory"), collect_uuids)
+    visit_class(sc_result, ("File", "Directory"), collect_uploads)
 
+    # Resolve any collection uuids we found to portable data hashes
+    # and assign them to uuid_map
     uuid_map = {}
-    while uuids:
+    fetch_uuids = list(uuids.keys())
+    while fetch_uuids:
         lookups = arvrunner.api.collections().list(
-            filters=[["uuid", "in", uuids]],
+            filters=[["uuid", "in", fetch_uuids]],
             count="none",
             select=["uuid", "portable_data_hash"]).execute(
                 num_retries=arvrunner.num_retries)
@@ -172,7 +186,7 @@ def upload_dependencies(arvrunner, name, document_loader,
         for l in lookups["items"]:
             uuid_map[l["uuid"]] = l["portable_data_hash"]
 
-        uuids = [u for u in uuids if u not in uuid_map]
+        fetch_uuids = [u for u in fetch_uuids if u not in uuid_map]
 
     normalizeFilesDirs(sc)
 
@@ -227,14 +241,31 @@ def upload_dependencies(arvrunner, name, document_loader,
         if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
             p["location"] = mapper.mapper(p["location"]).resolved
             return
-        if not uuid_map:
+
+        if not loc:
             return
+
+        if collectionUUID in p:
+            uuid = p[collectionUUID]
+            if uuid not in uuid_map:
+                raise SourceLine(p, collectionUUID, validate.ValidationException).makeError(
+                    "Collection uuid %s not found" % uuid)
+            gp = collection_pdh_pattern.match(loc)
+            if gp and uuid_map[uuid] != gp.groups()[0]:
+                # This file entry has both collectionUUID and a PDH
+                # location. If the PDH doesn't match the one returned
+                # by the API server, raise an error.
+                raise SourceLine(p, "location", validate.ValidationException).makeError(
+                    "Expected collection uuid %s to be %s but API server reported %s" % (
+                        uuid, gp.groups()[0], uuid_map[p[collectionUUID]]))
+
         gp = collection_uuid_pattern.match(loc)
         if not gp:
             return
         uuid = gp.groups()[0]
         if uuid not in uuid_map:
-            raise Exception("Cannot resolve uuid %s" % uuid)
+            raise SourceLine(p, "location", validate.ValidationException).makeError(
+                "Collection uuid %s not found" % uuid)
         p["location"] = "keep:%s%s" % (uuid_map[uuid], gp.groups()[1] if gp.groups()[1] else "")
         p[collectionUUID] = uuid
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 4218ec137b..76b0f89f15 100644
--- a/sdk/cwl/tests/test_submit.py
+++ b/sdk/cwl/tests/test_submit.py
@@ -112,6 +112,11 @@ def stubs(func):
             "portable_data_hash": "99999999999999999999999999999998+99",
             "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
         },
+        "99999999999999999999999999999997+99": {
+            "uuid": "",
+            "portable_data_hash": "99999999999999999999999999999997+99",
+            "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
+        },
         "99999999999999999999999999999994+99": {
             "uuid": "",
             "portable_data_hash": "99999999999999999999999999999994+99",
@@ -1451,7 +1456,7 @@ class TestSubmit(unittest.TestCase):
 
         stubs.api.collections().list.assert_has_calls([
             mock.call(count='none',
-                      filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz', 'zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
+                      filters=[['uuid', 'in', ['zzzzz-4zz18-zzzzzzzzzzzzzzz']]],
                       select=['uuid', 'portable_data_hash'])])
         stubs.api.container_requests().create.assert_called_with(
             body=JsonDiffMatcher(expect_container))
@@ -1459,6 +1464,58 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
         self.assertEqual(exited, 0)
 
+    @stubs
+    def test_submit_mismatched_uuid_inputs(self, stubs):
+        def list_side_effect(**kwargs):
+            m = mock.MagicMock()
+            if "count" in kwargs:
+                m.execute.return_value = {"items": [
+                    {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz", "portable_data_hash": "99999999999999999999999999999997+99"}
+                ]}
+            else:
+                m.execute.return_value = {"items": []}
+            return m
+        stubs.api.collections().list.side_effect = list_side_effect
+
+        for infile in ("tests/submit_test_job_with_mismatched_uuids.json", "tests/submit_test_job_with_inconsistent_uuids.json"):
+            capture_stderr = io.StringIO()
+            cwltool_logger = logging.getLogger('cwltool')
+            stderr_logger = logging.StreamHandler(capture_stderr)
+            cwltool_logger.addHandler(stderr_logger)
+
+            try:
+                exited = arvados_cwl.main(
+                    ["--submit", "--no-wait", "--api=containers", "--debug",
+                     "tests/wf/submit_wf.cwl", infile],
+                    stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+                self.assertEqual(exited, 1)
+                self.assertRegexpMatches(
+                    capture_stderr.getvalue(),
+                    r"Expected collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz to be 99999999999999999999999999999998\+99 but API server reported 99999999999999999999999999999997\+99")
+            finally:
+                cwltool_logger.removeHandler(stderr_logger)
+
+    @stubs
+    def test_submit_unknown_uuid_inputs(self, stubs):
+        capture_stderr = io.StringIO()
+        cwltool_logger = logging.getLogger('cwltool')
+        stderr_logger = logging.StreamHandler(capture_stderr)
+        cwltool_logger.addHandler(stderr_logger)
+
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job_with_uuids.json"],
+            stubs.capture_stdout, capture_stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+
+        try:
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                r"Collection uuid zzzzz-4zz18-zzzzzzzzzzzzzzz not found")
+        finally:
+            cwltool_logger.removeHandler(stderr_logger)
+
 
 class TestCreateTemplate(unittest.TestCase):
     existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
@@ -1648,17 +1705,19 @@ class TestCreateWorkflow(unittest.TestCase):
         stderr_logger = logging.StreamHandler(capture_stderr)
         acr_logger.addHandler(stderr_logger)
 
-        exited = arvados_cwl.main(
-            ["--update-workflow", self.existing_workflow_uuid,
-             "--api=jobs",
-             "--debug",
-             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
-            sys.stderr, sys.stderr, api_client=stubs.api)
-        self.assertEqual(exited, 1)
-        self.assertRegexpMatches(
-            capture_stderr.getvalue(),
-            "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
-        acr_logger.removeHandler(stderr_logger)
+        try:
+            exited = arvados_cwl.main(
+                ["--update-workflow", self.existing_workflow_uuid,
+                 "--api=jobs",
+                 "--debug",
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                sys.stderr, sys.stderr, api_client=stubs.api)
+            self.assertEqual(exited, 1)
+            self.assertRegexpMatches(
+                capture_stderr.getvalue(),
+                "--update-workflow arg '{}' uses 'containers' API, but --api='jobs' specified".format(self.existing_workflow_uuid))
+        finally:
+            acr_logger.removeHandler(stderr_logger)
 
     @stubs
     def test_update(self, stubs):
-- 
2.30.2