'cwltool==1.0.20180806194258',
'schema-salad==2.7.20180719125426',
'typing >= 3.6.4',
- 'ruamel.yaml >=0.13.11, <0.16',
+ # Need to limit ruamel.yaml version to 0.15.26 because of bug
+ # https://bitbucket.org/ruamel/yaml/issues/227/regression-parsing-flow-mapping
+ 'ruamel.yaml >=0.13.11, <= 0.15.26',
'arvados-python-client>=1.1.4.20180607143841',
'setuptools',
- 'ciso8601 >=1.0.6, <2.0.0'
+ 'ciso8601 >=1.0.6, <2.0.0',
+ 'subprocess32>=3.5.1',
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
],
test_suite='tests',
- tests_require=['mock>=1.0'],
+ tests_require=[
+ 'mock>=1.0',
+ 'subprocess32>=3.5.1',
+ ],
zip_safe=True
)
# SPDX-License-Identifier: Apache-2.0
import arvados_cwl
+ import arvados_cwl.context
from arvados_cwl.arvdocker import arv_docker_clear_cache
+import arvados.config
import logging
import mock
import unittest
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
+class CollectionMock(object):
+    """Minimal collection stand-in that wraps another mock object.
+
+    open() and copy() are forwarded to the wrapped object and counted.
+    The running count is reported as the collection's length and selects
+    which portable data hash is returned.
+    """
+
+    def __init__(self, vwdmock, *args, **kwargs):
+        # Any further constructor arguments are accepted and ignored.
+        self.count = 0
+        self.vwdmock = vwdmock
+
+    def open(self, *args, **kwargs):
+        """Count the call, then return the wrapped object's open() result."""
+        self.count = self.count + 1
+        return self.vwdmock.open(*args, **kwargs)
+
+    def copy(self, *args, **kwargs):
+        """Count the call and forward it; the wrapped result is discarded."""
+        self.count = self.count + 1
+        self.vwdmock.copy(*args, **kwargs)
+
+    def save_new(self, *args, **kwargs):
+        """Accept any arguments and do nothing."""
+        return None
+
+    def __len__(self):
+        # Length is the number of open()/copy() calls made so far.
+        return self.count
+
+    def portable_data_hash(self):
+        """Return a fixed fake hash once used, else the empty-block locator."""
+        if self.count:
+            return "99999999999999999999999999999996+99"
+        return arvados.config.EMPTY_BLOCK_LOCATOR
+
+
class TestContainer(unittest.TestCase):
+    def helper(self, runner, enable_reuse=True):
+        """Build the contexts shared by the container tests.
+
+        Returns a (loadingContext, runtimeContext) pair. The runtime
+        context selects the "containers" work API, is named
+        "test_run_<enable_reuse>", and carries the given enable_reuse flag.
+        """
+        # Only the second element (the Avro names) of the schema tuple is used.
+        avsc_names = cwltool.process.get_schema("v1.0")[1]
+
+        collection_cache = arvados_cwl.CollectionCache(runner.api, None, 0)
+        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess,
+                                           collection_cache=collection_cache)
+
+        loading_settings = {
+            "avsc_names": avsc_names,
+            "basedir": "",
+            "make_fs_access": make_fs_access,
+            "loader": Loader({}),
+            "metadata": {"cwlVersion": "v1.0"},
+        }
+        loadingContext = arvados_cwl.context.ArvLoadingContext(loading_settings)
+
+        runtime_settings = {
+            "work_api": "containers",
+            "basedir": "",
+            "name": "test_run_"+str(enable_reuse),
+            "make_fs_access": make_fs_access,
+            "tmpdir": "/tmp",
+            "enable_reuse": enable_reuse,
+            "priority": 500,
+        }
+        runtimeContext = arvados_cwl.context.ArvRuntimeContext(runtime_settings)
+
+        return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.commands.keepdocker.list_images_in_arv")
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
tool = cmap({
"inputs": [],
"outputs": [],
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=enable_reuse, priority=500)
+
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
"capacity": 1073741824 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_run_'+str(enable_reuse),
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
runner.intermediate_output_ttl = 3600
runner.secret_store = cwltool.secrets.SecretStore()
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_resource_requirements"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=True, priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
call_args, call_kwargs = runner.api.container_requests().create.call_args
"capacity": 5242880000 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_resource_requirements',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 7200,
runner.intermediate_output_ttl = 0
runner.secret_store = cwltool.secrets.SecretStore()
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
runner.fs_access.get_collection.side_effect = get_collection_mock
vwdmock = mock.MagicMock()
- collection_mock.return_value = vwdmock
- vwdmock.portable_data_hash.return_value = "99999999999999999999999999999996+99"
+ collection_mock.side_effect = lambda *args, **kwargs: CollectionMock(vwdmock, *args, **kwargs)
tool = cmap({
"inputs": [],
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
- avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}), metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_initial_work_dir"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
call_args, call_kwargs = runner.api.container_requests().create.call_args
}
},
'state': 'Committed',
+ 'output_name': 'Output for step test_initial_work_dir',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_run_redirect"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
},
},
'state': 'Committed',
+ "output_name": "Output for step test_run_redirect",
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
col().open.return_value = []
- arvjob = arvados_cwl.ArvadosContainer(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosContainer(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.successCodes = [0]
keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
runner.api.collections().get().execute.return_value = {
- "portable_data_hash": "99999999999999999999999999999993+99"}
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_run_mounts"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
job_order = {
"p1": {
]
}
}
- for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_run_mounts",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(priority=500)
+ for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
"capacity": 1073741824 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_run_mounts',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
]
}
]})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_secrets"
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
job_order = {"pw": "blorp"}
runner.secret_store.store(["pw"], job_order)
- for j in arvtool.job(job_order, mock.MagicMock(), basedir="", name="test_secrets",
- make_fs_access=make_fs_access, tmpdir="/tmp"):
- j.run(enable_reuse=True, priority=500)
+ for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher({
'environment': {
"capacity": 1073741824 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_secrets',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
}
}
}))
+
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+    @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+    def test_timelimit(self, keepdocker):
+        """A TimeLimit hint of 42 is submitted as max_run_time in scheduling_parameters."""
+        arv_docker_clear_cache()
+
+        runner = mock.MagicMock()
+        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+        runner.ignore_docker_for_reuse = False
+        runner.intermediate_output_ttl = 0
+        runner.secret_store = cwltool.secrets.SecretStore()
+
+        keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+        runner.api.collections().get().execute.return_value = {
+            "portable_data_hash": "99999999999999999999999999999993+99"}
+
+        time_limit_hint = {
+            "class": "http://commonwl.org/cwltool#TimeLimit",
+            "timelimit": 42
+        }
+        tool = cmap({
+            "inputs": [],
+            "outputs": [],
+            "baseCommand": "ls",
+            "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+            "id": "#",
+            "class": "CommandLineTool",
+            "hints": [time_limit_hint]
+        })
+
+        loadingContext, runtimeContext = self.helper(runner)
+        runtimeContext.name = "test_timelimit"
+
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
+        arvtool.formatgraph = None
+
+        for job in arvtool.job({}, mock.MagicMock(), runtimeContext):
+            job.run(runtimeContext)
+
+        # call_args is (positional args, keyword args); only the keywords matter here.
+        create_kwargs = runner.api.container_requests().create.call_args[1]
+        submitted = create_kwargs['body']['scheduling_parameters']
+        self.assertEqual(42, submitted.get('max_run_time'))
from schema_salad.sourceline import cmap
from .mock_discovery import get_rootDesc
from .matcher import JsonDiffMatcher, StripYAMLComments
+from .test_container import CollectionMock
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
class TestJob(unittest.TestCase):
+    def helper(self, runner, enable_reuse=True):
+        """Build the contexts shared by the job tests.
+
+        Returns a (loadingContext, runtimeContext) pair. The runtime
+        context selects the "jobs" work API, is named
+        "test_run_job_<enable_reuse>", and carries the given enable_reuse flag.
+        """
+        # Only the second element (the Avro names) of the schema tuple is used.
+        avsc_names = cwltool.process.get_schema("v1.0")[1]
+
+        collection_cache = arvados_cwl.CollectionCache(runner.api, None, 0)
+        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess,
+                                           collection_cache=collection_cache)
+
+        # NOTE(review): TestWorkflow.helper passes the same callable under the
+        # key "construct_tool_object"; confirm "makeTool" is a key the loading
+        # context actually honours.
+        loading_settings = {
+            "avsc_names": avsc_names,
+            "basedir": "",
+            "make_fs_access": make_fs_access,
+            "loader": Loader({}),
+            "metadata": {"cwlVersion": "v1.0"},
+            "makeTool": runner.arv_make_tool,
+        }
+        loadingContext = arvados_cwl.context.ArvLoadingContext(loading_settings)
+
+        runtime_settings = {
+            "work_api": "jobs",
+            "basedir": "",
+            "name": "test_run_job_"+str(enable_reuse),
+            "make_fs_access": make_fs_access,
+            "enable_reuse": enable_reuse,
+            "priority": 500,
+        }
+        runtimeContext = arvados_cwl.context.ArvRuntimeContext(runtime_settings)
+
+        return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
"id": "#",
"class": "CommandLineTool"
})
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run(enable_reuse=enable_reuse)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
runner.api.links().create.side_effect = ApiError(
mock.MagicMock(return_value={'status': 403}),
'Permission denied')
- j.run(enable_reuse=enable_reuse)
+ j.run(runtimeContext)
else:
assert not runner.api.links().create.called
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
-
tool = {
"inputs": [],
"outputs": [],
"id": "#",
"class": "CommandLineTool"
}
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- make_fs_access=make_fs_access, loader=Loader({}),
- metadata={"cwlVersion": "v1.0"})
+
+ loadingContext, runtimeContext = self.helper(runner)
+
+ arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
arvtool.formatgraph = None
- for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
- j.run(enable_reuse=True)
+ for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
{"items": []},
{"items": [{"manifest_text": "ABC"}]})
- arvjob = arvados_cwl.ArvadosJob(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosJob(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.collect_outputs.return_value = {"out": "stuff"}
{"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
)
- arvjob = arvados_cwl.ArvadosJob(runner)
- arvjob.name = "testjob"
- arvjob.builder = mock.MagicMock()
+ arvjob = arvados_cwl.ArvadosJob(runner,
+ mock.MagicMock(),
+ {},
+ None,
+ [],
+ [],
+ "testjob")
arvjob.output_callback = mock.MagicMock()
arvjob.collect_outputs = mock.MagicMock()
arvjob.collect_outputs.return_value = {"out": "stuff"}
class TestWorkflow(unittest.TestCase):
+    def helper(self, runner, enable_reuse=True):
+        """Build the contexts shared by the workflow tests.
+
+        The v1.0 document loader is rewired to fetch through an
+        arvados_cwl.CollectionFetcher backed by `runner.api`, then handed to
+        the loading context. Returns (loadingContext, runtimeContext); the
+        runtime context uses the "jobs" work API and is named
+        "test_run_wf_<enable_reuse>".
+        """
+        # The schema tuple's first two elements are the loader and Avro names.
+        document_loader, avsc_names = cwltool.process.get_schema("v1.0")[:2]
+
+        collection_cache = arvados_cwl.CollectionCache(runner.api, None, 0)
+        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess,
+                                           collection_cache=collection_cache)
+
+        document_loader.fetcher_constructor = functools.partial(
+            arvados_cwl.CollectionFetcher,
+            api_client=runner.api,
+            fs_access=make_fs_access(""))
+        document_loader.fetcher = document_loader.fetcher_constructor(
+            document_loader.cache, document_loader.session)
+        # Point the loader's convenience entry points at the new fetcher.
+        document_loader.fetch_text = document_loader.fetcher.fetch_text
+        document_loader.check_exists = document_loader.fetcher.check_exists
+
+        loading_settings = {
+            "avsc_names": avsc_names,
+            "basedir": "",
+            "make_fs_access": make_fs_access,
+            "loader": document_loader,
+            "metadata": {"cwlVersion": "v1.0"},
+            "construct_tool_object": runner.arv_make_tool,
+        }
+        loadingContext = arvados_cwl.context.ArvLoadingContext(loading_settings)
+
+        runtime_settings = {
+            "work_api": "jobs",
+            "basedir": "",
+            "name": "test_run_wf_"+str(enable_reuse),
+            "make_fs_access": make_fs_access,
+            "enable_reuse": enable_reuse,
+            "priority": 500,
+        }
+        runtimeContext = arvados_cwl.context.ArvRuntimeContext(runtime_settings)
+
+        return loadingContext, runtimeContext
+
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@mock.patch("arvados.collection.CollectionReader")
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ mockc = mock.MagicMock()
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+ mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/scatter2_subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
'HOME': '$(task.outdir)',
'TMPDIR': '$(task.tmpdir)'},
'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+ 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
+ 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
},
'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
'task.stdout': 'cwl.output.json'}]},
['docker_image_locator', 'in docker', 'arvados/jobs']],
find_or_create=True)
- mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
- mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+ mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
+ mockc.open().__enter__().write.assert_has_calls([mock.call(
'''{
"fileblub": {
"basename": "token.txt",
"class": "File",
- "location": "/keep/99999999999999999999999999999999+118/token.txt"
+ "location": "/keep/99999999999999999999999999999999+118/token.txt",
+ "size": 0
},
"sleeptime": 5
}''')])
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
- make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
- collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
- document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
- document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
- document_loader.fetch_text = document_loader.fetcher.fetch_text
- document_loader.check_exists = document_loader.fetcher.check_exists
+ loadingContext, runtimeContext = self.helper(runner)
- tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
- mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
- arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=document_loader,
- makeTool=runner.arv_make_tool, metadata=metadata)
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
- it.next().run()
- it.next().run()
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+ it.next().run(runtimeContext)
+ it.next().run(runtimeContext)
with open("tests/wf/echo-subwf.cwl") as f:
subwf = StripYAMLComments(f.read())
'HOME': '$(task.outdir)',
'TMPDIR': '$(task.tmpdir)'},
'task.vwd': {
- 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
- 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+ 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
+ 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
},
'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
'task.stdout': 'cwl.output.json'}]},
"listing": [{
"basename": "renamed.txt",
"class": "File",
- "location": "keep:99999999999999999999999999999998+99/file1.txt"
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "size": 0
}],
'class': 'Directory'
},
{
'basename': 'renamed.txt',
'class': 'File', 'location':
- 'keep:99999999999999999999999999999998+99/file1.txt'
+ 'keep:99999999999999999999999999999998+99/file1.txt',
+ 'size': 0
}
]}},
'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main',
'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
{'basename': 'renamed.txt',
'class': 'File',
- 'location': 'keep:99999999999999999999999999999998+99/file1.txt'
+ 'location': 'keep:99999999999999999999999999999998+99/file1.txt',
+ 'size': 0
}
]}
},
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies',
- }), ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ }), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input',
- }), ensure_unique_name=True),
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ }), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 61df2ed9ee3eb7dd9b799e5ca35305fa+1217 0:1217:workflow.cwl\n',
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_pipeline_uuid + '\n')
+    @stubs
+    def test_error_when_multiple_storage_classes_specified(self, stubs):
+        """main() must exit with status 1 when --storage-classes lists two classes."""
+        storage_classes = "foo,bar"
+        # The second argument to main() is its output stream; the other
+        # submit tests pass a StringIO buffer there. sys.stdin is not a
+        # writable stream, so use a throwaway buffer instead.
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--debug", "--storage-classes", storage_classes,
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api)
+        self.assertEqual(exited, 1)
+
@mock.patch("time.sleep")
@stubs
def test_submit_on_error(self, stubs, tm):
'manifest_text':
'. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies',
- }), ensure_unique_name=True),
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ }), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input',
- }), ensure_unique_name=True)])
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ }), ensure_unique_name=False)])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+    @stubs
+    def test_submit_storage_classes(self, stubs):
+        """--storage-classes=foo is forwarded on the submitted runner's command line."""
+        capture_stdout = cStringIO.StringIO()
+        # Deliberately no try/except: a bare "except:" around this call would
+        # swallow a failed assertEqual (and any error raised by main) and
+        # merely log it, letting a broken run look like a pass.
+        exited = arvados_cwl.main(
+            ["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 0)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+                                       '--no-log-timestamps', '--disable-validate',
+                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--enable-reuse', "--debug",
+                                       "--storage-classes=foo", '--on-error=continue',
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        # With --no-wait, main prints the container request UUID and a newline.
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+    @mock.patch("arvados_cwl.task_queue.TaskQueue")
+    @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
+    @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+    @stubs
+    def test_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+        """--storage-classes=foo reaches make_output_collection as ['foo']."""
+        def set_final_output(job_order, output_callback, runtimeContext):
+            # Stand in for the workflow: report success at once, yield no jobs.
+            output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+            return []
+        job.side_effect = set_final_output
+
+        # main()'s second argument is its output stream, so give it a
+        # writable buffer rather than sys.stdin. No try/except either: a bare
+        # "except:" here would swallow a failed assertEqual and only log it.
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--debug", "--local", "--storage-classes=foo",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 0)
+
+        make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+
+    @mock.patch("arvados_cwl.task_queue.TaskQueue")
+    @mock.patch("arvados_cwl.arvworkflow.ArvadosWorkflow.job")
+    @mock.patch("arvados_cwl.ArvCwlRunner.make_output_collection", return_value = (None, None))
+    @stubs
+    def test_default_storage_classes_correctly_propagate_to_make_output_collection(self, stubs, make_output, job, tq):
+        """Without --storage-classes, make_output_collection receives ['default']."""
+        def set_final_output(job_order, output_callback, runtimeContext):
+            # Stand in for the workflow: report success at once, yield no jobs.
+            output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+            return []
+        job.side_effect = set_final_output
+
+        # main()'s second argument is its output stream, so give it a
+        # writable buffer rather than sys.stdin. No try/except either: a bare
+        # "except:" here would swallow a failed assertEqual and only log it.
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--debug", "--local",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 0)
+
+        make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
@stubs
def test_submit_container_output_ttl(self, stubs):
@stubs
def test_submit_file_keepref(self, stubs, tm, collectionReader):
capture_stdout = cStringIO.StringIO()
+ collectionReader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "blorp.txt")
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/submit_keepref_wf.cwl"],
stubs.expect_container_request_uuid + '\n')
+    @stubs
+    def test_submit_wf_runner_resources(self, stubs):
+        """A WorkflowRunnerResources hint sets the runner container's vcpus and ram."""
+        capture_stdout = cStringIO.StringIO()
+        # Deliberately no try/except: a bare "except:" around this call would
+        # swallow a failed assertEqual (and any error raised by main) and
+        # merely log it, letting a broken run look like a pass.
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_wf_runner_resources.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 0)
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        # The hint's ramMin of 2000 is expected as 2000 * 2**20 in the request.
+        expect_container["runtime_constraints"] = {
+            "API": True,
+            "vcpus": 2,
+            "ram": 2000 * 2**20
+        }
+        expect_container["name"] = "submit_wf_runner_resources.cwl"
+        expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
+            {
+                "class": "http://arvados.org/cwl#WorkflowRunnerResources",
+                "coresMin": 2,
+                "ramMin": 2000
+            }
+        ]
+        expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][0]["$namespaces"] = {
+            "arv": "http://arvados.org/cwl#",
+        }
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
@mock.patch("arvados.commands.keepdocker.find_one_image_hash")
@mock.patch("cwltool.docker.DockerCommandLineJob.get_image")
@mock.patch("arvados.api")
# Copyright (C) The Arvados Authors. All rights reserved.
+# Copyright (C) 2018 Genome Research Ltd.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
from __future__ import print_function
from __future__ import absolute_import
import errno
import arvados.commands._util as arv_cmd
import arvados.collection
+import arvados.config as config
from arvados._version import __version__
return prefix+fn
- def write_file(collection, pathprefix, fn):
+ # Copy the local file os.path.join(pathprefix, fn) into `collection` under the
+ # name `fn`.
+ #
+ # collection: object providing open(name, "w") that returns a writer with
+ #             write() and close(flush=...).
+ # pathprefix: local directory prefix joined with `fn` to locate the source file.
+ # fn:         path of the file, used both relative to `pathprefix` and as the
+ #             name inside the collection.
+ # flush:      forwarded unchanged to the writer's close(); defaults to False,
+ #             which matches the previously hard-coded value.
+ #             (presumably controls whether data is committed on close --
+ #             confirm against the Arvados SDK)
+ def write_file(collection, pathprefix, fn, flush=False):
+ # NOTE(review): the source is opened with the default mode (no "b" flag);
+ # confirm this is intended if binary files are uploaded under Python 3.
with open(os.path.join(pathprefix, fn)) as src:
dst = collection.open(fn, "w")
+ # Copy in chunks of 1024*128 so the whole file is never held in memory;
+ # an empty read marks end of file and ends the loop.
r = src.read(1024*128)
while r:
dst.write(r)
r = src.read(1024*128)
- dst.close(flush=False)
+ # NOTE(review): close() is only reached when the copy loop finishes without
+ # raising; there is no try/finally around the writer.
+ dst.close(flush=flush)
def uploadfiles(files, api, dry_run=False, num_retries=0,
project=None,
fnPattern="$(file %s/%s)",
name=None,
- collection=None):
+ collection=None,
+ packed=True):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file paths no longer have a common parent.
continue
prev = localpath
if os.path.isfile(localpath):
- write_file(collection, pathprefix, f.fn)
+ write_file(collection, pathprefix, f.fn, not packed)
elif os.path.isdir(localpath):
for root, dirs, iterfiles in os.walk(localpath):
root = root[len(pathprefix):]
for src in iterfiles:
- write_file(collection, pathprefix, os.path.join(root, src))
+ write_file(collection, pathprefix, os.path.join(root, src), not packed)
- filters=[["portable_data_hash", "=", collection.portable_data_hash()]]
- if name:
- filters.append(["name", "like", name+"%"])
- if project:
- filters.append(["owner_uuid", "=", project])
+ pdh = None
+ if len(collection) > 0:
+ # non-empty collection
+ filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
+ name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
+ if name:
+ filters.append(["name", "=", name_pdh])
+ if project:
+ filters.append(["owner_uuid", "=", project])
- exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
+ # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
+ # and there is a potential race with other workflows that may have created the collection
+ # between when we list it and find it does not exist and when we attempt to create it.
+ tries = 2
+ while pdh is None and tries > 0:
+ exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
- if exists["items"]:
- item = exists["items"][0]
- pdh = item["portable_data_hash"]
- logger.info("Using collection %s (%s)", pdh, item["uuid"])
- elif len(collection) > 0:
- collection.save_new(name=name, owner_uuid=project, ensure_unique_name=True)
+ if exists["items"]:
+ item = exists["items"][0]
+ pdh = item["portable_data_hash"]
+ logger.info("Using collection %s (%s)", pdh, item["uuid"])
+ else:
+ try:
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ except arvados.errors.ApiError as ae:
+ tries -= 1
+ if pdh is None:
+ # Something weird is going on here, probably a collection
+ # with a conflicting name but the wrong PDH. We won't be
+ # able to reuse it but we still need to save our
+ # collection, so save it with a unique name.
+ logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
+ collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
+ pdh = collection.portable_data_hash()
+ logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ else:
+ # empty collection
pdh = collection.portable_data_hash()
- logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
+ assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
+ logger.info("Using empty collection %s", pdh)
for c in files:
c.keepref = "%s/%s" % (pdh, c.fn)