1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
16 import cwltool.process
17 from arvados.errors import ApiError
18 from schema_salad.ref_resolver import Loader
19 from schema_salad.sourceline import cmap
20 from .mock_discovery import get_rootDesc
21 from .matcher import JsonDiffMatcher, StripYAMLComments
# Unless ARVADOS_DEBUG is set to a non-empty value, raise the runner's
# loggers to WARNING so their INFO/DEBUG messages stay out of test output.
if not os.getenv('ARVADOS_DEBUG'):
    for _logger_name in ('arvados.cwl-runner', 'arvados.arv-run'):
        logging.getLogger(_logger_name).setLevel(logging.WARNING)
    # Do not leave the loop variable behind in the module namespace.
    del _logger_name
27 class TestJob(unittest.TestCase):
29 def helper(self, runner, enable_reuse=True):
30 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
32 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
33 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
34 loadingContext = arvados_cwl.context.ArvLoadingContext(
35 {"avsc_names": avsc_names,
37 "make_fs_access": make_fs_access,
39 "metadata": {"cwlVersion": "v1.0"},
40 "makeTool": runner.arv_make_tool})
41 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
44 "name": "test_run_job_"+str(enable_reuse),
45 "make_fs_access": make_fs_access,
46 "enable_reuse": enable_reuse,
49 return loadingContext, runtimeContext
51 # The test passes no builder.resources
52 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
53 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
54 def test_run(self, list_images_in_arv):
55 for enable_reuse in (True, False):
56 runner = mock.MagicMock()
57 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
58 runner.ignore_docker_for_reuse = False
59 runner.num_retries = 0
61 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
62 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
63 # Simulate reused job from another project so that we can check if a can_read
65 runner.api.jobs().create().execute.return_value = {
66 'state': 'Complete' if enable_reuse else 'Queued',
67 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
68 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
76 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
78 "class": "CommandLineTool"
81 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
83 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
84 arvtool.formatgraph = None
85 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
87 runner.api.jobs().create.assert_called_with(
88 body=JsonDiffMatcher({
89 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
90 'runtime_constraints': {},
91 'script_parameters': {
93 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
94 'command': ['ls', '$(task.outdir)']
97 'script_version': 'master',
98 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
99 'repository': 'arvados',
100 'script': 'crunchrunner',
101 'runtime_constraints': {
102 'docker_image': 'arvados/jobs',
103 'min_cores_per_node': 1,
104 'min_ram_mb_per_node': 1024,
105 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
108 find_or_create=enable_reuse,
109 filters=[['repository', '=', 'arvados'],
110 ['script', '=', 'crunchrunner'],
111 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
112 ['docker_image_locator', 'in docker', 'arvados/jobs']]
115 runner.api.links().create.assert_called_with(
116 body=JsonDiffMatcher({
117 'link_class': 'permission',
119 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
120 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
123 # Simulate an API exception when trying to create a
124 # sharing link on the job
125 runner.api.links().create.side_effect = ApiError(
126 mock.MagicMock(return_value={'status': 403}),
128 j.run(runtimeContext)
130 assert not runner.api.links().create.called
132 # The test passes some fields in builder.resources
133 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
134 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
135 def test_resource_requirements(self, list_images_in_arv):
136 runner = mock.MagicMock()
137 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
138 runner.ignore_docker_for_reuse = False
139 runner.num_retries = 0
140 arvados_cwl.add_arv_hints()
142 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
143 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
149 "class": "ResourceRequirement",
154 "class": "http://arvados.org/cwl#RuntimeConstraints",
156 "outputDirType": "keep_output_dir"
158 "class": "http://arvados.org/cwl#APIRequirement",
161 "class": "http://arvados.org/cwl#ReuseRequirement",
166 "class": "CommandLineTool"
169 loadingContext, runtimeContext = self.helper(runner)
171 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
172 arvtool.formatgraph = None
173 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
174 j.run(runtimeContext)
175 runner.api.jobs().create.assert_called_with(
176 body=JsonDiffMatcher({
177 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
178 'runtime_constraints': {},
179 'script_parameters': {
181 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
182 'task.keepTmpOutput': True,
186 'script_version': 'master',
187 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
188 'repository': 'arvados',
189 'script': 'crunchrunner',
190 'runtime_constraints': {
191 'docker_image': 'arvados/jobs',
192 'min_cores_per_node': 3,
193 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
194 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
195 'keep_cache_mb_per_task': 512
198 find_or_create=False,
199 filters=[['repository', '=', 'arvados'],
200 ['script', '=', 'crunchrunner'],
201 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
202 ['docker_image_locator', 'in docker', 'arvados/jobs']])
204 @mock.patch("arvados.collection.CollectionReader")
205 def test_done(self, reader):
206 api = mock.MagicMock()
208 runner = mock.MagicMock()
210 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
211 runner.num_retries = 0
212 runner.ignore_docker_for_reuse = False
214 reader().open.return_value = StringIO.StringIO(
215 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
216 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
217 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
219 api.collections().list().execute.side_effect = ({"items": []},
220 {"items": [{"manifest_text": "XYZ"}]},
222 {"items": [{"manifest_text": "ABC"}]})
224 arvjob = arvados_cwl.ArvadosJob(runner,
231 arvjob.output_callback = mock.MagicMock()
232 arvjob.collect_outputs = mock.MagicMock()
233 arvjob.collect_outputs.return_value = {"out": "stuff"}
237 "output": "99999999999999999999999999999993+99",
238 "log": "99999999999999999999999999999994+99",
239 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
242 api.collections().list.assert_has_calls([
244 # Output collection check
245 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
246 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
247 ['name', '=', 'Output 9999999 of testjob']]),
248 mock.call().execute(num_retries=0),
249 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
250 select=['manifest_text']),
251 mock.call().execute(num_retries=0),
252 # Log collection's turn
253 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
254 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
255 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
256 mock.call().execute(num_retries=0),
257 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
258 select=['manifest_text']),
259 mock.call().execute(num_retries=0)])
261 api.collections().create.assert_has_calls([
262 mock.call(ensure_unique_name=True,
263 body={'portable_data_hash': '99999999999999999999999999999993+99',
264 'manifest_text': 'XYZ',
265 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
266 'name': 'Output 9999999 of testjob'}),
267 mock.call().execute(num_retries=0),
268 mock.call(ensure_unique_name=True,
269 body={'portable_data_hash': '99999999999999999999999999999994+99',
270 'manifest_text': 'ABC',
271 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
272 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
273 mock.call().execute(num_retries=0),
276 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
278 @mock.patch("arvados.collection.CollectionReader")
279 def test_done_use_existing_collection(self, reader):
280 api = mock.MagicMock()
282 runner = mock.MagicMock()
284 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
285 runner.num_retries = 0
287 reader().open.return_value = StringIO.StringIO(
288 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
289 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
290 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
293 api.collections().list().execute.side_effect = (
294 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
295 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
298 arvjob = arvados_cwl.ArvadosJob(runner,
305 arvjob.output_callback = mock.MagicMock()
306 arvjob.collect_outputs = mock.MagicMock()
307 arvjob.collect_outputs.return_value = {"out": "stuff"}
311 "output": "99999999999999999999999999999993+99",
312 "log": "99999999999999999999999999999994+99",
313 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
316 api.collections().list.assert_has_calls([
319 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
320 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
321 ['name', '=', 'Output 9999999 of testjob']]),
322 mock.call().execute(num_retries=0),
324 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
325 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
326 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
327 mock.call().execute(num_retries=0)
330 self.assertFalse(api.collections().create.called)
332 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
335 class TestWorkflow(unittest.TestCase):
336 def helper(self, runner, enable_reuse=True):
337 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
339 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
340 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
342 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
343 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
344 document_loader.fetch_text = document_loader.fetcher.fetch_text
345 document_loader.check_exists = document_loader.fetcher.check_exists
347 loadingContext = arvados_cwl.context.ArvLoadingContext(
348 {"avsc_names": avsc_names,
350 "make_fs_access": make_fs_access,
351 "loader": document_loader,
352 "metadata": {"cwlVersion": "v1.0"},
353 "construct_tool_object": runner.arv_make_tool})
354 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
357 "name": "test_run_wf_"+str(enable_reuse),
358 "make_fs_access": make_fs_access,
359 "enable_reuse": enable_reuse,
362 return loadingContext, runtimeContext
364 # The test passes no builder.resources
365 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
366 @mock.patch("arvados.collection.CollectionReader")
367 @mock.patch("arvados.collection.Collection")
368 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
369 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
370 arvados_cwl.add_arv_hints()
372 api = mock.MagicMock()
373 api._rootDesc = get_rootDesc()
375 runner = arvados_cwl.ArvCwlRunner(api)
376 self.assertEqual(runner.work_api, 'jobs')
378 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
379 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
380 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
382 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
383 runner.ignore_docker_for_reuse = False
384 runner.num_retries = 0
386 loadingContext, runtimeContext = self.helper(runner)
388 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
389 metadata["cwlVersion"] = tool["cwlVersion"]
391 mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
392 mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
394 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
395 arvtool.formatgraph = None
396 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
398 it.next().run(runtimeContext)
399 it.next().run(runtimeContext)
401 with open("tests/wf/scatter2_subwf.cwl") as f:
402 subwf = StripYAMLComments(f.read())
404 runner.api.jobs().create.assert_called_with(
405 body=JsonDiffMatcher({
406 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
407 'repository': 'arvados',
408 'script_version': 'master',
409 'script': 'crunchrunner',
410 'script_parameters': {
411 'tasks': [{'task.env': {
412 'HOME': '$(task.outdir)',
413 'TMPDIR': '$(task.tmpdir)'},
415 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
416 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
418 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
419 'task.stdout': 'cwl.output.json'}]},
420 'runtime_constraints': {
421 'min_scratch_mb_per_node': 2048,
422 'min_cores_per_node': 1,
423 'docker_image': 'arvados/jobs',
424 'min_ram_mb_per_node': 1024
426 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
427 filters=[['repository', '=', 'arvados'],
428 ['script', '=', 'crunchrunner'],
429 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
430 ['docker_image_locator', 'in docker', 'arvados/jobs']],
433 mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
434 mockcollection().open().__enter__().write.assert_has_calls([mock.call(
437 "basename": "token.txt",
439 "location": "/keep/99999999999999999999999999999999+118/token.txt",
445 # The test passes no builder.resources
446 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
447 @mock.patch("arvados.collection.CollectionReader")
448 @mock.patch("arvados.collection.Collection")
449 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
450 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
451 arvados_cwl.add_arv_hints()
453 api = mock.MagicMock()
454 api._rootDesc = get_rootDesc()
456 runner = arvados_cwl.ArvCwlRunner(api)
457 self.assertEqual(runner.work_api, 'jobs')
459 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
460 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
461 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
463 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
464 runner.ignore_docker_for_reuse = False
465 runner.num_retries = 0
467 loadingContext, runtimeContext = self.helper(runner)
469 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
470 metadata["cwlVersion"] = tool["cwlVersion"]
472 mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
474 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
475 arvtool.formatgraph = None
476 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
477 it.next().run(runtimeContext)
478 it.next().run(runtimeContext)
480 with open("tests/wf/echo-subwf.cwl") as f:
481 subwf = StripYAMLComments(f.read())
483 runner.api.jobs().create.assert_called_with(
484 body=JsonDiffMatcher({
485 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
486 'repository': 'arvados',
487 'script_version': 'master',
488 'script': 'crunchrunner',
489 'script_parameters': {
490 'tasks': [{'task.env': {
491 'HOME': '$(task.outdir)',
492 'TMPDIR': '$(task.tmpdir)'},
494 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
495 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
497 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
498 'task.stdout': 'cwl.output.json'}]},
499 'runtime_constraints': {
500 'min_scratch_mb_per_node': 4096,
501 'min_cores_per_node': 3,
502 'docker_image': 'arvados/jobs',
503 'min_ram_mb_per_node': 1024
505 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
506 filters=[['repository', '=', 'arvados'],
507 ['script', '=', 'crunchrunner'],
508 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
509 ['docker_image_locator', 'in docker', 'arvados/jobs']],
def test_default_work_api(self):
    """ArvCwlRunner should select the 'containers' work API when the
    discovery document it is given has no jobs.create method."""
    arvados_cwl.add_arv_hints()

    api = mock.MagicMock()
    # Deep-copy before deleting so the removal below does not alter the
    # object returned by get_rootDesc() (it may be shared between tests).
    root_desc = copy.deepcopy(get_rootDesc())
    api._rootDesc = root_desc
    jobs_methods = root_desc.get('resources')['jobs']['methods']
    del jobs_methods['create']

    runner = arvados_cwl.ArvCwlRunner(api)
    self.assertEqual(runner.work_api, 'containers')