1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
16 import arvados_cwl.executor
17 import cwltool.process
18 from arvados.errors import ApiError
19 from schema_salad.ref_resolver import Loader
20 from schema_salad.sourceline import cmap
21 from .mock_discovery import get_rootDesc
22 from .matcher import JsonDiffMatcher, StripYAMLComments
23 from .test_container import CollectionMock
# Unless ARVADOS_DEBUG is set in the environment, raise the 'arvados.cwl-runner'
# and 'arvados.arv-run' loggers to WARN (presumably to keep test output quiet).
25 if not os.getenv('ARVADOS_DEBUG'):
26 logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
27 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
# Tests for ArvadosCommandTool job submission (api.jobs().create) and
# ArvadosJob completion handling, using MagicMock-based Arvados API clients.
29 class TestJob(unittest.TestCase):
# Build the (loadingContext, runtimeContext) pair used by this class's tests.
#   runner       -- mock executor. runner.api backs the CollectionCache used by
#                   make_fs_access, and runner.arv_make_tool is passed under
#                   the "makeTool" key.
#   enable_reuse -- stored as the runtime context's "enable_reuse" and appended
#                   to its "name" (e.g. "test_run_job_True").
# Returns: (loadingContext, runtimeContext).
# NOTE(review): TestWorkflow.helper passes runner.arv_make_tool under
# "construct_tool_object" instead of "makeTool". Confirm which key
# ArvLoadingContext actually reads.
31 def helper(self, runner, enable_reuse=True):
32 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
34 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
35 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
36 loadingContext = arvados_cwl.context.ArvLoadingContext(
37 {"avsc_names": avsc_names,
39 "make_fs_access": make_fs_access,
41 "metadata": {"cwlVersion": "v1.0"},
42 "makeTool": runner.arv_make_tool})
43 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
46 "name": "test_run_job_"+str(enable_reuse),
47 "make_fs_access": make_fs_access,
48 "enable_reuse": enable_reuse,
51 return loadingContext, runtimeContext
53 # The test passes no builder.resources
54 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Submits a minimal CommandLineTool twice, first with enable_reuse=True and then
# with enable_reuse=False. It checks the jobs().create request body,
# find_or_create, the reuse filters, and whether a permission link to the job
# is created.
55 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
56 def test_run(self, list_images_in_arv):
57 for enable_reuse in (True, False):
58 runner = mock.MagicMock()
59 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
60 runner.ignore_docker_for_reuse = False
61 runner.num_retries = 0
# Canned responses for the docker image lookup and collections().get().
63 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
64 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
65 # Simulate reused job from another project so that we can check that a can_read
67 runner.api.jobs().create().execute.return_value = {
68 'state': 'Complete' if enable_reuse else 'Queued',
69 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
70 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
78 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
80 "class": "CommandLineTool"
83 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
85 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
86 arvtool.formatgraph = None
87 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
# Expected request: default resources (1 core, 1024 MB RAM, and 1024 + 1024 MB
# of scratch). find_or_create follows enable_reuse.
89 runner.api.jobs().create.assert_called_with(
90 body=JsonDiffMatcher({
91 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
92 'runtime_constraints': {},
93 'script_parameters': {
95 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
96 'command': ['ls', '$(task.outdir)']
99 'script_version': 'master',
100 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
101 'repository': 'arvados',
102 'script': 'crunchrunner',
103 'runtime_constraints': {
104 'docker_image': 'arvados/jobs',
105 'min_cores_per_node': 1,
106 'min_ram_mb_per_node': 1024,
107 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
110 find_or_create=enable_reuse,
111 filters=[['repository', '=', 'arvados'],
112 ['script', '=', 'crunchrunner'],
113 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
114 ['docker_image_locator', 'in docker', 'arvados/jobs']]
# When reuse is enabled, the returned job is owned by zzzzz-tpzed-yyyyyyyyyyyyyyy
# rather than the project, so a permission link from the project (tail) to the
# job (head) is expected. The branch condition is presumably on a line not
# visible in this excerpt.
117 runner.api.links().create.assert_called_with(
118 body=JsonDiffMatcher({
119 'link_class': 'permission',
121 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
122 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
125 # Simulate an API exception when trying to create a
126 # sharing link on the job
127 runner.api.links().create.side_effect = ApiError(
128 mock.MagicMock(return_value={'status': 403}),
# j.run is not wrapped in assertRaises, so the 403 from links().create is
# presumably expected to be tolerated.
130 j.run(runtimeContext)
# Presumably the enable_reuse=False branch (its controlling line is not visible
# here). The job is owned by the project itself, so no link is expected.
132 assert not runner.api.links().create.called
134 # The test passes some fields in builder.resources
135 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Checks that the tool's ResourceRequirement entry and its arv:RuntimeConstraints,
# arv:APIRequirement and arv:ReuseRequirement entries show up in the
# jobs().create request (runtime_constraints, task.keepTmpOutput,
# find_or_create).
136 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
137 def test_resource_requirements(self, list_images_in_arv):
138 runner = mock.MagicMock()
139 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
140 runner.ignore_docker_for_reuse = False
141 runner.num_retries = 0
# Register the http://arvados.org/cwl# extension hints used by the tool below.
142 arvados_cwl.add_arv_hints()
144 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" is a typo for "return_value". Assigning an
# unknown attribute on a MagicMock only stores it, so execute() still returns a
# MagicMock here. Correcting it changes what the code under test receives, so
# confirm the test still passes before fixing.
145 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
151 "class": "ResourceRequirement",
156 "class": "http://arvados.org/cwl#RuntimeConstraints",
158 "outputDirType": "keep_output_dir"
160 "class": "http://arvados.org/cwl#APIRequirement",
163 "class": "http://arvados.org/cwl#ReuseRequirement",
168 "class": "CommandLineTool"
171 loadingContext, runtimeContext = self.helper(runner)
173 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
174 arvtool.formatgraph = None
175 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
176 j.run(runtimeContext)
# helper() defaults to enable_reuse=True, yet find_or_create=False is expected.
# Presumably the ReuseRequirement entry (its fields are not visible here)
# disables reuse. task.keepTmpOutput presumably comes from
# outputDirType "keep_output_dir".
177 runner.api.jobs().create.assert_called_with(
178 body=JsonDiffMatcher({
179 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
180 'runtime_constraints': {},
181 'script_parameters': {
183 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
184 'task.keepTmpOutput': True,
188 'script_version': 'master',
189 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
190 'repository': 'arvados',
191 'script': 'crunchrunner',
192 'runtime_constraints': {
193 'docker_image': 'arvados/jobs',
194 'min_cores_per_node': 3,
195 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
196 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
197 'keep_cache_mb_per_task': 512
200 find_or_create=False,
201 filters=[['repository', '=', 'arvados'],
202 ['script', '=', 'crunchrunner'],
203 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
204 ['docker_image_locator', 'in docker', 'arvados/jobs']])
# Completes an ArvadosJob and checks that the output and log collections are
# copied into the project under descriptive names. Also checks that
# output_callback reports the collected outputs with "success".
206 @mock.patch("arvados.collection.CollectionReader")
207 def test_done(self, reader):
208 api = mock.MagicMock()
210 runner = mock.MagicMock()
212 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
213 runner.num_retries = 0
214 runner.ignore_docker_for_reuse = False
# Fake job log containing crunchrunner lines that map $(task.tmpdir),
# $(task.outdir) and $(task.keep) to concrete paths. It is presumably read by
# the completion code through CollectionReader.
216 reader().open.return_value = StringIO.StringIO(
217 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
218 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
219 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
# Successive collections().list().execute() results, in the order asserted
# below: existing-copy lookup for the output (empty), output manifest "XYZ",
# (entry not visible here; presumably the empty log lookup), log manifest "ABC".
221 api.collections().list().execute.side_effect = ({"items": []},
222 {"items": [{"manifest_text": "XYZ"}]},
224 {"items": [{"manifest_text": "ABC"}]})
226 arvjob = arvados_cwl.ArvadosJob(runner,
233 arvjob.output_callback = mock.MagicMock()
234 arvjob.collect_outputs = mock.MagicMock()
235 arvjob.collect_outputs.return_value = {"out": "stuff"}
239 "output": "99999999999999999999999999999993+99",
240 "log": "99999999999999999999999999999994+99",
241 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
# For the output and then the log, the test expects two queries. The first
# searches the project for an existing copy by owner/pdh/name. The second
# fetches the manifest_text by pdh (limit=1).
244 api.collections().list.assert_has_calls([
246 # Output collection check
247 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
248 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
249 ['name', '=', 'Output 9999999 of testjob']]),
250 mock.call().execute(num_retries=0),
251 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
252 select=['manifest_text']),
253 mock.call().execute(num_retries=0),
254 # Log collection's turn
255 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
256 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
257 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
258 mock.call().execute(num_retries=0),
259 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
260 select=['manifest_text']),
261 mock.call().execute(num_retries=0)])
# No existing copies were found, so both collections are created in the
# project from the fetched manifests, with ensure_unique_name=True.
263 api.collections().create.assert_has_calls([
264 mock.call(ensure_unique_name=True,
265 body={'portable_data_hash': '99999999999999999999999999999993+99',
266 'manifest_text': 'XYZ',
267 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
268 'name': 'Output 9999999 of testjob'}),
269 mock.call().execute(num_retries=0),
270 mock.call(ensure_unique_name=True,
271 body={'portable_data_hash': '99999999999999999999999999999994+99',
272 'manifest_text': 'ABC',
273 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
274 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
275 mock.call().execute(num_retries=0),
278 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
# Like test_done, except that the existing-copy lookups return a collection
# (by uuid) for both the output and the log. Only the lookups should happen;
# no collection is created.
280 @mock.patch("arvados.collection.CollectionReader")
281 def test_done_use_existing_collection(self, reader):
282 api = mock.MagicMock()
284 runner = mock.MagicMock()
286 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
287 runner.num_retries = 0
# Same fake crunchrunner log as in test_done.
289 reader().open.return_value = StringIO.StringIO(
290 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
291 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
292 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
# Both existing-copy lookups find a collection.
295 api.collections().list().execute.side_effect = (
296 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
297 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
300 arvjob = arvados_cwl.ArvadosJob(runner,
307 arvjob.output_callback = mock.MagicMock()
308 arvjob.collect_outputs = mock.MagicMock()
309 arvjob.collect_outputs.return_value = {"out": "stuff"}
313 "output": "99999999999999999999999999999993+99",
314 "log": "99999999999999999999999999999994+99",
315 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
318 api.collections().list.assert_has_calls([
321 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
322 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
323 ['name', '=', 'Output 9999999 of testjob']]),
324 mock.call().execute(num_retries=0),
326 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
327 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
328 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
329 mock.call().execute(num_retries=0)
# Existing copies were found, so no new collection is created.
332 self.assertFalse(api.collections().create.called)
334 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
# Tests for ArvadosWorkflow submission on the "jobs" API and for how
# ArvCwlExecutor selects its work API, using MagicMock-based API clients.
337 class TestWorkflow(unittest.TestCase):
# Build the (loadingContext, runtimeContext) pair used by this class's tests.
#   runner       -- executor whose .api backs the CollectionCache and the
#                   CollectionFetcher. runner.arv_make_tool is passed under
#                   "construct_tool_object".
#   enable_reuse -- stored as "enable_reuse" and appended to "name"
#                   (e.g. "test_run_wf_True").
# Returns: (loadingContext, runtimeContext). The loading context also carries
# the rewired schema document loader under "loader".
# NOTE(review): TestJob.helper passes runner.arv_make_tool under "makeTool"
# instead. Confirm which key ArvLoadingContext actually reads.
338 def helper(self, runner, enable_reuse=True):
339 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
341 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
342 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
# Make the loader fetch documents through arvados_cwl.CollectionFetcher
# (backed by runner.api and a CollectionFsAccess). fetch_text and
# check_exists are taken from that fetcher instance.
344 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
345 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
346 document_loader.fetch_text = document_loader.fetcher.fetch_text
347 document_loader.check_exists = document_loader.fetcher.check_exists
349 loadingContext = arvados_cwl.context.ArvLoadingContext(
350 {"avsc_names": avsc_names,
352 "make_fs_access": make_fs_access,
353 "loader": document_loader,
354 "metadata": {"cwlVersion": "v1.0"},
355 "construct_tool_object": runner.arv_make_tool})
356 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
359 "name": "test_run_wf_"+str(enable_reuse),
360 "make_fs_access": make_fs_access,
361 "enable_reuse": enable_reuse,
364 return loadingContext, runtimeContext
366 # The test passes no builder.resources
367 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Runs tests/wf/scatter2.cwl through ArvadosWorkflow. It checks that the
# subworkflow written to the mocked collection matches
# tests/wf/scatter2_subwf.cwl, and that a crunchrunner job running cwltool on
# workflow.cwl#main is submitted with default resources.
# The mock.patch decorators apply bottom-up, so list_images_in_arv is the first
# argument.
368 @mock.patch("arvados.collection.CollectionReader")
369 @mock.patch("arvados.collection.Collection")
370 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
371 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
372 arvados_cwl.add_arv_hints()
374 api = mock.MagicMock()
375 api._rootDesc = get_rootDesc()
# With the mock discovery document, the executor is expected to select the
# "jobs" work API.
377 runner = arvados_cwl.executor.ArvCwlExecutor(api)
378 self.assertEqual(runner.work_api, 'jobs')
380 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" (in both lines below) is a typo for
# "return_value". As written, these assignments only store an unused attribute
# on the MagicMock, so execute() keeps returning a MagicMock. Correcting them
# changes what the code under test receives, so re-verify the test before
# fixing.
381 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
382 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
384 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
385 runner.ignore_docker_for_reuse = False
386 runner.num_retries = 0
388 loadingContext, runtimeContext = self.helper(runner)
390 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
391 metadata["cwlVersion"] = tool["cwlVersion"]
# Every Collection constructed by the code under test is a CollectionMock
# sharing mockc, so writes can be inspected through mockc.open().
393 mockc = mock.MagicMock()
394 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
395 mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
397 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
398 arvtool.formatgraph = None
# Python 2 iterator API: run the first two runnables yielded by job().
399 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
401 it.next().run(runtimeContext)
402 it.next().run(runtimeContext)
# Expected subworkflow text, with YAML comments stripped for comparison.
404 with open("tests/wf/scatter2_subwf.cwl") as f:
405 subwf = StripYAMLComments(f.read())
407 runner.api.jobs().create.assert_called_with(
408 body=JsonDiffMatcher({
409 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
410 'repository': 'arvados',
411 'script_version': 'master',
412 'script': 'crunchrunner',
413 'script_parameters': {
414 'tasks': [{'task.env': {
415 'HOME': '$(task.outdir)',
416 'TMPDIR': '$(task.tmpdir)'},
418 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
419 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
421 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
422 'task.stdout': 'cwl.output.json'}]},
423 'runtime_constraints': {
424 'min_scratch_mb_per_node': 2048,
425 'min_cores_per_node': 1,
426 'docker_image': 'arvados/jobs',
427 'min_ram_mb_per_node': 1024
429 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
430 filters=[['repository', '=', 'arvados'],
431 ['script', '=', 'crunchrunner'],
432 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
433 ['docker_image_locator', 'in docker', 'arvados/jobs']],
# The mocked collection must receive the subworkflow text and a second write
# containing the token.txt File object (basename and /keep/... location).
436 mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
437 mockc.open().__enter__().write.assert_has_calls([mock.call(
440 "basename": "token.txt",
442 "location": "/keep/99999999999999999999999999999999+118/token.txt",
448 # The test passes no builder.resources
449 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# NOTE(review): the two comment lines above appear to be copied from test_run.
# The runtime_constraints expected below (3 cores, 4096 MB scratch) are not
# those defaults; presumably they come from requirements inside
# tests/wf/echo-wf.cwl. Confirm and update the comment.
# Runs tests/wf/echo-wf.cwl through ArvadosWorkflow and checks the resources
# requested for the single crunchrunner job that runs it.
450 @mock.patch("arvados.collection.CollectionReader")
451 @mock.patch("arvados.collection.Collection")
452 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
453 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
454 arvados_cwl.add_arv_hints()
456 api = mock.MagicMock()
457 api._rootDesc = get_rootDesc()
459 runner = arvados_cwl.executor.ArvCwlExecutor(api)
460 self.assertEqual(runner.work_api, 'jobs')
462 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" (in both lines below) is a typo for
# "return_value". As written, these assignments only store an unused attribute
# on the MagicMock. Re-verify the test before correcting them.
463 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
464 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
466 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
467 runner.ignore_docker_for_reuse = False
468 runner.num_retries = 0
470 loadingContext, runtimeContext = self.helper(runner)
472 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
473 metadata["cwlVersion"] = tool["cwlVersion"]
475 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
477 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
478 arvtool.formatgraph = None
# Python 2 iterator API: run the first two runnables yielded by job().
479 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
480 it.next().run(runtimeContext)
481 it.next().run(runtimeContext)
483 with open("tests/wf/echo-subwf.cwl") as f:
484 subwf = StripYAMLComments(f.read())
486 runner.api.jobs().create.assert_called_with(
487 body=JsonDiffMatcher({
488 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
489 'repository': 'arvados',
490 'script_version': 'master',
491 'script': 'crunchrunner',
492 'script_parameters': {
493 'tasks': [{'task.env': {
494 'HOME': '$(task.outdir)',
495 'TMPDIR': '$(task.tmpdir)'},
497 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
498 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
500 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
501 'task.stdout': 'cwl.output.json'}]},
502 'runtime_constraints': {
503 'min_scratch_mb_per_node': 4096,
504 'min_cores_per_node': 3,
505 'docker_image': 'arvados/jobs',
506 'min_ram_mb_per_node': 1024
508 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
509 filters=[['repository', '=', 'arvados'],
510 ['script', '=', 'crunchrunner'],
511 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
512 ['docker_image_locator', 'in docker', 'arvados/jobs']],
# With jobs.create removed from the discovery document, ArvCwlExecutor is
# expected to select the "containers" work API.
515 def test_default_work_api(self):
516 arvados_cwl.add_arv_hints()
518 api = mock.MagicMock()
# Deep-copy so that deleting jobs.create does not mutate the object returned by
# get_rootDesc(), which is presumably shared with other tests.
519 api._rootDesc = copy.deepcopy(get_rootDesc())
520 del api._rootDesc.get('resources')['jobs']['methods']['create']
521 runner = arvados_cwl.executor.ArvCwlExecutor(api)
522 self.assertEqual(runner.work_api, 'containers')