1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
17 import cwltool.process
18 from arvados.errors import ApiError
19 from schema_salad.ref_resolver import Loader
20 from schema_salad.sourceline import cmap
21 from .mock_discovery import get_rootDesc
22 from .matcher import JsonDiffMatcher, StripYAMLComments
# Keep test output quiet: unless ARVADOS_DEBUG is set in the environment,
# raise the 'arvados.cwl-runner' and 'arvados.arv-run' loggers to WARN.
24 if not os.getenv('ARVADOS_DEBUG'):
25 logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
26 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
# datetime subclass used to freeze the clock for these tests. The method body
# shown below always returns 2018-01-01 00:00:00, so time-derived values are
# deterministic. For example, test_get_intermediate_collection_info expects a
# trash_at of 00:01 from a 60-second TTL.
# NOTE(review): the method header that owns the `return` line is not present
# in this excerpt. Presumably it is a @staticmethod utcnow(); confirm against
# the full file.
28 class MockDateTime(datetime.datetime):
31 return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
# Install the frozen clock. Any code that looks up datetime.datetime on the
# datetime module after this assignment (including the code under test) gets
# MockDateTime instead of the real class.
33 datetime.datetime = MockDateTime
# Unit tests for running CWL CommandLineTools as crunchrunner jobs through the
# Arvados "jobs" API, and for ArvadosJob completion handling. All API traffic
# goes to mock.MagicMock objects.
35 class TestJob(unittest.TestCase):
# Build the (loadingContext, runtimeContext) pair used to construct and run a
# tool against the mocked `runner`:
# - CWL v1.0 schema;
# - a CollectionFsAccess factory backed by a CollectionCache on runner.api;
# - `enable_reuse` forwarded to the runtime context and embedded in the job
#   name.
# NOTE(review): the tool factory is passed under the key "makeTool" here, but
# TestWorkflow.helper passes it as "construct_tool_object". Confirm which key
# ArvLoadingContext actually reads.
37 def helper(self, runner, enable_reuse=True):
38 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
40 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
41 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
42 loadingContext = arvados_cwl.context.ArvLoadingContext(
43 {"avsc_names": avsc_names,
45 "make_fs_access": make_fs_access,
47 "metadata": {"cwlVersion": "v1.0"},
48 "makeTool": runner.arv_make_tool})
49 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
52 "name": "test_run_job_"+str(enable_reuse),
53 "make_fs_access": make_fs_access,
54 "enable_reuse": enable_reuse,
57 return loadingContext, runtimeContext
59 # The test passes no builder.resources
60 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Run a minimal CommandLineTool (argument "$(runtime.outdir)") once with
# reuse enabled and once with it disabled. Each time, check the
# jobs().create() request: body, find_or_create=enable_reuse, and the
# repository/script/version/docker filters.
# With reuse enabled, the mocked create() returns a Complete job whose
# owner_uuid differs from runner.project_uuid. This drives the
# permission-link path asserted further down.
61 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
62 def test_run(self, list_images_in_arv):
63 for enable_reuse in (True, False):
64 runner = mock.MagicMock()
65 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
66 runner.ignore_docker_for_reuse = False
67 runner.num_retries = 0
69 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
70 runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
71 # Simulate a reused job from another project so that we can check whether a can_read link is added.
73 runner.api.jobs().create().execute.return_value = {
74 'state': 'Complete' if enable_reuse else 'Queued',
75 'owner_uuid': 'zzzzz-tpzed-yyyyyyyyyyyyyyy' if enable_reuse else 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
76 'uuid': 'zzzzz-819sb-yyyyyyyyyyyyyyy',
84 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
86 "class": "CommandLineTool"
89 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
91 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
92 arvtool.formatgraph = None
93 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
95 runner.api.jobs().create.assert_called_with(
96 body=JsonDiffMatcher({
97 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
# NOTE(review): 'runtime_constraints' appears twice in this dict literal.
# Python keeps the later value, so this empty {} is dead and can be removed.
98 'runtime_constraints': {},
99 'script_parameters': {
101 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
102 'command': ['ls', '$(task.outdir)']
105 'script_version': 'master',
106 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
107 'repository': 'arvados',
108 'script': 'crunchrunner',
109 'runtime_constraints': {
110 'docker_image': 'arvados/jobs',
111 'min_cores_per_node': 1,
112 'min_ram_mb_per_node': 1024,
113 'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
116 find_or_create=enable_reuse,
117 filters=[['repository', '=', 'arvados'],
118 ['script', '=', 'crunchrunner'],
119 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
120 ['docker_image_locator', 'in docker', 'arvados/jobs']]
# Expect a permission link whose tail is the runner's project and whose head
# is the mocked job's uuid.
123 runner.api.links().create.assert_called_with(
124 body=JsonDiffMatcher({
125 'link_class': 'permission',
127 "tail_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
128 "head_uuid": "zzzzz-819sb-yyyyyyyyyyyyyyy",
131 # Simulate an API exception when trying to create a
132 # sharing link on the job
133 runner.api.links().create.side_effect = ApiError(
134 mock.MagicMock(return_value={'status': 403}),
136 j.run(runtimeContext)
# NOTE(review): the if/else structure around this assertion is not visible in
# this excerpt. Presumably it covers the enable_reuse=False case, where the
# job is owned by the runner's own project and no link is expected.
138 assert not runner.api.links().create.called
140 # The test passes some fields in builder.resources
141 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Check that a ResourceRequirement plus the Arvados RuntimeConstraints,
# APIRequirement and ReuseRequirement hints are translated into the job
# request: runtime_constraints (cores, RAM including keep cache, scratch,
# keep_cache_mb_per_task), task.keepTmpOutput, and find_or_create=False.
# find_or_create=False presumably comes from the ReuseRequirement hint, whose
# fields are not visible in this excerpt.
142 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
143 def test_resource_requirements(self, list_images_in_arv):
144 runner = mock.MagicMock()
145 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
146 runner.ignore_docker_for_reuse = False
147 runner.num_retries = 0
148 arvados_cwl.add_arv_hints()
150 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" is a misspelling of "return_value". MagicMock
# accepts arbitrary attribute assignment, so this line silently has no effect
# (test_run sets execute.return_value correctly). Correcting it may change the
# code path under test, so verify the expectations before fixing.
151 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
157 "class": "ResourceRequirement",
162 "class": "http://arvados.org/cwl#RuntimeConstraints",
164 "outputDirType": "keep_output_dir"
166 "class": "http://arvados.org/cwl#APIRequirement",
169 "class": "http://arvados.org/cwl#ReuseRequirement",
174 "class": "CommandLineTool"
177 loadingContext, runtimeContext = self.helper(runner)
179 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
180 arvtool.formatgraph = None
181 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
182 j.run(runtimeContext)
183 runner.api.jobs().create.assert_called_with(
184 body=JsonDiffMatcher({
185 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
# NOTE(review): 'runtime_constraints' appears twice in this dict literal.
# Python keeps the later value, so this empty {} is dead and can be removed.
186 'runtime_constraints': {},
187 'script_parameters': {
189 'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
190 'task.keepTmpOutput': True,
194 'script_version': 'master',
195 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
196 'repository': 'arvados',
197 'script': 'crunchrunner',
198 'runtime_constraints': {
199 'docker_image': 'arvados/jobs',
200 'min_cores_per_node': 3,
201 'min_ram_mb_per_node': 3512, # ramMin + keep_cache
202 'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
203 'keep_cache_mb_per_task': 512
206 find_or_create=False,
207 filters=[['repository', '=', 'arvados'],
208 ['script', '=', 'crunchrunner'],
209 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
210 ['docker_image_locator', 'in docker', 'arvados/jobs']])
# Check ArvadosJob completion handling for a finished job record whose output
# and log are portable data hashes.
# - The mocked log reader yields crunchrunner lines mapping $(task.tmpdir),
#   $(task.outdir) and $(task.keep) to host paths.
# - collections().list().execute() first returns no items, meaning no
#   existing named collection, then a manifest. The same happens for the log.
#   One side_effect entry is missing from this excerpt.
# - So for both output and log, the test expects:
#   1. a name lookup in the runner's project;
#   2. a manifest fetch by pdh;
#   3. a collections().create() named "Output 9999999 of testjob" or
#      "Log of <job uuid>".
# - Finally output_callback must receive collect_outputs()'s result with
#   "success".
# NOTE(review): the ArvadosJob constructor arguments and the call that passes
# the job record (presumably arvjob.done(...)) are on lines missing from this
# excerpt.
212 @mock.patch("arvados.collection.CollectionReader")
213 def test_done(self, reader):
214 api = mock.MagicMock()
216 runner = mock.MagicMock()
218 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
219 runner.num_retries = 0
220 runner.ignore_docker_for_reuse = False
222 reader().open.return_value = StringIO.StringIO(
223 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
224 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
225 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
227 api.collections().list().execute.side_effect = ({"items": []},
228 {"items": [{"manifest_text": "XYZ"}]},
230 {"items": [{"manifest_text": "ABC"}]})
232 arvjob = arvados_cwl.ArvadosJob(runner,
239 arvjob.output_callback = mock.MagicMock()
240 arvjob.collect_outputs = mock.MagicMock()
241 arvjob.collect_outputs.return_value = {"out": "stuff"}
245 "output": "99999999999999999999999999999993+99",
246 "log": "99999999999999999999999999999994+99",
247 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
250 api.collections().list.assert_has_calls([
252 # Output collection check
253 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
254 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
255 ['name', '=', 'Output 9999999 of testjob']]),
256 mock.call().execute(num_retries=0),
257 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
258 select=['manifest_text']),
259 mock.call().execute(num_retries=0),
260 # Log collection's turn
261 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
262 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
263 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
264 mock.call().execute(num_retries=0),
265 mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999994+99']],
266 select=['manifest_text']),
267 mock.call().execute(num_retries=0)])
269 api.collections().create.assert_has_calls([
270 mock.call(ensure_unique_name=True,
271 body={'portable_data_hash': '99999999999999999999999999999993+99',
272 'manifest_text': 'XYZ',
273 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
274 'name': 'Output 9999999 of testjob'}),
275 mock.call().execute(num_retries=0),
276 mock.call(ensure_unique_name=True,
277 body={'portable_data_hash': '99999999999999999999999999999994+99',
278 'manifest_text': 'ABC',
279 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
280 'name': 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
281 mock.call().execute(num_retries=0),
284 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
286 @mock.patch("arvados.collection.CollectionReader")
287 def test_done_use_existing_collection(self, reader):
288 api = mock.MagicMock()
290 runner = mock.MagicMock()
292 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
293 runner.num_retries = 0
295 reader().open.return_value = StringIO.StringIO(
296 """2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.tmpdir)=/tmp/crunch-job-task-work/compute3.1/tmpdir
297 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.outdir)=/tmp/crunch-job-task-work/compute3.1/outdir
298 2016-11-02_23:12:18 c97qk-8i9sb-cryqw2blvzy4yaj 13358 0 stderr 2016/11/02 23:12:18 crunchrunner: $(task.keep)=/keep
# test_done_use_existing_collection: same flow as test_done, but both name
# lookups (output and log) find an existing collection (uuid ...zz2). The
# test expects those lookups and requires that collections().create() is
# never called.
301 api.collections().list().execute.side_effect = (
302 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
303 {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
306 arvjob = arvados_cwl.ArvadosJob(runner,
313 arvjob.output_callback = mock.MagicMock()
314 arvjob.collect_outputs = mock.MagicMock()
315 arvjob.collect_outputs.return_value = {"out": "stuff"}
319 "output": "99999999999999999999999999999993+99",
320 "log": "99999999999999999999999999999994+99",
321 "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
324 api.collections().list.assert_has_calls([
327 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
328 ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
329 ['name', '=', 'Output 9999999 of testjob']]),
330 mock.call().execute(num_retries=0),
332 mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
333 ['portable_data_hash', '=', '99999999999999999999999999999994+99'],
334 ['name', '=', 'Log of zzzzz-8i9sb-zzzzzzzzzzzzzzz']]),
335 mock.call().execute(num_retries=0)
338 self.assertFalse(api.collections().create.called)
340 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
# With intermediate_output_ttl = 60 and the clock frozen at 2018-01-01
# 00:00:00 by MockDateTime, the intermediate collection info should:
# - be named "Intermediate collection";
# - have trash_at of 00:01;
# - record the current container's uuid (from containers().current()) in its
#   properties.
342 def test_get_intermediate_collection_info(self):
343 arvrunner = mock.MagicMock()
344 arvrunner.intermediate_output_ttl = 60
345 arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
347 job = arvados_cwl.ArvadosJob(arvrunner)
349 info = job._get_intermediate_collection_info()
351 self.assertEqual(info["name"], "Intermediate collection")
352 self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
353 self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
# Tests for running CWL Workflows through ArvadosWorkflow on the "jobs" API,
# using a real ArvCwlRunner over a mocked API client, plus selection of the
# default work API.
357 class TestWorkflow(unittest.TestCase):
# Like TestJob.helper, with two additions:
# - the CWL v1.0 schema document loader's fetcher is replaced by a
#   CollectionFetcher on runner.api, and its fetch_text/check_exists are
#   rebound to that fetcher;
# - the loader itself is passed in the loading context, so tool documents can
#   be resolved through it.
# NOTE(review): the tool factory is passed as "construct_tool_object" here but
# as "makeTool" in TestJob.helper. Confirm which key ArvLoadingContext reads.
358 def helper(self, runner, enable_reuse=True):
359 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
361 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
362 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
364 document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
365 document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
366 document_loader.fetch_text = document_loader.fetcher.fetch_text
367 document_loader.check_exists = document_loader.fetcher.check_exists
369 loadingContext = arvados_cwl.context.ArvLoadingContext(
370 {"avsc_names": avsc_names,
372 "make_fs_access": make_fs_access,
373 "loader": document_loader,
374 "metadata": {"cwlVersion": "v1.0"},
375 "construct_tool_object": runner.arv_make_tool})
376 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
379 "name": "test_run_wf_"+str(enable_reuse),
380 "make_fs_access": make_fs_access,
381 "enable_reuse": enable_reuse,
384 return loadingContext, runtimeContext
386 # The test passes no builder.resources
387 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Load tests/wf/scatter2.cwl with an ArvCwlRunner that selected the "jobs"
# work API, then run the first two runnables yielded by
# ArvadosWorkflow.job(). Expectations:
# - The subworkflow is submitted as a crunchrunner job running cwltool on
#   workflow.cwl#main / cwl.input.yml from the mocked collection
#   (pdh ...+118).
# - The collection receives the subworkflow text (tests/wf/scatter2_subwf.cwl,
#   YAML comments stripped) and an input object referencing token.txt, which
#   is only partially visible in this excerpt.
391 def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
392 arvados_cwl.add_arv_hints()
394 api = mock.MagicMock()
395 api._rootDesc = get_rootDesc()
397 runner = arvados_cwl.ArvCwlRunner(api)
398 self.assertEqual(runner.work_api, 'jobs')
400 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" (both lines below) is a misspelling of
# "return_value". MagicMock accepts the assignment silently, so these stubs
# have no effect. Correcting them may change the code path under test, so
# verify the expectations before fixing.
401 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
402 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
404 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
405 runner.ignore_docker_for_reuse = False
406 runner.num_retries = 0
408 loadingContext, runtimeContext = self.helper(runner)
410 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
411 metadata["cwlVersion"] = tool["cwlVersion"]
413 mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
415 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
416 arvtool.formatgraph = None
417 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
# NOTE(review): it.next() is the Python 2 iterator method. next(it) works on
# both Python 2.6+ and Python 3.
419 it.next().run(runtimeContext)
420 it.next().run(runtimeContext)
422 with open("tests/wf/scatter2_subwf.cwl") as f:
423 subwf = StripYAMLComments(f.read())
425 runner.api.jobs().create.assert_called_with(
426 body=JsonDiffMatcher({
427 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
428 'repository': 'arvados',
429 'script_version': 'master',
430 'script': 'crunchrunner',
431 'script_parameters': {
432 'tasks': [{'task.env': {
433 'HOME': '$(task.outdir)',
434 'TMPDIR': '$(task.tmpdir)'},
436 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
437 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
439 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
440 'task.stdout': 'cwl.output.json'}]},
441 'runtime_constraints': {
442 'min_scratch_mb_per_node': 2048,
443 'min_cores_per_node': 1,
444 'docker_image': 'arvados/jobs',
445 'min_ram_mb_per_node': 1024
447 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
448 filters=[['repository', '=', 'arvados'],
449 ['script', '=', 'crunchrunner'],
450 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
451 ['docker_image_locator', 'in docker', 'arvados/jobs']],
454 mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
455 mockcollection().open().__enter__().write.assert_has_calls([mock.call(
458 "basename": "token.txt",
460 "location": "/keep/99999999999999999999999999999999+118/token.txt"
465 # The test passes no builder.resources
466 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Same flow as test_run, using tests/wf/echo-wf.cwl. The expected job asks
# for 3 cores and 4096 MB of scratch.
# NOTE(review): the comment above says the defaults apply, yet the expected
# cores/scratch differ from those defaults. Presumably the resource
# requirements inside the workflow are combined for the single container;
# confirm against tests/wf/echo-wf.cwl and update that comment.
467 @mock.patch("arvados.collection.CollectionReader")
468 @mock.patch("arvados.collection.Collection")
469 @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
470 def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
471 arvados_cwl.add_arv_hints()
473 api = mock.MagicMock()
474 api._rootDesc = get_rootDesc()
476 runner = arvados_cwl.ArvCwlRunner(api)
477 self.assertEqual(runner.work_api, 'jobs')
479 list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
# NOTE(review): "return_vaulue" (both lines below) is a misspelling of
# "return_value". These stubs silently have no effect; see the same note in
# test_run.
480 runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
481 runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
483 runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
484 runner.ignore_docker_for_reuse = False
485 runner.num_retries = 0
487 loadingContext, runtimeContext = self.helper(runner)
489 tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
490 metadata["cwlVersion"] = tool["cwlVersion"]
492 mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
494 arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
495 arvtool.formatgraph = None
496 it = arvtool.job({}, mock.MagicMock(), runtimeContext)
# NOTE(review): Python 2 iterator method; next(it) is the portable spelling.
497 it.next().run(runtimeContext)
498 it.next().run(runtimeContext)
500 with open("tests/wf/echo-subwf.cwl") as f:
501 subwf = StripYAMLComments(f.read())
503 runner.api.jobs().create.assert_called_with(
504 body=JsonDiffMatcher({
505 'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
506 'repository': 'arvados',
507 'script_version': 'master',
508 'script': 'crunchrunner',
509 'script_parameters': {
510 'tasks': [{'task.env': {
511 'HOME': '$(task.outdir)',
512 'TMPDIR': '$(task.tmpdir)'},
514 'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
515 'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
517 'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
518 'task.stdout': 'cwl.output.json'}]},
519 'runtime_constraints': {
520 'min_scratch_mb_per_node': 4096,
521 'min_cores_per_node': 3,
522 'docker_image': 'arvados/jobs',
523 'min_ram_mb_per_node': 1024
525 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
526 filters=[['repository', '=', 'arvados'],
527 ['script', '=', 'crunchrunner'],
528 ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
529 ['docker_image_locator', 'in docker', 'arvados/jobs']],
# When the discovery document lacks the jobs.create method, ArvCwlRunner
# should pick the "containers" work API. The rootDesc is deep-copied so the
# deletion does not mutate the structure returned by get_rootDesc().
532 def test_default_work_api(self):
533 arvados_cwl.add_arv_hints()
535 api = mock.MagicMock()
536 api._rootDesc = copy.deepcopy(get_rootDesc())
537 del api._rootDesc.get('resources')['jobs']['methods']['create']
538 runner = arvados_cwl.ArvCwlRunner(api)
539 self.assertEqual(runner.work_api, 'containers')