'cwltool==1.0.20180806194258',
'schema-salad==2.7.20180719125426',
'typing >= 3.6.4',
- 'ruamel.yaml >=0.13.11, <0.16',
+ # Need to limit ruamel.yaml version to 0.15.26 because of a bug:
+ # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping
+ 'ruamel.yaml >=0.13.11, <= 0.15.26',
'arvados-python-client>=1.1.4.20180607143841',
'setuptools',
- 'ciso8601 >=1.0.6, <2.0.0'
+ 'ciso8601 >=1.0.6, <2.0.0',
+ 'subprocess32>=3.5.1',
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
# SPDX-License-Identifier: Apache-2.0
import arvados_cwl
+ import arvados_cwl.context
from arvados_cwl.arvdocker import arv_docker_clear_cache
+import arvados.config
import logging
import mock
import unittest
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
+class CollectionMock(object):  # test stand-in for a collection: forwards open/copy to vwdmock and counts the calls
+ def __init__(self, vwdmock, *args, **kwargs):  # extra args are accepted and ignored
+ self.vwdmock = vwdmock
+ self.count = 0  # number of open()/copy() calls made so far
+
+ def open(self, *args, **kwargs):
+ self.count += 1
+ return self.vwdmock.open(*args, **kwargs)
+
+ def copy(self, *args, **kwargs):
+ self.count += 1
+ self.vwdmock.copy(*args, **kwargs)  # result of the delegated copy is not returned
+
+ def save_new(self, *args, **kwargs):  # deliberate no-op
+ pass
+
+ def __len__(self):  # length is the open()/copy() call count
+ return self.count
+
+ def portable_data_hash(self):
+ if self.count == 0:  # nothing opened or copied yet: report the empty-block locator
+ return arvados.config.EMPTY_BLOCK_LOCATOR
+ else:
+ return "99999999999999999999999999999996+99"  # fixed placeholder hash once anything was opened/copied
+
+
class TestContainer(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):  # builds and returns the (loadingContext, runtimeContext) pair used by the tests
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")  # only avsc_names is used below
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,  # factory with collection_cache pre-bound
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": Loader({}),
+ "metadata": {"cwlVersion": "v1.0"}})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "containers",
+ "basedir": "",
+ "name": "test_run_"+str(enable_reuse),  # name embeds the enable_reuse flag
+ "make_fs_access": make_fs_access,
+ "tmpdir": "/tmp",
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ mockc = mock.MagicMock()
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+ mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/scatter2_subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/echo-subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
for root, dirs, iterfiles in os.walk(localpath):
root = root[len(pathprefix):]
for src in iterfiles:
- write_file(collection, pathprefix, os.path.join(root, src))
+ write_file(collection, pathprefix, os.path.join(root, src), not packed)
- filters=[["portable_data_hash", "=", collection.portable_data_hash()]]
- if name:
- filters.append(["name", "like", name+"%"])
- if project:
- filters.append(["owner_uuid", "=", project])
+ pdh = None
+ if len(collection) > 0:
+ # non-empty collection
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
+ name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
+ if name:
+ filters.append(["name", "=", name_pdh])
+ if project:
+ filters.append(["owner_uuid", "=", project])
- exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
+ # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
+ # and there is a potential race with other workflows that may have created the collection
+ # between when we list it and find it does not exist and when we attempt to create it.
+ tries = 2
+ while pdh is None and tries > 0:
+ exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
- if exists["items"]:
- item = exists["items"][0]
- pdh = item["portable_data_hash"]
- logger.info("Using collection %s (%s)", pdh, item["uuid"])
- elif len(collection) > 0:
- collection.save_new(name=name, owner_uuid=project, ensure_unique_name=True)
+ if exists["items"]:
+ item = exists["items"][0]
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])
+ else:
+ try:
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ except arvados.errors.ApiError as ae:
+ tries -= 1
+ if pdh is None:
+ # Something weird is going on here, probably a collection
+ # with a conflicting name but the wrong PDH. We won't be
+ # able to reuse it but we still need to save our
+ # collection, so save it with a unique name.
+ logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ else:
+ # empty collection
pdh = collection.portable_data_hash()
- logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
+ logger.info("Using empty collection %s", pdh)
for c in files:
c.keepref = "%s/%s" % (pdh, c.fn)