1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from builtins import next
21 import arvados_cwl.executor
22 import cwltool.process
23 from arvados.errors import ApiError
24 from schema_salad.ref_resolver import Loader
25 from schema_salad.sourceline import cmap
26 from .mock_discovery import get_rootDesc
27 from .matcher import JsonDiffMatcher, StripYAMLComments
28 from .test_container import CollectionMock
# Quieten the arvados-cwl-runner and arv-run loggers to WARNING for the test
# run, unless ARVADOS_DEBUG is set to a non-empty value.
if not os.environ.get('ARVADOS_DEBUG'):
    logging.getLogger('arvados.cwl-runner').setLevel(logging.WARNING)
    logging.getLogger('arvados.arv-run').setLevel(logging.WARNING)
# Tests that run a single CommandLineTool (ArvadosCommandTool) and finish a
# job (ArvadosJob) against a mocked Arvados API, submitting through the jobs
# API (api.jobs().create).
# NOTE(review): this copy of the file is damaged. Every line carries a fused
# original line number, indentation is missing, and original lines are absent:
# the embedded numbering skips, several dict/call literals never close, and
# `tool` (used at original lines 90 and 178) is never assigned in the visible
# lines. Restore it from version control before running. These comments
# describe only what the visible lines show.
34 class TestJob(unittest.TestCase):
# Build the (loadingContext, runtimeContext) pair used by the TestJob tests.
# Both contexts share one CollectionFsAccess factory whose CollectionCache is
# backed by runner.api. The loading context uses the CWL v1.0 schema names and
# runner.arv_make_tool. The runtime context carries enable_reuse and a run name
# derived from it ("test_run_job_True" / "test_run_job_False").
36 def helper(self, runner, enable_reuse=True):
37 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
39 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
40 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
41 loadingContext = arvados_cwl.context.ArvLoadingContext(
42 {"avsc_names": avsc_names,
44 "make_fs_access": make_fs_access,
46 "metadata": {"cwlVersion": "v1.0"},
47 "makeTool": runner.arv_make_tool})
48 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
51 "name": "test_run_job_"+str(enable_reuse),
52 "make_fs_access": make_fs_access,
53 "enable_reuse": enable_reuse,
56 return loadingContext, runtimeContext
58 # The test supplies no builder.resources, so the default resources apply:
59 # {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Run the tool with enable_reuse True and then False. Each pass checks the
# jobs.create request: crunchrunner script, task command, default-resource
# runtime constraints, find_or_create mirroring enable_reuse, and the reuse
# filters.
# For the reused job, which is owned by another user, a 'permission' link from
# the project to the job is expected. Then links.create is made to raise a 403
# ApiError and j.run() is called again. There is no assertRaises around that
# call, so the error is presumably expected to be handled inside run().
60 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
61 def test_run(self, list_images_in_arv):
62 for enable_reuse in (True, False):
63 runner = mock.MagicMock()
64 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
65 runner.ignore_docker_for_reuse = False
66 runner.num_retries = 0
68 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
69 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
70 # Simulate a job reused from another project so that we can check that a
# can_read permission link is added for it.
72 runner.api.jobs().create().execute.return_value = {
73 'state': 'Complete' if enable_reuse else 'Queued',
74 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
75 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
83 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
85 "class": "CommandLineTool"
88 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
90 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
91 arvtool.formatgraph = None
92 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
94 runner.api.jobs().create.assert_called_with(
95 body=JsonDiffMatcher({
96 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
97 'runtime_constraints': {},
98 'script_parameters': {
100 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
101 'command': ['ls', '$(task.outdir)']
104 'script_version': 'master',
105 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
106 'repository': 'arvados',
107 'script': 'crunchrunner',
108 'runtime_constraints': {
109 'docker_image': 'arvados/jobs',
110 'min_cores_per_node': 1,
111 'min_ram_mb_per_node': 1024,
112 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
115 find_or_create=enable_reuse,
116 filters=[['repository', '=', 'arvados'],
117 ['script', '=', 'crunchrunner'],
118 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
119 ['docker_image_locator', 'in docker', 'arvados/jobs']]
122 runner.api.links().create.assert_called_with(
123 body=JsonDiffMatcher({
124 'link_class': 'permission',
126 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
127 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
130 # Simulate an API exception (403 Permission denied) when trying to create a
131 # sharing link on the job
132 runner.api.links().create.side_effect = ApiError(
133 mock.MagicMock(return_value={'status': 403}),
134 bytes(b'Permission denied'))
135 j.run(runtimeContext)
# Presumably the enable_reuse=False branch (its `if`/`else` lines are not
# visible): a job already owned by our own project must not get a
# permission link.
137 assert not runner.api.links().create.called
139 # The test passes some fields in builder.resources (from the tool's ResourceRequirement);
140 # for the remaining fields the defaults apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Check how resource and Arvados hints map onto the jobs.create request.
# Per the inline comments below:
#   - min_ram_mb_per_node is ramMin plus the keep cache (512, see
#     keep_cache_mb_per_task), so ramMin would be 3000. The requirement's
#     fields are not visible.
#   - min_scratch_mb_per_node is tmpdirSize + outdirSize.
# find_or_create is expected to be False even though helper() is called with
# its default enable_reuse=True. That presumably comes from the
# ReuseRequirement hint, and task.keepTmpOutput presumably comes from the
# RuntimeConstraints outputDirType "keep_output_dir". The hint bodies are only
# partly visible; confirm both.
141 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
142 def test_resource_requirements(self, list_images_in_arv):
143 runner = mock.MagicMock()
144 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
145 runner.ignore_docker_for_reuse = False
146 runner.num_retries = 0
147 arvados_cwl.add_arv_hints()
149 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): `return_vaulue` is a typo for `return_value` (test_run
# configures the same mock with the correct spelling). On a MagicMock this
# assignment only creates an unused attribute, so execute() is NOT configured
# here. It is left as-is because fixing it changes what the mocked API
# returns. Fix it deliberately and re-run the suite.
150 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
156 "class": "ResourceRequirement",
161 "class": "http://arvados.org/cwl#RuntimeConstraints",
163 "outputDirType": "keep_output_dir"
165 "class": "http://arvados.org/cwl#APIRequirement",
168 "class": "http://arvados.org/cwl#ReuseRequirement",
173 "class": "CommandLineTool"
176 loadingContext, runtimeContext = self.helper(runner)
178 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
179 arvtool.formatgraph = None
180 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
181 j.run(runtimeContext)
182 runner.api.jobs().create.assert_called_with(
183 body=JsonDiffMatcher({
184 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
185 'runtime_constraints': {},
186 'script_parameters': {
188 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
189 'task.keepTmpOutput': True,
193 'script_version': 'master',
194 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
195 'repository': 'arvados',
196 'script': 'crunchrunner',
197 'runtime_constraints': {
198 'docker_image': 'arvados/jobs',
199 'min_cores_per_node': 3,
200 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
201 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
202 'keep_cache_mb_per_task': 512
205 find_or_create=False,
206 filters=[['repository', '=', 'arvados'],
207 ['script', '=', 'crunchrunner'],
208 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
209 ['docker_image_locator', 'in docker', 'arvados/jobs']])
# ArvadosJob.done() for a finished job:
#   - The output and log collections (identified by portable data hash) are
#     looked up in the project by owner/pdh/name.
#   - Since the first lookup returns no items, each manifest_text is fetched
#     by pdh and a copy is created in the project with
#     ensure_unique_name=True.
#   - output_callback finally receives collect_outputs()'s result with
#     "success".
# CollectionReader is patched: the job log's keys() returns "log.txt", and
# open() yields crunchrunner $(task.*) directory lines.
211 @mock.patch("arvados.collection.CollectionReader")
212 def test_done(self, reader):
213 api = mock.MagicMock()
215 runner = mock.MagicMock()
217 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
218 runner.num_retries = 0
219 runner.ignore_docker_for_reuse = False
221 reader().keys.return_value = "log.txt"
222 reader().open.return_value = io.StringIO(
223 str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
224 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
225 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
227 api.collections().list().execute.side_effect = ({"items": []},
228 {"items": [{"manifest_text": "XYZ"}]},
230 {"items": [{"manifest_text": "ABC"}]})
232 arvjob = arvados_cwl.ArvadosJob(runner,
239 arvjob.output_callback = mock.MagicMock()
240 arvjob.collect_outputs = mock.MagicMock()
241 arvjob.collect_outputs.return_value = {"out": "stuff"}
245 "output": "99999999999999999999999999999993+99",
246 "log": "99999999999999999999999999999994+99",
247 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
250 api.collections().list.assert_has_calls([
252 # Output collection check
253 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
254 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
255 ['name', '=', 'Output 9999999 of testjob']]),
256 mock.call().execute(num_retries=0),
257 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
258 select=['manifest_text']),
259 mock.call().execute(num_retries=0),
260 # Log collection's turn
261 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
262 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
263 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
264 mock.call().execute(num_retries=0),
265 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
266 select=['manifest_text']),
267 mock.call().execute(num_retries=0)])
269 api.collections().create.assert_has_calls([
270 mock.call(ensure_unique_name=True,
271 body={'portable_data_hash': '99999999999999999999999999999993+99',
272 'manifest_text': 'XYZ',
273 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
274 'name': 'Output 9999999 of testjob'}),
275 mock.call().execute(num_retries=0),
276 mock.call(ensure_unique_name=True,
277 body={'portable_data_hash': '99999999999999999999999999999994+99',
278 'manifest_text': 'ABC',
279 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
280 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
281 mock.call().execute(num_retries=0),
284 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
286 @mock.patch("arvados.collection.CollectionReader")
287 def test_done_use_existing_collection(self, reader):
288 api = mock.MagicMock()
290 runner = mock.MagicMock()
292 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
293 runner.num_retries = 0
295 reader().keys.return_value = "log.txt"
296 reader().open.return_value = io.StringIO(
297 str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
298 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
299 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
# test_done_use_existing_collection: collections().list() returns an existing
# collection for both the output and the log lookups. done() must therefore not
# create any collection (only the owner/pdh/name lookups are asserted), and
# output_callback still receives "success".
302 api.collections().list().execute.side_effect = (
303 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
304 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
307 arvjob = arvados_cwl.ArvadosJob(runner,
314 arvjob.output_callback = mock.MagicMock()
315 arvjob.collect_outputs = mock.MagicMock()
316 arvjob.collect_outputs.return_value = {"out": "stuff"}
320 "output": "99999999999999999999999999999993+99",
321 "log": "99999999999999999999999999999994+99",
322 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
325 api.collections().list.assert_has_calls([
328 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
329 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
330 ['name', '=', 'Output 9999999 of testjob']]),
331 mock.call().execute(num_retries=0),
333 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
334 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
335 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
336 mock.call().execute(num_retries=0)
339 self.assertFalse(api.collections().create.called)
341 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
344 class TestWorkflow(unittest.TestCase):
# Build the (loadingContext, runtimeContext) pair used by the TestWorkflow
# tests.
#   - The CWL v1.0 document loader is rewired to fetch documents through
#     arvados_cwl.CollectionFetcher, using runner.api and a CollectionFsAccess.
#   - The loading context builds tools with runner.arv_make_tool via
#     "construct_tool_object".
#   - The runtime context carries enable_reuse and a run name derived from it
#     ("test_run_wf_True" / "test_run_wf_False").
# NOTE(review): this copy is damaged: fused original line numbers, no
# indentation, and missing lines. For example, the dict passed to
# ArvRuntimeContext at original line 363 has no visible opening brace.
# Restore it from version control before running.
345 def helper(self, runner, enable_reuse=True):
346 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
348 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
349 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
# Route the loader's fetch_text/check_exists through CollectionFetcher so
# documents are resolved via runner.api.
351 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
352 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
353 document_loader.fetch_text = document_loader.fetcher.fetch_text
354 document_loader.check_exists = document_loader.fetcher.check_exists
356 loadingContext = arvados_cwl.context.ArvLoadingContext(
357 {"avsc_names": avsc_names,
359 "make_fs_access": make_fs_access,
360 "loader": document_loader,
361 "metadata": {"cwlVersion": "v1.0"},
362 "construct_tool_object": runner.arv_make_tool})
363 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
366 "name": "test_run_wf_"+str(enable_reuse),
367 "make_fs_access": make_fs_access,
368 "enable_reuse": enable_reuse,
371 return loadingContext, runtimeContext
373 # The test passes no builder.resources, so the default resources apply:
374 # {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Run tests/wf/scatter2.cwl with ArvadosWorkflow on the jobs API. The first two
# runnables yielded by arvtool.job() are run. The last jobs.create call must
# submit the subworkflow as one crunchrunner task that:
#   - runs `cwltool --no-container ... workflow.cwl#main cwl.input.yml`;
#   - reads both files from $(task.keep)/...96+99;
#   - uses default-resource runtime constraints
#     (scratch = tmpdirSize + outdirSize).
# The mocked Collection must receive two writes:
#   - the packed subworkflow, which must equal tests/wf/scatter2_subwf.cwl with
#     YAML comments stripped;
#   - a document containing basename "token.txt" and a /keep/... location.
# NOTE(review): this copy is damaged: fused original line numbers, no
# indentation, and missing lines. For example, the assert_called_with( opened
# at original line 414 has no visible closing parenthesis. Restore it from
# version control before running.
375 @mock.patch("arvados.collection.CollectionReader")
376 @mock.patch("arvados.collection.Collection")
377 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
378 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
379 arvados_cwl.add_arv_hints()
381 api = mock.MagicMock()
382 api._rootDesc = get_rootDesc()
384 runner = arvados_cwl.executor.ArvCwlExecutor(api)
# With the unmodified mocked discovery document the executor selects the jobs API.
385 self.assertEqual(runner.work_api, 'jobs')
387 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): `return_vaulue` (here and on the next line) is a typo for
# `return_value`. On a MagicMock these assignments only create unused
# attributes, so neither execute() result is configured. They are left as-is
# because fixing them changes what the mocked API returns and may change the
# expected results below. Fix them deliberately and re-run the suite.
388 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
389 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
391 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
392 runner.ignore_docker_for_reuse = False
393 runner.num_retries = 0
395 loadingContext, runtimeContext = self.helper(runner)
397 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
398 metadata["cwlVersion"] = tool["cwlVersion"]
# Every arvados.collection.Collection() becomes a CollectionMock sharing
# mockc, so writes to any opened file can be asserted through
# mockc.open().__enter__().write at the end of the test.
400 mockc = mock.MagicMock()
401 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
402 mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
404 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
405 arvtool.formatgraph = None
406 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
408 next(it).run(runtimeContext)
409 next(it).run(runtimeContext)
411 with open("tests/wf/scatter2_subwf.cwl") as f:
412 subwf = StripYAMLComments(f.read())
414 runner.api.jobs().create.assert_called_with(
415 body=JsonDiffMatcher({
416 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
417 'repository': 'arvados',
418 'script_version': 'master',
419 'script': 'crunchrunner',
420 'script_parameters': {
421 'tasks': [{'task.env': {
422 'HOME': '$(task.outdir)',
423 'TMPDIR': '$(task.tmpdir)'},
425 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
426 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
428 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
429 'task.stdout': 'cwl.output.json'}]},
430 'runtime_constraints': {
431 'min_scratch_mb_per_node': 2048,
432 'min_cores_per_node': 1,
433 'docker_image': 'arvados/jobs',
434 'min_ram_mb_per_node': 1024
436 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
437 filters=[['repository', '=', 'arvados'],
438 ['script', '=', 'crunchrunner'],
439 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
440 ['docker_image_locator', 'in docker', 'arvados/jobs']],
443 mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
444 mockc.open().__enter__().write.assert_has_calls([mock.call(
447 "basename": "token.txt",
449 "location": "/keep/99999999999999999999999999999999+118/token.txt",
455 # The test passes no builder.resources, but the expected constraints below
456 # (3 cores, 4096 MB scratch) are NOT the defaults {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}.
# Presumably they are the overall resource requirements of
# tests/wf/echo-wf.cwl (not visible here; confirm), requested for the single
# crunchrunner job that runs the whole subworkflow.
# Runs tests/wf/echo-wf.cwl with ArvadosWorkflow on the jobs API and checks
# the last jobs.create call.
# NOTE(review): `subwf` is read from tests/wf/echo-subwf.cwl but no visible
# line uses it. Each Collection here also gets its own fresh MagicMock, so
# collection writes are not captured in a shared mock. The read looks vestigial
# or its assertion was lost; confirm.
# NOTE(review): this copy is damaged: fused original line numbers, no
# indentation, and missing lines. For example, the assert_called_with( opened
# at original line 494 has no visible closing parenthesis. Restore it from
# version control before running.
457 @mock.patch("arvados.collection.CollectionReader")
458 @mock.patch("arvados.collection.Collection")
459 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
460 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
461 arvados_cwl.add_arv_hints()
463 api = mock.MagicMock()
464 api._rootDesc = get_rootDesc()
466 runner = arvados_cwl.executor.ArvCwlExecutor(api)
467 self.assertEqual(runner.work_api, 'jobs')
469 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): `return_vaulue` (here and on the next line) is a typo for
# `return_value`. On a MagicMock these assignments only create unused
# attributes, so neither execute() result is configured. They are left as-is
# because fixing them changes what the mocked API returns and may change the
# expected results below. Fix them deliberately and re-run the suite.
470 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
471 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
473 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
474 runner.ignore_docker_for_reuse = False
475 runner.num_retries = 0
477 loadingContext, runtimeContext = self.helper(runner)
479 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
480 metadata["cwlVersion"] = tool["cwlVersion"]
482 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
484 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
485 arvtool.formatgraph = None
486 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
488 next(it).run(runtimeContext)
489 next(it).run(runtimeContext)
491 with open("tests/wf/echo-subwf.cwl") as f:
492 subwf = StripYAMLComments(f.read())
494 runner.api.jobs().create.assert_called_with(
495 body=JsonDiffMatcher({
496 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
497 'repository': 'arvados',
498 'script_version': 'master',
499 'script': 'crunchrunner',
500 'script_parameters': {
501 'tasks': [{'task.env': {
502 'HOME': '$(task.outdir)',
503 'TMPDIR': '$(task.tmpdir)'},
505 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
506 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
508 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
509 'task.stdout': 'cwl.output.json'}]},
510 'runtime_constraints': {
511 'min_scratch_mb_per_node': 4096,
512 'min_cores_per_node': 3,
513 'docker_image': 'arvados/jobs',
514 'min_ram_mb_per_node': 1024
516 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
517 filters=[['repository', '=', 'arvados'],
518 ['script', '=', 'crunchrunner'],
519 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
520 ['docker_image_locator', 'in docker', 'arvados/jobs']],
def test_default_work_api(self):
    """Check that ArvCwlExecutor selects the 'containers' work API when the
    API discovery document offers no jobs.create method."""
    arvados_cwl.add_arv_hints()

    # Deep-copy before deleting so the dict returned by get_rootDesc() is not
    # mutated (it may be shared between tests -- unverified).
    discovery = copy.deepcopy(get_rootDesc())
    jobs_methods = discovery.get('resources')['jobs']['methods']
    del jobs_methods['create']

    api = mock.MagicMock()
    api._rootDesc = discovery

    executor = arvados_cwl.executor.ArvCwlExecutor(api)
    self.assertEqual(executor.work_api, 'containers')