From: Peter Amstutz Date: Fri, 24 Aug 2018 19:28:16 +0000 (-0400) Subject: Merge branch 'master' into 14075-uploadfiles X-Git-Tag: 1.2.0~9^2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/95e5ccacf6c1193b313fa90a6d39baafa2ba67d8 Merge branch 'master' into 14075-uploadfiles Arvados-DCO-1.1-Signed-off-by: Peter Amstutz --- 95e5ccacf6c1193b313fa90a6d39baafa2ba67d8 diff --cc sdk/cwl/setup.py index bdbe778292,e452ce2644..2b7b31b9f3 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@@ -36,12 -36,11 +36,13 @@@ setup(name='arvados-cwl-runner' 'cwltool==1.0.20180806194258', 'schema-salad==2.7.20180719125426', 'typing >= 3.6.4', - 'ruamel.yaml >=0.13.11, <0.16', + # Need to limit ruamel.yaml version to 0.15.26 because of bug + # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping + 'ruamel.yaml >=0.13.11, <= 0.15.26', 'arvados-python-client>=1.1.4.20180607143841', 'setuptools', - 'ciso8601 >=1.0.6, <2.0.0' + 'ciso8601 >=1.0.6, <2.0.0', + 'subprocess32>=3.5.1', ], data_files=[ ('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']), diff --cc sdk/cwl/tests/test_container.py index 0209d2eba9,3f8a32816d..69f3ae046e --- a/sdk/cwl/tests/test_container.py +++ b/sdk/cwl/tests/test_container.py @@@ -3,8 -3,8 +3,9 @@@ # SPDX-License-Identifier: Apache-2.0 import arvados_cwl + import arvados_cwl.context from arvados_cwl.arvdocker import arv_docker_clear_cache +import arvados.config import logging import mock import unittest @@@ -21,35 -21,30 +22,56 @@@ if not os.getenv('ARVADOS_DEBUG') logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN) logging.getLogger('arvados.arv-run').setLevel(logging.WARN) - +class CollectionMock(object): + def __init__(self, vwdmock, *args, **kwargs): + self.vwdmock = vwdmock + self.count = 0 + + def open(self, *args, **kwargs): + self.count += 1 + return self.vwdmock.open(*args, **kwargs) + + def copy(self, *args, **kwargs): + self.count += 1 + self.vwdmock.copy(*args, **kwargs) + + def 
save_new(self, *args, **kwargs): + pass + + def __len__(self): + return self.count + + def portable_data_hash(self): + if self.count == 0: + return arvados.config.EMPTY_BLOCK_LOCATOR + else: + return "99999999999999999999999999999996+99" + + class TestContainer(unittest.TestCase): + def helper(self, runner, enable_reuse=True): + document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0") + + make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, + collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) + loadingContext = arvados_cwl.context.ArvLoadingContext( + {"avsc_names": avsc_names, + "basedir": "", + "make_fs_access": make_fs_access, + "loader": Loader({}), + "metadata": {"cwlVersion": "v1.0"}}) + runtimeContext = arvados_cwl.context.ArvRuntimeContext( + {"work_api": "containers", + "basedir": "", + "name": "test_run_"+str(enable_reuse), + "make_fs_access": make_fs_access, + "tmpdir": "/tmp", + "enable_reuse": enable_reuse, + "priority": 500}) + + return loadingContext, runtimeContext + # The test passes no builder.resources # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024} @mock.patch("arvados.commands.keepdocker.list_images_in_arv") diff --cc sdk/cwl/tests/test_job.py index 1a3f6a9c8e,4473b88ca0..20efe15139 --- a/sdk/cwl/tests/test_job.py +++ b/sdk/cwl/tests/test_job.py @@@ -331,28 -382,21 +383,22 @@@ class TestWorkflow(unittest.TestCase) runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" runner.ignore_docker_for_reuse = False runner.num_retries = 0 - document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0") - make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, - collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) - document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access("")) - 
document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session) - document_loader.fetch_text = document_loader.fetcher.fetch_text - document_loader.check_exists = document_loader.fetcher.check_exists + loadingContext, runtimeContext = self.helper(runner) - tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl") + tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl") metadata["cwlVersion"] = tool["cwlVersion"] - mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118" + mockc = mock.MagicMock() + mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs) + mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt") - arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=document_loader, - makeTool=runner.arv_make_tool, metadata=metadata) + arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext) arvtool.formatgraph = None - it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="") - it.next().run() - it.next().run() + it = arvtool.job({}, mock.MagicMock(), runtimeContext) + + it.next().run(runtimeContext) + it.next().run(runtimeContext) with open("tests/wf/scatter2_subwf.cwl") as f: subwf = StripYAMLComments(f.read()) @@@ -418,27 -463,19 +465,19 @@@ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz" runner.ignore_docker_for_reuse = False runner.num_retries = 0 - document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0") - make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess, - collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0)) - document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access("")) - 
document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session) - document_loader.fetch_text = document_loader.fetcher.fetch_text - document_loader.check_exists = document_loader.fetcher.check_exists + loadingContext, runtimeContext = self.helper(runner) - tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl") + tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl") metadata["cwlVersion"] = tool["cwlVersion"] - mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118" + mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs) - arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names, - basedir="", make_fs_access=make_fs_access, loader=document_loader, - makeTool=runner.arv_make_tool, metadata=metadata) + arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext) arvtool.formatgraph = None - it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="") - it.next().run() - it.next().run() + it = arvtool.job({}, mock.MagicMock(), runtimeContext) + it.next().run(runtimeContext) + it.next().run(runtimeContext) with open("tests/wf/echo-subwf.cwl") as f: subwf = StripYAMLComments(f.read()) diff --cc sdk/python/arvados/commands/run.py index 5063d75f2a,c4748fa995..96f5bdd44a --- a/sdk/python/arvados/commands/run.py +++ b/sdk/python/arvados/commands/run.py @@@ -218,50 -205,24 +219,50 @@@ def uploadfiles(files, api, dry_run=Fal for root, dirs, iterfiles in os.walk(localpath): root = root[len(pathprefix):] for src in iterfiles: - write_file(collection, pathprefix, os.path.join(root, src)) + write_file(collection, pathprefix, os.path.join(root, src), not packed) - filters=[["portable_data_hash", "=", collection.portable_data_hash()]] - if name: - filters.append(["name", "like", name+"%"]) - if project: - filters.append(["owner_uuid", "=", 
project]) + pdh = None + if len(collection) > 0: + # non-empty collection + filters = [["portable_data_hash", "=", collection.portable_data_hash()]] + name_pdh = "%s (%s)" % (name, collection.portable_data_hash()) + if name: + filters.append(["name", "=", name_pdh]) + if project: + filters.append(["owner_uuid", "=", project]) - exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries) + # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False` + # and there is a potential race with other workflows that may have created the collection + # between when we list it and find it does not exist and when we attempt to create it. + tries = 2 + while pdh is None and tries > 0: + exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries) - if exists["items"]: - item = exists["items"][0] - pdh = item["portable_data_hash"] - logger.info("Using collection %s (%s)", pdh, item["uuid"]) - elif len(collection) > 0: - collection.save_new(name=name, owner_uuid=project, ensure_unique_name=True) + if exists["items"]: + item = exists["items"][0] + pdh = item["portable_data_hash"] + logger.info("Using collection %s (%s)", pdh, item["uuid"]) + else: + try: + collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False) + pdh = collection.portable_data_hash() + logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator()) + except arvados.errors.ApiError as ae: + tries -= 1 + if pdh is None: + # Something weird going on here, probably a collection + # with a conflicting name but wrong PDH. We won't be + # able to reuse it but we still need to save our + # collection, so save it with a unique name. 
+ logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh) + collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True) + pdh = collection.portable_data_hash() + logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator()) + else: + # empty collection pdh = collection.portable_data_hash() - logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator()) + assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh + logger.info("Using empty collection %s", pdh) for c in files: c.keepref = "%s/%s" % (pdh, c.fn)