1 from future import standard_library
2 standard_library.install_aliases()
3 from builtins import str
4 # Copyright (C) The Arvados Authors. All rights reserved.
6 # SPDX-License-Identifier: Apache-2.0
19 import arvados_cwl.executor
20 import cwltool.process
21 from arvados.errors import ApiError
22 from schema_salad.ref_resolver import Loader
23 from schema_salad.sourceline import cmap
24 from .mock_discovery import get_rootDesc
25 from .matcher import JsonDiffMatcher, StripYAMLComments
26 from .test_container import CollectionMock
# Unless ARVADOS_DEBUG is set to a non-empty value, raise the threshold of the
# 'arvados.cwl-runner' and 'arvados.arv-run' loggers to WARN, presumably to
# keep test output free of their info/debug messages.
28 if not os.getenv('ARVADOS_DEBUG'):
29 logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
30 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
32 class TestJob(unittest.TestCase):
34 def helper(self, runner, enable_reuse=True):
35 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
37 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
38 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
39 loadingContext = arvados_cwl.context.ArvLoadingContext(
40 {"avsc_names": avsc_names,
42 "make_fs_access": make_fs_access,
44 "metadata": {"cwlVersion": "v1.0"},
45 "makeTool": runner.arv_make_tool})
46 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
49 "name": "test_run_job_"+str(enable_reuse),
50 "make_fs_access": make_fs_access,
51 "enable_reuse": enable_reuse,
54 return loadingContext, runtimeContext
56 # The test passes no builder.resources
57 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
58 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
59 def test_run(self, list_images_in_arv):
60 for enable_reuse in (True, False):
61 runner = mock.MagicMock()
62 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
63 runner.ignore_docker_for_reuse = False
64 runner.num_retries = 0
66 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
67 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
68 # Simulate reused job from another project so that we can check is a can_read
70 runner.api.jobs().create().execute.return_value = {
71 'state': 'Complete' if enable_reuse else 'Queued',
72 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
73 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
81 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
83 "class": "CommandLineTool"
86 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
88 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
89 arvtool.formatgraph = None
90 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
92 runner.api.jobs().create.assert_called_with(
93 body=JsonDiffMatcher({
94 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
95 'runtime_constraints': {},
96 'script_parameters': {
98 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
99 'command': ['ls', '$(task.outdir)']
102 'script_version': 'master',
103 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
104 'repository': 'arvados',
105 'script': 'crunchrunner',
106 'runtime_constraints': {
107 'docker_image': 'arvados/jobs',
108 'min_cores_per_node': 1,
109 'min_ram_mb_per_node': 1024,
110 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
113 find_or_create=enable_reuse,
114 filters=[['repository', '=', 'arvados'],
115 ['script', '=', 'crunchrunner'],
116 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
117 ['docker_image_locator', 'in docker', 'arvados/jobs']]
120 runner.api.links().create.assert_called_with(
121 body=JsonDiffMatcher({
122 'link_class': 'permission',
124 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
125 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
128 # Simulate an API excepction when trying to create a
129 # sharing link on the job
130 runner.api.links().create.side_effect = ApiError(
131 mock.MagicMock(return_value={'status': 403}),
133 j.run(runtimeContext)
135 assert not runner.api.links().create.called
137 # The test passes some fields in builder.resources
138 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
139 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
140 def test_resource_requirements(self, list_images_in_arv):
141 runner = mock.MagicMock()
142 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
143 runner.ignore_docker_for_reuse = False
144 runner.num_retries = 0
145 arvados_cwl.add_arv_hints()
147 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
148 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
154 "class": "ResourceRequirement",
159 "class": "http://arvados.org/cwl#RuntimeConstraints",
161 "outputDirType": "keep_output_dir"
163 "class": "http://arvados.org/cwl#APIRequirement",
166 "class": "http://arvados.org/cwl#ReuseRequirement",
171 "class": "CommandLineTool"
174 loadingContext, runtimeContext = self.helper(runner)
176 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
177 arvtool.formatgraph = None
178 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
179 j.run(runtimeContext)
180 runner.api.jobs().create.assert_called_with(
181 body=JsonDiffMatcher({
182 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
183 'runtime_constraints': {},
184 'script_parameters': {
186 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
187 'task.keepTmpOutput': True,
191 'script_version': 'master',
192 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
193 'repository': 'arvados',
194 'script': 'crunchrunner',
195 'runtime_constraints': {
196 'docker_image': 'arvados/jobs',
197 'min_cores_per_node': 3,
198 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
199 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
200 'keep_cache_mb_per_task': 512
203 find_or_create=False,
204 filters=[['repository', '=', 'arvados'],
205 ['script', '=', 'crunchrunner'],
206 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
207 ['docker_image_locator', 'in docker', 'arvados/jobs']])
209 @mock.patch("arvados.collection.CollectionReader")
210 def test_done(self, reader):
211 api = mock.MagicMock()
213 runner = mock.MagicMock()
215 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
216 runner.num_retries = 0
217 runner.ignore_docker_for_reuse = False
219 reader().open.return_value = io.StringIO(
220 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
221 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
222 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
224 api.collections().list().execute.side_effect = ({"items": []},
225 {"items": [{"manifest_text": "XYZ"}]},
227 {"items": [{"manifest_text": "ABC"}]})
229 arvjob = arvados_cwl.ArvadosJob(runner,
236 arvjob.output_callback = mock.MagicMock()
237 arvjob.collect_outputs = mock.MagicMock()
238 arvjob.collect_outputs.return_value = {"out": "stuff"}
242 "output": "99999999999999999999999999999993+99",
243 "log": "99999999999999999999999999999994+99",
244 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
247 api.collections().list.assert_has_calls([
249 # Output collection check
250 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
251 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
252 ['name', '=', 'Output 9999999 of testjob']]),
253 mock.call().execute(num_retries=0),
254 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
255 select=['manifest_text']),
256 mock.call().execute(num_retries=0),
257 # Log collection's turn
258 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
259 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
260 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
261 mock.call().execute(num_retries=0),
262 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
263 select=['manifest_text']),
264 mock.call().execute(num_retries=0)])
266 api.collections().create.assert_has_calls([
267 mock.call(ensure_unique_name=True,
268 body={'portable_data_hash': '99999999999999999999999999999993+99',
269 'manifest_text': 'XYZ',
270 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
271 'name': 'Output 9999999 of testjob'}),
272 mock.call().execute(num_retries=0),
273 mock.call(ensure_unique_name=True,
274 body={'portable_data_hash': '99999999999999999999999999999994+99',
275 'manifest_text': 'ABC',
276 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
277 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
278 mock.call().execute(num_retries=0),
281 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
283 @mock.patch("arvados.collection.CollectionReader")
284 def test_done_use_existing_collection(self, reader):
285 api = mock.MagicMock()
287 runner = mock.MagicMock()
289 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
290 runner.num_retries = 0
292 reader().open.return_value = io.StringIO(
293 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
294 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
295 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
298 api.collections().list().execute.side_effect = (
299 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
300 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
303 arvjob = arvados_cwl.ArvadosJob(runner,
310 arvjob.output_callback = mock.MagicMock()
311 arvjob.collect_outputs = mock.MagicMock()
312 arvjob.collect_outputs.return_value = {"out": "stuff"}
316 "output": "99999999999999999999999999999993+99",
317 "log": "99999999999999999999999999999994+99",
318 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
321 api.collections().list.assert_has_calls([
324 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
325 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
326 ['name', '=', 'Output 9999999 of testjob']]),
327 mock.call().execute(num_retries=0),
329 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
330 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
331 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
332 mock.call().execute(num_retries=0)
335 self.assertFalse(api.collections().create.called)
337 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
340 class TestWorkflow(unittest.TestCase):
341 def helper(self, runner, enable_reuse=True):
342 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
344 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
345 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
347 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
348 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
349 document_loader.fetch_text = document_loader.fetcher.fetch_text
350 document_loader.check_exists = document_loader.fetcher.check_exists
352 loadingContext = arvados_cwl.context.ArvLoadingContext(
353 {"avsc_names": avsc_names,
355 "make_fs_access": make_fs_access,
356 "loader": document_loader,
357 "metadata": {"cwlVersion": "v1.0"},
358 "construct_tool_object": runner.arv_make_tool})
359 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
362 "name": "test_run_wf_"+str(enable_reuse),
363 "make_fs_access": make_fs_access,
364 "enable_reuse": enable_reuse,
367 return loadingContext, runtimeContext
369 # The test passes no builder.resources
370 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
371 @mock.patch("arvados.collection.CollectionReader")
372 @mock.patch("arvados.collection.Collection")
373 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
374 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
375 arvados_cwl.add_arv_hints()
377 api = mock.MagicMock()
378 api._rootDesc = get_rootDesc()
380 runner = arvados_cwl.executor.ArvCwlExecutor(api)
381 self.assertEqual(runner.work_api, 'jobs')
383 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
384 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
385 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
387 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
388 runner.ignore_docker_for_reuse = False
389 runner.num_retries = 0
391 loadingContext, runtimeContext = self.helper(runner)
393 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
394 metadata["cwlVersion"] = tool["cwlVersion"]
396 mockc = mock.MagicMock()
397 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
398 mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
400 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
401 arvtool.formatgraph = None
402 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
404 it.next().run(runtimeContext)
405 it.next().run(runtimeContext)
407 with open("tests/wf/scatter2_subwf.cwl") as f:
408 subwf = StripYAMLComments(f.read())
410 runner.api.jobs().create.assert_called_with(
411 body=JsonDiffMatcher({
412 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
413 'repository': 'arvados',
414 'script_version': 'master',
415 'script': 'crunchrunner',
416 'script_parameters': {
417 'tasks': [{'task.env': {
418 'HOME': '$(task.outdir)',
419 'TMPDIR': '$(task.tmpdir)'},
421 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
422 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
424 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
425 'task.stdout': 'cwl.output.json'}]},
426 'runtime_constraints': {
427 'min_scratch_mb_per_node': 2048,
428 'min_cores_per_node': 1,
429 'docker_image': 'arvados/jobs',
430 'min_ram_mb_per_node': 1024
432 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
433 filters=[['repository', '=', 'arvados'],
434 ['script', '=', 'crunchrunner'],
435 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
436 ['docker_image_locator', 'in docker', 'arvados/jobs']],
439 mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
440 mockc.open().__enter__().write.assert_has_calls([mock.call(
443 "basename": "token.txt",
445 "location": "/keep/99999999999999999999999999999999+118/token.txt",
451 # The test passes no builder.resources
452 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
453 @mock.patch("arvados.collection.CollectionReader")
454 @mock.patch("arvados.collection.Collection")
455 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
456 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
457 arvados_cwl.add_arv_hints()
459 api = mock.MagicMock()
460 api._rootDesc = get_rootDesc()
462 runner = arvados_cwl.executor.ArvCwlExecutor(api)
463 self.assertEqual(runner.work_api, 'jobs')
465 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
466 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
467 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
469 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
470 runner.ignore_docker_for_reuse = False
471 runner.num_retries = 0
473 loadingContext, runtimeContext = self.helper(runner)
475 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
476 metadata["cwlVersion"] = tool["cwlVersion"]
478 mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
480 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
481 arvtool.formatgraph = None
482 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
483 it.next().run(runtimeContext)
484 it.next().run(runtimeContext)
486 with open("tests/wf/echo-subwf.cwl") as f:
487 subwf = StripYAMLComments(f.read())
489 runner.api.jobs().create.assert_called_with(
490 body=JsonDiffMatcher({
491 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
492 'repository': 'arvados',
493 'script_version': 'master',
494 'script': 'crunchrunner',
495 'script_parameters': {
496 'tasks': [{'task.env': {
497 'HOME': '$(task.outdir)',
498 'TMPDIR': '$(task.tmpdir)'},
500 'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
501 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
503 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
504 'task.stdout': 'cwl.output.json'}]},
505 'runtime_constraints': {
506 'min_scratch_mb_per_node': 4096,
507 'min_cores_per_node': 3,
508 'docker_image': 'arvados/jobs',
509 'min_ram_mb_per_node': 1024
511 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
512 filters=[['repository', '=', 'arvados'],
513 ['script', '=', 'crunchrunner'],
514 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
515 ['docker_image_locator', 'in docker', 'arvados/jobs']],
518 def test_default_work_api(self):
519 arvados_cwl.add_arv_hints()
521 api = mock.MagicMock()
522 api._rootDesc = copy.deepcopy(get_rootDesc())
523 del api._rootDesc.get('resources')['jobs']['methods']['create']
524 runner = arvados_cwl.executor.ArvCwlExecutor(api)
525 self.assertEqual(runner.work_api, 'containers')