1 from builtins import str
2 from builtins import object
3 # Copyright (C) The Arvados Authors. All rights reserved.
5 # SPDX-License-Identifier: Apache-2.0
8 import arvados_cwl.context
9 import arvados_cwl.util
10 from arvados_cwl.arvdocker import arv_docker_clear_cache
18 import cwltool.process
19 import cwltool.secrets
20 from schema_salad.ref_resolver import Loader
21 from schema_salad.sourceline import cmap
23 from .matcher import JsonDiffMatcher
24 from .mock_discovery import get_rootDesc
if not os.getenv('ARVADOS_DEBUG'):
    # Unless ARVADOS_DEBUG is set (to a non-empty value), raise the threshold
    # of the runner-related loggers to WARN to keep test output quiet.
    for _quiet_logger in ('arvados.cwl-runner', 'arvados.arv-run'):
        logging.getLogger(_quiet_logger).setLevel(logging.WARN)
    # Do not leave the loop variable behind in the module namespace.
    del _quiet_logger
30 class CollectionMock(object):
31 def __init__(self, vwdmock, *args, **kwargs):
32 self.vwdmock = vwdmock
35 def open(self, *args, **kwargs):
37 return self.vwdmock.open(*args, **kwargs)
39 def copy(self, *args, **kwargs):
41 self.vwdmock.copy(*args, **kwargs)
43 def save_new(self, *args, **kwargs):
49 def portable_data_hash(self):
51 return arvados.config.EMPTY_BLOCK_LOCATOR
53 return "99999999999999999999999999999996+99"
56 class TestContainer(unittest.TestCase):
58 def helper(self, runner, enable_reuse=True):
59 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
61 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
62 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
63 loadingContext = arvados_cwl.context.ArvLoadingContext(
64 {"avsc_names": avsc_names,
66 "make_fs_access": make_fs_access,
68 "metadata": {"cwlVersion": "v1.0"}})
69 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
70 {"work_api": "containers",
72 "name": "test_run_"+str(enable_reuse),
73 "make_fs_access": make_fs_access,
75 "enable_reuse": enable_reuse,
77 "project_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
80 return loadingContext, runtimeContext
82 # The test passes no builder.resources
83 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
84 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
85 def test_run(self, keepdocker):
86 for enable_reuse in (True, False):
87 arv_docker_clear_cache()
89 runner = mock.MagicMock()
90 runner.ignore_docker_for_reuse = False
91 runner.intermediate_output_ttl = 0
92 runner.secret_store = cwltool.secrets.SecretStore()
94 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
95 runner.api.collections().get().execute.return_value = {
96 "portable_data_hash": "99999999999999999999999999999993+99"}
102 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
104 "class": "CommandLineTool"
107 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
109 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
110 arvtool.formatgraph = None
112 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
113 j.run(runtimeContext)
114 runner.api.container_requests().create.assert_called_with(
115 body=JsonDiffMatcher({
117 'HOME': '/var/spool/cwl',
120 'name': 'test_run_'+str(enable_reuse),
121 'runtime_constraints': {
125 'use_existing': enable_reuse,
128 '/tmp': {'kind': 'tmp',
129 "capacity": 1073741824
131 '/var/spool/cwl': {'kind': 'tmp',
132 "capacity": 1073741824 }
134 'state': 'Committed',
135 'output_name': 'Output for step test_run_'+str(enable_reuse),
136 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
137 'output_path': '/var/spool/cwl',
139 'container_image': '99999999999999999999999999999993+99',
140 'command': ['ls', '/var/spool/cwl'],
141 'cwd': '/var/spool/cwl',
142 'scheduling_parameters': {},
147 # The test passes some fields in builder.resources
148 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
149 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
150 def test_resource_requirements(self, keepdocker):
151 arv_docker_clear_cache()
152 runner = mock.MagicMock()
153 runner.ignore_docker_for_reuse = False
154 runner.intermediate_output_ttl = 3600
155 runner.secret_store = cwltool.secrets.SecretStore()
157 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
158 runner.api.collections().get().execute.return_value = {
159 "portable_data_hash": "99999999999999999999999999999993+99"}
165 "class": "ResourceRequirement",
171 "class": "http://arvados.org/cwl#RuntimeConstraints",
174 "class": "http://arvados.org/cwl#APIRequirement",
176 "class": "http://arvados.org/cwl#PartitionRequirement",
179 "class": "http://arvados.org/cwl#IntermediateOutput",
182 "class": "http://arvados.org/cwl#ReuseRequirement",
187 "class": "CommandLineTool"
190 loadingContext, runtimeContext = self.helper(runner)
191 runtimeContext.name = "test_resource_requirements"
193 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
194 arvtool.formatgraph = None
195 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
196 j.run(runtimeContext)
198 call_args, call_kwargs = runner.api.container_requests().create.call_args
200 call_body_expected = {
202 'HOME': '/var/spool/cwl',
205 'name': 'test_resource_requirements',
206 'runtime_constraints': {
209 'keep_cache_ram': 536870912,
212 'use_existing': False,
215 '/tmp': {'kind': 'tmp',
216 "capacity": 4194304000 },
217 '/var/spool/cwl': {'kind': 'tmp',
218 "capacity": 5242880000 }
220 'state': 'Committed',
221 'output_name': 'Output for step test_resource_requirements',
222 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
223 'output_path': '/var/spool/cwl',
225 'container_image': '99999999999999999999999999999993+99',
227 'cwd': '/var/spool/cwl',
228 'scheduling_parameters': {
229 'partitions': ['blurb']
235 call_body = call_kwargs.get('body', None)
236 self.assertNotEqual(None, call_body)
237 for key in call_body:
238 self.assertEqual(call_body_expected.get(key), call_body.get(key))
# The test passes no builder.resources
# Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
243 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
244 @mock.patch("arvados.collection.Collection")
245 def test_initial_work_dir(self, collection_mock, keepdocker):
246 arv_docker_clear_cache()
247 runner = mock.MagicMock()
248 runner.ignore_docker_for_reuse = False
249 runner.intermediate_output_ttl = 0
250 runner.secret_store = cwltool.secrets.SecretStore()
252 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
253 runner.api.collections().get().execute.return_value = {
254 "portable_data_hash": "99999999999999999999999999999993+99"}
256 sourcemock = mock.MagicMock()
257 def get_collection_mock(p):
259 return (sourcemock, p.split("/", 1)[1])
261 return (sourcemock, "")
262 runner.fs_access.get_collection.side_effect = get_collection_mock
264 vwdmock = mock.MagicMock()
265 collection_mock.side_effect = lambda *args, **kwargs: CollectionMock(vwdmock, *args, **kwargs)
271 "class": "InitialWorkDirRequirement",
275 "location": "keep:99999999999999999999999999999995+99/bar"
278 "class": "Directory",
280 "location": "keep:99999999999999999999999999999995+99"
284 "basename": "filename",
285 "location": "keep:99999999999999999999999999999995+99/baz/filename"
288 "class": "Directory",
289 "basename": "subdir",
290 "location": "keep:99999999999999999999999999999995+99/subdir"
295 "class": "CommandLineTool"
298 loadingContext, runtimeContext = self.helper(runner)
299 runtimeContext.name = "test_initial_work_dir"
301 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
302 arvtool.formatgraph = None
303 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
304 j.run(runtimeContext)
306 call_args, call_kwargs = runner.api.container_requests().create.call_args
308 vwdmock.copy.assert_has_calls([mock.call('bar', 'foo', source_collection=sourcemock)])
309 vwdmock.copy.assert_has_calls([mock.call('', 'foo2', source_collection=sourcemock)])
310 vwdmock.copy.assert_has_calls([mock.call('baz/filename', 'filename', source_collection=sourcemock)])
311 vwdmock.copy.assert_has_calls([mock.call('subdir', 'subdir', source_collection=sourcemock)])
313 call_body_expected = {
315 'HOME': '/var/spool/cwl',
318 'name': 'test_initial_work_dir',
319 'runtime_constraints': {
323 'use_existing': True,
326 '/tmp': {'kind': 'tmp',
327 "capacity": 1073741824 },
328 '/var/spool/cwl': {'kind': 'tmp',
329 "capacity": 1073741824 },
330 '/var/spool/cwl/foo': {
331 'kind': 'collection',
333 'portable_data_hash': '99999999999999999999999999999996+99'
335 '/var/spool/cwl/foo2': {
336 'kind': 'collection',
338 'portable_data_hash': '99999999999999999999999999999996+99'
340 '/var/spool/cwl/filename': {
341 'kind': 'collection',
343 'portable_data_hash': '99999999999999999999999999999996+99'
345 '/var/spool/cwl/subdir': {
346 'kind': 'collection',
348 'portable_data_hash': '99999999999999999999999999999996+99'
351 'state': 'Committed',
352 'output_name': 'Output for step test_initial_work_dir',
353 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
354 'output_path': '/var/spool/cwl',
356 'container_image': '99999999999999999999999999999993+99',
358 'cwd': '/var/spool/cwl',
359 'scheduling_parameters': {
365 call_body = call_kwargs.get('body', None)
366 self.assertNotEqual(None, call_body)
367 for key in call_body:
368 self.assertEqual(call_body_expected.get(key), call_body.get(key))
371 # Test redirecting stdin/stdout/stderr
372 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
373 def test_redirects(self, keepdocker):
374 arv_docker_clear_cache()
376 runner = mock.MagicMock()
377 runner.ignore_docker_for_reuse = False
378 runner.intermediate_output_ttl = 0
379 runner.secret_store = cwltool.secrets.SecretStore()
381 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
382 runner.api.collections().get().execute.return_value = {
383 "portable_data_hash": "99999999999999999999999999999993+99"}
385 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
391 "stdout": "stdout.txt",
392 "stderr": "stderr.txt",
393 "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
394 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
396 "class": "CommandLineTool"
399 loadingContext, runtimeContext = self.helper(runner)
400 runtimeContext.name = "test_run_redirect"
402 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
403 arvtool.formatgraph = None
404 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
405 j.run(runtimeContext)
406 runner.api.container_requests().create.assert_called_with(
407 body=JsonDiffMatcher({
409 'HOME': '/var/spool/cwl',
412 'name': 'test_run_redirect',
413 'runtime_constraints': {
417 'use_existing': True,
420 '/tmp': {'kind': 'tmp',
421 "capacity": 1073741824 },
422 '/var/spool/cwl': {'kind': 'tmp',
423 "capacity": 1073741824 },
426 "path": "/var/spool/cwl/stderr.txt"
429 "kind": "collection",
431 "portable_data_hash": "99999999999999999999999999999996+99"
435 "path": "/var/spool/cwl/stdout.txt"
438 'state': 'Committed',
439 "output_name": "Output for step test_run_redirect",
440 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
441 'output_path': '/var/spool/cwl',
443 'container_image': '99999999999999999999999999999993+99',
444 'command': ['ls', '/var/spool/cwl'],
445 'cwd': '/var/spool/cwl',
446 'scheduling_parameters': {},
451 @mock.patch("arvados.collection.Collection")
452 def test_done(self, col):
453 api = mock.MagicMock()
455 runner = mock.MagicMock()
457 runner.num_retries = 0
458 runner.ignore_docker_for_reuse = False
459 runner.intermediate_output_ttl = 0
460 runner.secret_store = cwltool.secrets.SecretStore()
462 runner.api.containers().get().execute.return_value = {"state":"Complete",
466 col().open.return_value = []
468 loadingContext, runtimeContext = self.helper(runner)
470 arvjob = arvados_cwl.ArvadosContainer(runner,
478 arvjob.output_callback = mock.MagicMock()
479 arvjob.collect_outputs = mock.MagicMock()
480 arvjob.successCodes = [0]
481 arvjob.outdir = "/var/spool/cwl"
482 arvjob.output_ttl = 3600
484 arvjob.collect_outputs.return_value = {"out": "stuff"}
488 "log_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
489 "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
490 "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
491 "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
492 "modified_at": "2017-05-26T12:01:22Z"
495 self.assertFalse(api.collections().create.called)
496 self.assertFalse(runner.runtime_status_error.called)
498 arvjob.collect_outputs.assert_called_with("keep:abc+123")
499 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
500 runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
502 @mock.patch("arvados_cwl.util.get_current_container")
503 @mock.patch("arvados.collection.CollectionReader")
504 @mock.patch("arvados.collection.Collection")
505 def test_child_failure(self, col, reader, gcc_mock):
506 api = mock.MagicMock()
507 api._rootDesc = copy.deepcopy(get_rootDesc())
508 del api._rootDesc.get('resources')['jobs']['methods']['create']
510 # Set up runner with mocked runtime_status_update()
511 self.assertFalse(gcc_mock.called)
512 runtime_status_update = mock.MagicMock()
513 arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
514 runner = arvados_cwl.ArvCwlExecutor(api)
515 self.assertEqual(runner.work_api, 'containers')
517 # Make sure ArvCwlExecutor thinks it's running inside a container so it
518 # adds the logging handler that will call runtime_status_update() mock
519 gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
520 self.assertTrue(gcc_mock.called)
521 root_logger = logging.getLogger('')
522 handlerClasses = [h.__class__ for h in root_logger.handlers]
523 self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
525 runner.num_retries = 0
526 runner.ignore_docker_for_reuse = False
527 runner.intermediate_output_ttl = 0
528 runner.secret_store = cwltool.secrets.SecretStore()
529 runner.label = mock.MagicMock()
530 runner.label.return_value = '[container testjob]'
532 runner.api.containers().get().execute.return_value = {
539 col().open.return_value = []
541 loadingContext, runtimeContext = self.helper(runner)
543 arvjob = arvados_cwl.ArvadosContainer(runner,
551 arvjob.output_callback = mock.MagicMock()
552 arvjob.collect_outputs = mock.MagicMock()
553 arvjob.successCodes = [0]
554 arvjob.outdir = "/var/spool/cwl"
555 arvjob.output_ttl = 3600
556 arvjob.collect_outputs.return_value = {"out": "stuff"}
560 "log_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
561 "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
562 "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
563 "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
564 "modified_at": "2017-05-26T12:01:22Z"
567 runtime_status_update.assert_called_with(
569 'arvados.cwl-runner: [container testjob] (zzzzz-xvhdp-zzzzzzzzzzzzzzz) error log:',
570 ' ** log is empty **'
572 arvjob.output_callback.assert_called_with({"out": "stuff"}, "permanentFail")
574 # The test passes no builder.resources
575 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
576 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
577 def test_mounts(self, keepdocker):
578 arv_docker_clear_cache()
580 runner = mock.MagicMock()
581 runner.ignore_docker_for_reuse = False
582 runner.intermediate_output_ttl = 0
583 runner.secret_store = cwltool.secrets.SecretStore()
585 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
586 runner.api.collections().get().execute.return_value = {
587 "portable_data_hash": "99999999999999999999999999999994+99",
588 "manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
590 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
599 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
601 "class": "CommandLineTool"
604 loadingContext, runtimeContext = self.helper(runner)
605 runtimeContext.name = "test_run_mounts"
607 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
608 arvtool.formatgraph = None
611 "class": "Directory",
612 "location": "keep:99999999999999999999999999999994+44",
616 "location": "keep:99999999999999999999999999999994+44/file1",
620 "location": "keep:99999999999999999999999999999994+44/file2",
625 for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
626 j.run(runtimeContext)
627 runner.api.container_requests().create.assert_called_with(
628 body=JsonDiffMatcher({
630 'HOME': '/var/spool/cwl',
633 'name': 'test_run_mounts',
634 'runtime_constraints': {
638 'use_existing': True,
641 "/keep/99999999999999999999999999999994+44": {
642 "kind": "collection",
643 "portable_data_hash": "99999999999999999999999999999994+44"
645 '/tmp': {'kind': 'tmp',
646 "capacity": 1073741824 },
647 '/var/spool/cwl': {'kind': 'tmp',
648 "capacity": 1073741824 }
650 'state': 'Committed',
651 'output_name': 'Output for step test_run_mounts',
652 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
653 'output_path': '/var/spool/cwl',
655 'container_image': '99999999999999999999999999999994+99',
656 'command': ['ls', '/var/spool/cwl'],
657 'cwd': '/var/spool/cwl',
658 'scheduling_parameters': {},
663 # The test passes no builder.resources
664 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
665 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
666 def test_secrets(self, keepdocker):
667 arv_docker_clear_cache()
669 runner = mock.MagicMock()
670 runner.ignore_docker_for_reuse = False
671 runner.intermediate_output_ttl = 0
672 runner.secret_store = cwltool.secrets.SecretStore()
674 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
675 runner.api.collections().get().execute.return_value = {
676 "portable_data_hash": "99999999999999999999999999999993+99"}
678 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
680 tool = cmap({"arguments": ["md5sum", "example.conf"],
681 "class": "CommandLineTool",
684 "class": "http://commonwl.org/cwltool#Secrets",
690 "id": "#secret_job.cwl",
693 "id": "#secret_job.cwl/pw",
701 "class": "InitialWorkDirRequirement",
704 "entry": "username: user\npassword: $(inputs.pw)\n",
705 "entryname": "example.conf"
711 loadingContext, runtimeContext = self.helper(runner)
712 runtimeContext.name = "test_secrets"
714 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
715 arvtool.formatgraph = None
717 job_order = {"pw": "blorp"}
718 runner.secret_store.store(["pw"], job_order)
720 for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
721 j.run(runtimeContext)
722 runner.api.container_requests().create.assert_called_with(
723 body=JsonDiffMatcher({
725 'HOME': '/var/spool/cwl',
728 'name': 'test_secrets',
729 'runtime_constraints': {
733 'use_existing': True,
736 '/tmp': {'kind': 'tmp',
737 "capacity": 1073741824
739 '/var/spool/cwl': {'kind': 'tmp',
740 "capacity": 1073741824 }
742 'state': 'Committed',
743 'output_name': 'Output for step test_secrets',
744 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
745 'output_path': '/var/spool/cwl',
747 'container_image': '99999999999999999999999999999993+99',
748 'command': ['md5sum', 'example.conf'],
749 'cwd': '/var/spool/cwl',
750 'scheduling_parameters': {},
753 "/var/spool/cwl/example.conf": {
754 "content": "username: user\npassword: blorp\n",
760 # The test passes no builder.resources
761 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
762 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
763 def test_timelimit(self, keepdocker):
764 arv_docker_clear_cache()
766 runner = mock.MagicMock()
767 runner.ignore_docker_for_reuse = False
768 runner.intermediate_output_ttl = 0
769 runner.secret_store = cwltool.secrets.SecretStore()
771 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
772 runner.api.collections().get().execute.return_value = {
773 "portable_data_hash": "99999999999999999999999999999993+99"}
779 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
781 "class": "CommandLineTool",
784 "class": "http://commonwl.org/cwltool#TimeLimit",
790 loadingContext, runtimeContext = self.helper(runner)
791 runtimeContext.name = "test_timelimit"
793 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
794 arvtool.formatgraph = None
796 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
797 j.run(runtimeContext)
799 _, kwargs = runner.api.container_requests().create.call_args
800 self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))