1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from future import standard_library
6 standard_library.install_aliases()
7 from builtins import str
8 from builtins import next
22 import arvados_cwl.executor
23 import cwltool.process
24 from arvados.errors import ApiError
25 from schema_salad.ref_resolver import Loader
26 from schema_salad.sourceline import cmap
27 from .mock_discovery import get_rootDesc
28 from .matcher import JsonDiffMatcher, StripYAMLComments
29 from .test_container import CollectionMock
30 from arvados_cwl.arvdocker import arv_docker_clear_cache
# When ARVADOS_DEBUG is unset or empty, raise the level of the two
# Arvados loggers named below to WARN.
if not os.getenv('ARVADOS_DEBUG'):
    for _logger_name in ('arvados.cwl-runner', 'arvados.arv-run'):
        logging.getLogger(_logger_name).setLevel(logging.WARN)
36 class TestJob(unittest.TestCase):
38 def helper(self, runner, enable_reuse=True):
39 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
41 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
42 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
43 loadingContext = arvados_cwl.context.ArvLoadingContext(
44 {"avsc_names": avsc_names,
46 "make_fs_access": make_fs_access,
48 "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
49 "makeTool": runner.arv_make_tool})
50 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
53 "name": "test_run_job_"+str(enable_reuse),
54 "make_fs_access": make_fs_access,
55 "enable_reuse": enable_reuse,
58 return loadingContext, runtimeContext
60 # The test passes no builder.resources
61 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
62 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
63 def test_run(self, list_images_in_arv):
64 for enable_reuse in (True, False):
65 arv_docker_clear_cache()
66 runner = mock.MagicMock()
67 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
68 runner.ignore_docker_for_reuse = False
69 runner.num_retries = 0
71 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
72 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
73 # Simulate reused job from another project so that we can check if a can_read
75 runner.api.jobs().create().execute.return_value = {
76 'state': 'Complete' if enable_reuse else 'Queued',
77 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
78 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
86 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
88 "class": "CommandLineTool"
91 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
93 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
94 arvtool.formatgraph = None
95 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
97 runner.api.jobs().create.assert_called_with(
98 body=JsonDiffMatcher({
99 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
100 'runtime_constraints': {},
101 'script_parameters': {
103 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
104 'command': ['ls', '$(task.outdir)']
107 'script_version': 'master',
108 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
109 'repository': 'arvados',
110 'script': 'crunchrunner',
111 'runtime_constraints': {
112 'docker_image': 'arvados/jobs',
113 'min_cores_per_node': 1,
114 'min_ram_mb_per_node': 1024,
115 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
118 find_or_create=enable_reuse,
119 filters=[['repository', '=', 'arvados'],
120 ['script', '=', 'crunchrunner'],
121 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
122 ['docker_image_locator', 'in docker', 'arvados/jobs']]
125 runner.api.links().create.assert_called_with(
126 body=JsonDiffMatcher({
127 'link_class': 'permission',
129 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
130 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
133 # Simulate an API exception when trying to create a
134 # sharing link on the job
135 runner.api.links().create.side_effect = ApiError(
136 mock.MagicMock(return_value={'status': 403}),
137 bytes(b'Permission denied'))
138 j.run(runtimeContext)
140 assert not runner.api.links().create.called
142 # The test passes some fields in builder.resources
143 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
144 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
145 def test_resource_requirements(self, list_images_in_arv):
146 runner = mock.MagicMock()
147 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
148 runner.ignore_docker_for_reuse = False
149 runner.num_retries = 0
150 arvados_cwl.add_arv_hints()
152 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
153 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
159 "class": "ResourceRequirement",
164 "class": "http://arvados.org/cwl#RuntimeConstraints",
166 "outputDirType": "keep_output_dir"
168 "class": "http://arvados.org/cwl#APIRequirement",
171 "class": "http://arvados.org/cwl#ReuseRequirement",
176 "class": "CommandLineTool"
179 loadingContext, runtimeContext = self.helper(runner)
181 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
182 arvtool.formatgraph = None
183 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
184 j.run(runtimeContext)
185 runner.api.jobs().create.assert_called_with(
186 body=JsonDiffMatcher({
187 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
188 'runtime_constraints': {},
189 'script_parameters': {
191 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
192 'task.keepTmpOutput': True,
196 'script_version': 'master',
197 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
198 'repository': 'arvados',
199 'script': 'crunchrunner',
200 'runtime_constraints': {
201 'docker_image': 'arvados/jobs',
202 'min_cores_per_node': 3,
203 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
204 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
205 'keep_cache_mb_per_task': 512
208 find_or_create=False,
209 filters=[['repository', '=', 'arvados'],
210 ['script', '=', 'crunchrunner'],
211 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
212 ['docker_image_locator', 'in docker', 'arvados/jobs']])
214 @mock.patch("arvados.collection.CollectionReader")
215 def test_done(self, reader):
216 api = mock.MagicMock()
218 runner = mock.MagicMock()
220 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
221 runner.num_retries = 0
222 runner.ignore_docker_for_reuse = False
224 reader().keys.return_value = "log.txt"
225 reader().open.return_value = io.StringIO(
226 str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
227 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
228 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
230 api.collections().list().execute.side_effect = ({"items": []},
231 {"items": [{"manifest_text": "XYZ"}]},
233 {"items": [{"manifest_text": "ABC"}]})
235 arvjob = arvados_cwl.ArvadosJob(runner,
242 arvjob.output_callback = mock.MagicMock()
243 arvjob.collect_outputs = mock.MagicMock()
244 arvjob.collect_outputs.return_value = {"out": "stuff"}
248 "output": "99999999999999999999999999999993+99",
249 "log": "99999999999999999999999999999994+99",
250 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
253 api.collections().list.assert_has_calls([
255 # Output collection check
256 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
257 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
258 ['name', '=', 'Output 9999999 of testjob']]),
259 mock.call().execute(num_retries=0),
260 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
261 select=['manifest_text']),
262 mock.call().execute(num_retries=0),
263 # Log collection's turn
264 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
265 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
266 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
267 mock.call().execute(num_retries=0),
268 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
269 select=['manifest_text']),
270 mock.call().execute(num_retries=0)])
272 api.collections().create.assert_has_calls([
273 mock.call(ensure_unique_name=True,
274 body={'portable_data_hash': '99999999999999999999999999999993+99',
275 'manifest_text': 'XYZ',
276 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
277 'name': 'Output 9999999 of testjob'}),
278 mock.call().execute(num_retries=0),
279 mock.call(ensure_unique_name=True,
280 body={'portable_data_hash': '99999999999999999999999999999994+99',
281 'manifest_text': 'ABC',
282 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
283 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
284 mock.call().execute(num_retries=0),
287 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
289 @mock.patch("arvados.collection.CollectionReader")
290 def test_done_use_existing_collection(self, reader):
291 api = mock.MagicMock()
293 runner = mock.MagicMock()
295 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
296 runner.num_retries = 0
298 reader().keys.return_value = "log.txt"
299 reader().open.return_value = io.StringIO(
300 str(u"""2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
301 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
302 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
305 api.collections().list().execute.side_effect = (
306 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
307 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
310 arvjob = arvados_cwl.ArvadosJob(runner,
317 arvjob.output_callback = mock.MagicMock()
318 arvjob.collect_outputs = mock.MagicMock()
319 arvjob.collect_outputs.return_value = {"out": "stuff"}
323 "output": "99999999999999999999999999999993+99",
324 "log": "99999999999999999999999999999994+99",
325 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
328 api.collections().list.assert_has_calls([
331 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
332 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
333 ['name', '=', 'Output 9999999 of testjob']]),
334 mock.call().execute(num_retries=0),
336 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
337 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
338 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
339 mock.call().execute(num_retries=0)
342 self.assertFalse(api.collections().create.called)
344 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
347 class TestWorkflow(unittest.TestCase):
348 def helper(self, runner, enable_reuse=True):
349 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
351 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
352 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
354 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
355 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
356 document_loader.fetch_text = document_loader.fetcher.fetch_text
357 document_loader.check_exists = document_loader.fetcher.check_exists
359 loadingContext = arvados_cwl.context.ArvLoadingContext(
360 {"avsc_names": avsc_names,
362 "make_fs_access": make_fs_access,
363 "loader": document_loader,
364 "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
365 "construct_tool_object": runner.arv_make_tool})
366 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
369 "name": "test_run_wf_"+str(enable_reuse),
370 "make_fs_access": make_fs_access,
371 "enable_reuse": enable_reuse,
374 return loadingContext, runtimeContext
376 # The test passes no builder.resources
377 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
378 @mock.patch("arvados.collection.CollectionReader")
379 @mock.patch("arvados.collection.Collection")
380 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
381 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
382 arv_docker_clear_cache()
383 arvados_cwl.add_arv_hints()
385 api = mock.MagicMock()
386 api._rootDesc = get_rootDesc()
388 runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
392 collection_cache_size=None))
393 self.assertEqual(runner.work_api, 'jobs')
395 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
396 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
397 runner.api.collections().list().execute.return_value = {"items": [{
398 "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
399 "portable_data_hash": "99999999999999999999999999999993+99"}]}
401 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
402 runner.ignore_docker_for_reuse = False
403 runner.num_retries = 0
405 loadingContext, runtimeContext = self.helper(runner)
406 runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
407 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
408 metadata["cwlVersion"] = tool["cwlVersion"]
410 mockc = mock.MagicMock()
411 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
412 mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
414 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
415 arvtool.formatgraph = None
416 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
418 next(it).run(runtimeContext)
419 next(it).run(runtimeContext)
421 with open("tests/wf/scatter2_subwf.cwl") as f:
422 subwf = StripYAMLComments(f.read().rstrip())
424 runner.api.jobs().create.assert_called_with(
425 body=JsonDiffMatcher({
426 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
427 'repository': 'arvados',
428 'script_version': 'master',
429 'script': 'crunchrunner',
430 'script_parameters': {
431 'tasks': [{'task.env': {
432 'HOME': '$(task.outdir)',
433 'TMPDIR': '$(task.tmpdir)'},
435 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
436 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
438 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
439 'task.stdout': 'cwl.output.json'}]},
440 'runtime_constraints': {
441 'min_scratch_mb_per_node': 2048,
442 'min_cores_per_node': 1,
443 'docker_image': 'arvados/jobs',
444 'min_ram_mb_per_node': 1024
446 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
447 filters=[['repository', '=', 'arvados'],
448 ['script', '=', 'crunchrunner'],
449 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
450 ['docker_image_locator', 'in docker', 'arvados/jobs']],
453 mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
454 mockc.open().__enter__().write.assert_has_calls([mock.call(
457 "basename": "token.txt",
459 "location": "/keep/99999999999999999999999999999999+118/token.txt",
465 # The test passes no builder.resources
466 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
467 @mock.patch("arvados.collection.CollectionReader")
468 @mock.patch("arvados.collection.Collection")
469 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
470 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
471 arv_docker_clear_cache()
472 arvados_cwl.add_arv_hints()
474 api = mock.MagicMock()
475 api._rootDesc = get_rootDesc()
477 runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
481 collection_cache_size=None))
482 self.assertEqual(runner.work_api, 'jobs')
484 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
485 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
486 runner.api.collections().list().execute.return_value = {"items": [{
487 "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
488 "portable_data_hash": "99999999999999999999999999999993+99"}]}
490 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
491 runner.ignore_docker_for_reuse = False
492 runner.num_retries = 0
494 loadingContext, runtimeContext = self.helper(runner)
495 loadingContext.do_update = True
496 runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
497 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
499 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
501 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
502 arvtool.formatgraph = None
503 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
505 next(it).run(runtimeContext)
506 next(it).run(runtimeContext)
508 with open("tests/wf/echo-subwf.cwl") as f:
509 subwf = StripYAMLComments(f.read())
511 runner.api.jobs().create.assert_called_with(
512 body=JsonDiffMatcher({
513 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
514 'repository': 'arvados',
515 'script_version': 'master',
516 'script': 'crunchrunner',
517 'script_parameters': {
518 'tasks': [{'task.env': {
519 'HOME': '$(task.outdir)',
520 'TMPDIR': '$(task.tmpdir)'},
522 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
523 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
525 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
526 'task.stdout': 'cwl.output.json'}]},
527 'runtime_constraints': {
528 'min_scratch_mb_per_node': 4096,
529 'min_cores_per_node': 3,
530 'docker_image': 'arvados/jobs',
531 'min_ram_mb_per_node': 1024
533 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
534 filters=[['repository', '=', 'arvados'],
535 ['script', '=', 'crunchrunner'],
536 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
537 ['docker_image_locator', 'in docker', 'arvados/jobs']],
def test_default_work_api(self):
    # With the jobs 'create' method removed from the discovery document,
    # an executor built without explicit arguments must report the
    # 'containers' work API.
    arvados_cwl.add_arv_hints()

    # Deep-copy the discovery document before deleting an entry from it.
    discovery_doc = copy.deepcopy(get_rootDesc())
    mock_api = mock.MagicMock()
    mock_api._rootDesc = discovery_doc

    job_methods = mock_api._rootDesc.get('resources')['jobs']['methods']
    del job_methods['create']

    executor = arvados_cwl.executor.ArvCwlExecutor(mock_api)
    self.assertEqual(executor.work_api, 'containers')