1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
17 import cwltool.process
18 from arvados.errors import ApiError
19 from schema_salad.ref_resolver import Loader
20 from schema_salad.sourceline import cmap
21 from .mock_discovery import get_rootDesc
22 from .matcher import JsonDiffMatcher, StripYAMLComments
if not os.getenv('ARVADOS_DEBUG'):
    # Keep test output quiet unless ARVADOS_DEBUG is set in the environment.
    for _logger_name in ('arvados.cwl-runner', 'arvados.arv-run'):
        logging.getLogger(_logger_name).setLevel(logging.WARN)
    del _logger_name
# Frozen clock for the tests in this module: a datetime.datetime subclass whose
# overriding method always returns 2018-01-01 00:00:00.
# NOTE(review): the overriding method's header is presumably a classmethod such as
# utcnow()/now() — confirm.
28 class MockDateTime(datetime.datetime):
31 return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
# Rebind datetime.datetime on the shared datetime module at import time, so any code that
# looks the class up through the module sees the frozen clock (the trash_at expectation in
# test_get_intermediate_collection_info depends on this).
33 datetime.datetime = MockDateTime
35 class TestJob(unittest.TestCase):
37 # The test passes no builder.resources
38 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
39 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
40 def test_run(self, list_images_in_arv):
41 for enable_reuse in (True, False):
42 runner = mock.MagicMock()
43 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
44 runner.ignore_docker_for_reuse = False
45 runner.num_retries = 0
46 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
48 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
49 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
50 # Simulate reused job from another project so that we can check is a can_read
52 runner.api.jobs().create().execute.return_value = {
53 'state': 'Complete' if enable_reuse else 'Queued',
54 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
55 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
63 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
65 "class": "CommandLineTool"
67 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
68 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
69 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
70 basedir="", make_fs_access=make_fs_access, loader=Loader({}),
71 metadata={"cwlVersion": "v1.0"})
72 arvtool.formatgraph = None
73 for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
74 j.run(enable_reuse=enable_reuse)
75 runner.api.jobs().create.assert_called_with(
76 body=JsonDiffMatcher({
77 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
78 'runtime_constraints': {},
79 'script_parameters': {
81 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
82 'command': ['ls', '$(task.outdir)']
85 'script_version': 'master',
86 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
87 'repository': 'arvados',
88 'script': 'crunchrunner',
89 'runtime_constraints': {
90 'docker_image': 'arvados/jobs',
91 'min_cores_per_node': 1,
92 'min_ram_mb_per_node': 1024,
93 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
96 find_or_create=enable_reuse,
97 filters=[['repository', '=', 'arvados'],
98 ['script', '=', 'crunchrunner'],
99 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
100 ['docker_image_locator', 'in docker', 'arvados/jobs']]
103 runner.api.links().create.assert_called_with(
104 body=JsonDiffMatcher({
105 'link_class': 'permission',
107 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
108 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
111 # Simulate an API excepction when trying to create a
112 # sharing link on the job
113 runner.api.links().create.side_effect = ApiError(
114 mock.MagicMock(return_value={'status': 403}),
116 j.run(enable_reuse=enable_reuse)
118 assert not runner.api.links().create.called
120 # The test passes some fields in builder.resources
121 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
122 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
123 def test_resource_requirements(self, list_images_in_arv):
124 runner = mock.MagicMock()
125 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
126 runner.ignore_docker_for_reuse = False
127 runner.num_retries = 0
128 arvados_cwl.add_arv_hints()
130 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
131 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
133 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
140 "class": "ResourceRequirement",
145 "class": "http://arvados.org/cwl#RuntimeConstraints",
147 "outputDirType": "keep_output_dir"
149 "class": "http://arvados.org/cwl#APIRequirement",
152 "class": "http://arvados.org/cwl#ReuseRequirement",
157 "class": "CommandLineTool"
159 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
160 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
161 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
162 make_fs_access=make_fs_access, loader=Loader({}),
163 metadata={"cwlVersion": "v1.0"})
164 arvtool.formatgraph = None
165 for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
166 j.run(enable_reuse=True)
167 runner.api.jobs().create.assert_called_with(
168 body=JsonDiffMatcher({
169 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
170 'runtime_constraints': {},
171 'script_parameters': {
173 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
174 'task.keepTmpOutput': True,
178 'script_version': 'master',
179 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
180 'repository': 'arvados',
181 'script': 'crunchrunner',
182 'runtime_constraints': {
183 'docker_image': 'arvados/jobs',
184 'min_cores_per_node': 3,
185 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
186 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
187 'keep_cache_mb_per_task': 512
190 find_or_create=False,
191 filters=[['repository', '=', 'arvados'],
192 ['script', '=', 'crunchrunner'],
193 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
194 ['docker_image_locator', 'in docker', 'arvados/jobs']])
196 @mock.patch("arvados.collection.CollectionReader")
197 def test_done(self, reader):
# Finish a job (output PDH, log PDH, job uuid) and check that the output and the log are
# copied into new collections in the project and that the collected outputs are reported
# with "success".
# NOTE(review): the assertions target `api`, so `runner` is presumably wired to use it as
# its API client — confirm.
198 api = mock.MagicMock()
200 runner = mock.MagicMock()
202 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
203 runner.num_retries = 0
204 runner.ignore_docker_for_reuse = False
# The patched CollectionReader serves crunchrunner log lines that map the
# $(task.tmpdir), $(task.outdir) and $(task.keep) placeholders to compute-node paths.
206 reader().open.return_value = StringIO.StringIO(
207 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
208 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
209 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
211 api.collections().list().execute.side_effect = ({"items": []},
212 {"items": [{"manifest_text": "XYZ"}]},
214 {"items": [{"manifest_text": "ABC"}]})
# The list() results above are consumed in call order: the first lookup finds no existing
# output collection, and later calls supply the manifests ("XYZ" for the output, "ABC"
# for the log).
216 arvjob = arvados_cwl.ArvadosJob(runner)
217 arvjob.name = "testjob"
218 arvjob.builder = mock.MagicMock()
219 arvjob.output_callback = mock.MagicMock()
220 arvjob.collect_outputs = mock.MagicMock()
221 arvjob.collect_outputs.return_value = {"out": "stuff"}
225 "output": "99999999999999999999999999999993+99",
226 "log": "99999999999999999999999999999994+99",
227 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
# For each of output and log: first an owner/PDH/name lookup for an existing collection,
# then a limit=1 fetch of the manifest text. The output collection name appears to embed
# the first 7 characters of the output PDH.
230 api.collections().list.assert_has_calls([
232 # Output collection check
233 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
234 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
235 ['name', '=', 'Output 9999999 of testjob']]),
236 mock.call().execute(num_retries=0),
237 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
238 select=['manifest_text']),
239 mock.call().execute(num_retries=0),
240 # Log collection's turn
241 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
242 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
243 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
244 mock.call().execute(num_retries=0),
245 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
246 select=['manifest_text']),
247 mock.call().execute(num_retries=0)])
# Both the output and the log are saved as new collections in the project, using the
# fetched manifests and ensure_unique_name.
249 api.collections().create.assert_has_calls([
250 mock.call(ensure_unique_name=True,
251 body={'portable_data_hash': '99999999999999999999999999999993+99',
252 'manifest_text': 'XYZ',
253 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
254 'name': 'Output 9999999 of testjob'}),
255 mock.call().execute(num_retries=0),
256 mock.call(ensure_unique_name=True,
257 body={'portable_data_hash': '99999999999999999999999999999994+99',
258 'manifest_text': 'ABC',
259 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
260 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
261 mock.call().execute(num_retries=0),
# The outputs returned by collect_outputs are passed on with a "success" status.
264 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
266 @mock.patch("arvados.collection.CollectionReader")
267 def test_done_use_existing_collection(self, reader):
# Like test_done, but the owner/PDH/name lookups find an existing collection
# (zzzzz-4zz18-zzzzzzzzzzzzzz2), so no new collection should be created.
268 api = mock.MagicMock()
270 runner = mock.MagicMock()
272 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
273 runner.num_retries = 0
# The patched CollectionReader serves crunchrunner log lines that map the task
# placeholders to compute-node paths.
275 reader().open.return_value = StringIO.StringIO(
276 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
277 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
278 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
281 api.collections().list().execute.side_effect = (
282 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
283 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
286 arvjob = arvados_cwl.ArvadosJob(runner)
287 arvjob.name = "testjob"
288 arvjob.builder = mock.MagicMock()
289 arvjob.output_callback = mock.MagicMock()
290 arvjob.collect_outputs = mock.MagicMock()
291 arvjob.collect_outputs.return_value = {"out": "stuff"}
295 "output": "99999999999999999999999999999993+99",
296 "log": "99999999999999999999999999999994+99",
297 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
# Expected existence lookups for the output and the log collection, in this order
# (assert_has_calls checks ordering, not the absence of other calls).
300 api.collections().list.assert_has_calls([
303 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
304 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
305 ['name', '=', 'Output 9999999 of testjob']]),
306 mock.call().execute(num_retries=0),
308 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
309 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
310 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
311 mock.call().execute(num_retries=0)
# The existing collection is reused, so create() must not be called at all.
314 self.assertFalse(api.collections().create.called)
316 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
def test_get_intermediate_collection_info(self):
    """Intermediate collections are named, given a trash_at time and tagged with the container."""
    container_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"

    runner = mock.MagicMock()
    runner.intermediate_output_ttl = 60
    current_container = runner.api.containers().current()
    current_container.execute.return_value = {"uuid" : container_uuid}

    info = arvados_cwl.ArvadosJob(runner)._get_intermediate_collection_info()

    # MockDateTime pins "now" to 2018-01-01 00:00:00, so an intermediate_output_ttl
    # of 60 places trash_at one minute later.
    expected_trash_at = datetime.datetime(2018, 1, 1, 0, 1)
    self.assertEqual(info["name"], "Intermediate collection")
    self.assertEqual(info["trash_at"], expected_trash_at)
    self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : container_uuid})
333 class TestWorkflow(unittest.TestCase):
# Tests for running CWL workflows through ArvadosWorkflow with the "jobs" API.
334 # The test passes no builder.resources
335 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
336 @mock.patch("arvados.collection.CollectionReader")
337 @mock.patch("arvados.collection.Collection")
338 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
339 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
# Load tests/wf/scatter2.cwl and check that the subworkflow is submitted as one
# crunchrunner job running cwltool (--no-container) on workflow.cwl / cwl.input.yml stored
# in a mocked collection, and that the expected files are written into that collection.
340 arvados_cwl.add_arv_hints()
342 api = mock.MagicMock()
343 api._rootDesc = get_rootDesc()
345 runner = arvados_cwl.ArvCwlRunner(api)
346 self.assertEqual(runner.work_api, 'jobs')
348 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" is a typo for "return_value", so these two assignments only
# create an unused mock attribute and have no effect. Correcting the list() one would make
# collection lookups return an item, so re-run this test before fixing them.
349 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
350 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
352 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
353 runner.ignore_docker_for_reuse = False
354 runner.num_retries = 0
355 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
# Route document fetching through CollectionFetcher backed by the mocked API.
357 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
358 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
359 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
360 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
361 document_loader.fetch_text = document_loader.fetcher.fetch_text
362 document_loader.check_exists = document_loader.fetcher.check_exists
364 tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
365 metadata["cwlVersion"] = tool["cwlVersion"]
# Every mocked Collection reports this PDH; the expected job parameters reference it.
367 mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
369 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
370 basedir="", make_fs_access=make_fs_access, loader=document_loader,
371 makeTool=runner.arv_make_tool, metadata=metadata)
372 arvtool.formatgraph = None
373 it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
# The subworkflow text written to the collection is compared with YAML comments stripped.
377 with open("tests/wf/scatter2_subwf.cwl") as f:
378 subwf = StripYAMLComments(f.read())
380 runner.api.jobs().create.assert_called_with(
381 body=JsonDiffMatcher({
382 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
383 'repository': 'arvados',
384 'script_version': 'master',
385 'script': 'crunchrunner',
386 'script_parameters': {
387 'tasks': [{'task.env': {
388 'HOME': '$(task.outdir)',
389 'TMPDIR': '$(task.tmpdir)'},
391 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
392 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
394 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
395 'task.stdout': 'cwl.output.json'}]},
396 'runtime_constraints': {
397 'min_scratch_mb_per_node': 2048,
398 'min_cores_per_node': 1,
399 'docker_image': 'arvados/jobs',
400 'min_ram_mb_per_node': 1024
402 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
403 filters=[['repository', '=', 'arvados'],
404 ['script', '=', 'crunchrunner'],
405 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
406 ['docker_image_locator', 'in docker', 'arvados/jobs']],
# The packed subworkflow and the job input object are both written into the collection.
409 mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
410 mockcollection().open().__enter__().write.assert_has_calls([mock.call(
413 "basename": "token.txt",
415 "location": "/keep/99999999999999999999999999999999+118/token.txt"
420 # No builder.resources are passed, yet the expected constraints below (3 cores, 4096 MB
421 # scratch) differ from the defaults {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}.
# NOTE(review): those values presumably come from the requirements of the steps in
# tests/wf/echo-wf.cwl combined into one container ("overall resource") — confirm.
422 @mock.patch("arvados.collection.CollectionReader")
423 @mock.patch("arvados.collection.Collection")
424 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
425 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
# Load tests/wf/echo-wf.cwl and check the runtime_constraints of the single crunchrunner
# job that runs the whole subworkflow through cwltool.
426 arvados_cwl.add_arv_hints()
428 api = mock.MagicMock()
429 api._rootDesc = get_rootDesc()
431 runner = arvados_cwl.ArvCwlRunner(api)
432 self.assertEqual(runner.work_api, 'jobs')
434 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" is a typo for "return_value", so these two assignments have
# no effect on the mock. Re-run this test before correcting them, because a real list()
# result changes what collection lookups return.
435 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
436 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
438 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
439 runner.ignore_docker_for_reuse = False
440 runner.num_retries = 0
441 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
# Route document fetching through CollectionFetcher backed by the mocked API.
443 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
444 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
445 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
446 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
447 document_loader.fetch_text = document_loader.fetcher.fetch_text
448 document_loader.check_exists = document_loader.fetcher.check_exists
450 tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
451 metadata["cwlVersion"] = tool["cwlVersion"]
# Every mocked Collection reports this PDH; the expected job parameters reference it.
453 mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
455 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
456 basedir="", make_fs_access=make_fs_access, loader=document_loader,
457 makeTool=runner.arv_make_tool, metadata=metadata)
458 arvtool.formatgraph = None
459 it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
463 with open("tests/wf/echo-subwf.cwl") as f:
464 subwf = StripYAMLComments(f.read())
466 runner.api.jobs().create.assert_called_with(
467 body=JsonDiffMatcher({
468 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
469 'repository': 'arvados',
470 'script_version': 'master',
471 'script': 'crunchrunner',
472 'script_parameters': {
473 'tasks': [{'task.env': {
474 'HOME': '$(task.outdir)',
475 'TMPDIR': '$(task.tmpdir)'},
477 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
478 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
480 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
481 'task.stdout': 'cwl.output.json'}]},
482 'runtime_constraints': {
483 'min_scratch_mb_per_node': 4096,
484 'min_cores_per_node': 3,
485 'docker_image': 'arvados/jobs',
486 'min_ram_mb_per_node': 1024
488 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
489 filters=[['repository', '=', 'arvados'],
490 ['script', '=', 'crunchrunner'],
491 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
492 ['docker_image_locator', 'in docker', 'arvados/jobs']],
495 def test_default_work_api(self):
# With jobs.create removed from the discovery document, ArvCwlRunner is expected to
# pick the "containers" work API instead of "jobs".
496 arvados_cwl.add_arv_hints()
498 api = mock.MagicMock()
# Deep copy so deleting a method below does not mutate the object get_rootDesc() returned.
499 api._rootDesc = copy.deepcopy(get_rootDesc())
500 del api._rootDesc.get('resources')['jobs']['methods']['create']
501 runner = arvados_cwl.ArvCwlRunner(api)
502 self.assertEqual(runner.work_api, 'containers')