1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from builtins import str
6 from builtins import object
9 import arvados_cwl.context
10 import arvados_cwl.util
11 from arvados_cwl.arvdocker import arv_docker_clear_cache
19 import cwltool.process
20 import cwltool.secrets
21 from schema_salad.ref_resolver import Loader
22 from schema_salad.sourceline import cmap
24 from .matcher import JsonDiffMatcher
25 from .mock_discovery import get_rootDesc
# Unless ARVADOS_DEBUG is set to a non-empty value in the environment, set
# the 'arvados.cwl-runner' and 'arvados.arv-run' loggers to WARN.
27 if not os.getenv('ARVADOS_DEBUG'):
28 logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
29 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
# Test double for a collection object: open() and copy() are forwarded to the
# `vwdmock` object supplied at construction time.
31 class CollectionMock(object):
# vwdmock: object that receives the forwarded open()/copy() calls.
# Additional positional/keyword arguments are accepted by the signature.
32 def __init__(self, vwdmock, *args, **kwargs):
33 self.vwdmock = vwdmock
# Forward the call to vwdmock.open() and return its result.
36 def open(self, *args, **kwargs):
38 return self.vwdmock.open(*args, **kwargs)
# Forward the call to vwdmock.copy().
40 def copy(self, *args, **kwargs):
42 self.vwdmock.copy(*args, **kwargs)
44 def save_new(self, *args, **kwargs):
# Returns either arvados.config.EMPTY_BLOCK_LOCATOR or a fixed hash string.
# NOTE(review): the condition selecting between the two return values is not
# visible in this view — confirm before relying on either value.
50 def portable_data_hash(self):
52 return arvados.config.EMPTY_BLOCK_LOCATOR
54 return "99999999999999999999999999999996+99"
57 class TestContainer(unittest.TestCase):
# Build the loading and runtime contexts that the tests in this class pass to
# ArvadosCommandTool / job.run().
#
# runner: object whose `api` attribute is handed to arvados_cwl.CollectionCache.
# enable_reuse: stored under "enable_reuse" in the runtime context and
#     appended, via str(), to the context name prefix "test_run_".
# Returns the tuple (loadingContext, runtimeContext).
59 def helper(self, runner, enable_reuse=True):
60 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
# Partial of arvados_cwl.CollectionFsAccess with collection_cache pre-bound to
# a CollectionCache created from runner.api.
62 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
63 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
64 loadingContext = arvados_cwl.context.ArvLoadingContext(
65 {"avsc_names": avsc_names,
67 "make_fs_access": make_fs_access,
69 "metadata": {"cwlVersion": "v1.0"}})
70 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
71 {"work_api": "containers",
73 "name": "test_run_"+str(enable_reuse),
74 "make_fs_access": make_fs_access,
76 "enable_reuse": enable_reuse,
78 "project_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
81 return loadingContext, runtimeContext
83 # The test passes no builder.resources
84 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Exercise the tool with reuse enabled and then disabled, checking the body
# passed to container_requests().create in each case.
# keepdocker: mock replacing arvados.commands.keepdocker.list_images_in_arv.
85 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
86 def test_run(self, keepdocker):
87 for enable_reuse in (True, False):
88 arv_docker_clear_cache()
90 runner = mock.MagicMock()
91 runner.ignore_docker_for_reuse = False
92 runner.intermediate_output_ttl = 0
93 runner.secret_store = cwltool.secrets.SecretStore()
# Canned image listing and collection lookup; the portable data hash returned
# by the lookup is the value expected as 'container_image' below.
95 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
96 runner.api.collections().get().execute.return_value = {
97 "portable_data_hash": "99999999999999999999999999999993+99"}
103 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
105 "class": "CommandLineTool"
108 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
110 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
111 arvtool.formatgraph = None
# Run each job yielded by arvtool.job().
113 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
114 j.run(runtimeContext)
# assert_called_with checks the most recent create() call against this body.
115 runner.api.container_requests().create.assert_called_with(
116 body=JsonDiffMatcher({
118 'HOME': '/var/spool/cwl',
# helper() names the runtime context "test_run_" + str(enable_reuse).
121 'name': 'test_run_'+str(enable_reuse),
122 'runtime_constraints': {
126 'use_existing': enable_reuse,
# 1073741824 bytes = 1024 MiB for both tmp mounts.
129 '/tmp': {'kind': 'tmp',
130 "capacity": 1073741824
132 '/var/spool/cwl': {'kind': 'tmp',
133 "capacity": 1073741824 }
135 'state': 'Committed',
136 'output_name': 'Output for step test_run_'+str(enable_reuse),
137 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
138 'output_path': '/var/spool/cwl',
140 'container_image': '99999999999999999999999999999993+99',
141 'command': ['ls', '/var/spool/cwl'],
142 'cwd': '/var/spool/cwl',
143 'scheduling_parameters': {},
148 # The test passes some fields in builder.resources
149 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Submit a tool that declares resource and Arvados-specific requirement
# classes and compare the resulting container request body with
# call_body_expected.
# keepdocker: mock replacing arvados.commands.keepdocker.list_images_in_arv.
150 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
151 def test_resource_requirements(self, keepdocker):
152 arv_docker_clear_cache()
153 runner = mock.MagicMock()
154 runner.ignore_docker_for_reuse = False
155 runner.intermediate_output_ttl = 3600
156 runner.secret_store = cwltool.secrets.SecretStore()
158 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
159 runner.api.collections().get().execute.return_value = {
160 "portable_data_hash": "99999999999999999999999999999993+99"}
# Requirement classes attached to the tool under test.
166 "class": "ResourceRequirement",
172 "class": "http://arvados.org/cwl#RuntimeConstraints",
175 "class": "http://arvados.org/cwl#APIRequirement",
177 "class": "http://arvados.org/cwl#PartitionRequirement",
180 "class": "http://arvados.org/cwl#IntermediateOutput",
183 "class": "http://arvados.org/cwl#ReuseRequirement",
188 "class": "CommandLineTool"
191 loadingContext, runtimeContext = self.helper(runner)
# Override the default context name set by helper().
192 runtimeContext.name = "test_resource_requirements"
194 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
195 arvtool.formatgraph = None
196 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
197 j.run(runtimeContext)
# Capture the arguments of the most recent create() call.
199 call_args, call_kwargs = runner.api.container_requests().create.call_args
201 call_body_expected = {
203 'HOME': '/var/spool/cwl',
206 'name': 'test_resource_requirements',
207 'runtime_constraints': {
# 536870912 bytes = 512 MiB
210 'keep_cache_ram': 536870912,
213 'use_existing': False,
# 4194304000 bytes = 4000 MiB; 5242880000 bytes = 5000 MiB
216 '/tmp': {'kind': 'tmp',
217 "capacity": 4194304000 },
218 '/var/spool/cwl': {'kind': 'tmp',
219 "capacity": 5242880000 }
221 'state': 'Committed',
222 'output_name': 'Output for step test_resource_requirements',
223 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
224 'output_path': '/var/spool/cwl',
226 'container_image': '99999999999999999999999999999993+99',
228 'cwd': '/var/spool/cwl',
229 'scheduling_parameters': {
230 'partitions': ['blurb']
236 call_body = call_kwargs.get('body', None)
237 self.assertNotEqual(None, call_body)
# NOTE(review): only keys present in call_body are compared, so a key that
# exists in call_body_expected but is missing from call_body goes unchecked.
238 for key in call_body:
239 self.assertEqual(call_body_expected.get(key), call_body.get(key))
242 # The test passes no builder.resources
243 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Submit a tool with an InitialWorkDirRequirement and check both the copy()
# calls made on the staging collection mock and the resulting request body.
# Patch decorators apply bottom-up: collection_mock replaces
# arvados.collection.Collection, keepdocker replaces list_images_in_arv.
244 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
245 @mock.patch("arvados.collection.Collection")
246 def test_initial_work_dir(self, collection_mock, keepdocker):
247 arv_docker_clear_cache()
248 runner = mock.MagicMock()
249 runner.ignore_docker_for_reuse = False
250 runner.intermediate_output_ttl = 0
251 runner.secret_store = cwltool.secrets.SecretStore()
253 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
254 runner.api.collections().get().execute.return_value = {
255 "portable_data_hash": "99999999999999999999999999999993+99"}
257 sourcemock = mock.MagicMock()
# Stand-in for runner.fs_access.get_collection: always returns sourcemock
# paired with either the part of p after the first "/" or an empty string.
# NOTE(review): the condition choosing between the two returns is not visible
# in this view — confirm.
258 def get_collection_mock(p):
260 return (sourcemock, p.split("/", 1)[1])
262 return (sourcemock, "")
263 runner.fs_access.get_collection.side_effect = get_collection_mock
# Every Collection(...) construction yields a CollectionMock wrapping vwdmock,
# so copy() calls can be inspected on vwdmock afterwards.
265 vwdmock = mock.MagicMock()
266 collection_mock.side_effect = lambda *args, **kwargs: CollectionMock(vwdmock, *args, **kwargs)
272 "class": "InitialWorkDirRequirement",
276 "location": "keep:99999999999999999999999999999995+99/bar"
279 "class": "Directory",
281 "location": "keep:99999999999999999999999999999995+99"
285 "basename": "filename",
286 "location": "keep:99999999999999999999999999999995+99/baz/filename"
289 "class": "Directory",
290 "basename": "subdir",
291 "location": "keep:99999999999999999999999999999995+99/subdir"
296 "class": "CommandLineTool"
299 loadingContext, runtimeContext = self.helper(runner)
300 runtimeContext.name = "test_initial_work_dir"
302 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
303 arvtool.formatgraph = None
304 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
305 j.run(runtimeContext)
307 call_args, call_kwargs = runner.api.container_requests().create.call_args
# Each expected copy() is asserted separately, so the relative order of the
# four calls is not checked.
309 vwdmock.copy.assert_has_calls([mock.call('bar', 'foo', source_collection=sourcemock)])
310 vwdmock.copy.assert_has_calls([mock.call('', 'foo2', source_collection=sourcemock)])
311 vwdmock.copy.assert_has_calls([mock.call('baz/filename', 'filename', source_collection=sourcemock)])
312 vwdmock.copy.assert_has_calls([mock.call('subdir', 'subdir', source_collection=sourcemock)])
314 call_body_expected = {
316 'HOME': '/var/spool/cwl',
319 'name': 'test_initial_work_dir',
320 'runtime_constraints': {
324 'use_existing': True,
327 '/tmp': {'kind': 'tmp',
328 "capacity": 1073741824 },
329 '/var/spool/cwl': {'kind': 'tmp',
330 "capacity": 1073741824 },
# The staged entries are expected as collection mounts under /var/spool/cwl;
# the hash below equals one of CollectionMock.portable_data_hash()'s returns.
331 '/var/spool/cwl/foo': {
332 'kind': 'collection',
334 'portable_data_hash': '99999999999999999999999999999996+99'
336 '/var/spool/cwl/foo2': {
337 'kind': 'collection',
339 'portable_data_hash': '99999999999999999999999999999996+99'
341 '/var/spool/cwl/filename': {
342 'kind': 'collection',
344 'portable_data_hash': '99999999999999999999999999999996+99'
346 '/var/spool/cwl/subdir': {
347 'kind': 'collection',
349 'portable_data_hash': '99999999999999999999999999999996+99'
352 'state': 'Committed',
353 'output_name': 'Output for step test_initial_work_dir',
354 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
355 'output_path': '/var/spool/cwl',
357 'container_image': '99999999999999999999999999999993+99',
359 'cwd': '/var/spool/cwl',
360 'scheduling_parameters': {
366 call_body = call_kwargs.get('body', None)
367 self.assertNotEqual(None, call_body)
# NOTE(review): only keys present in call_body are compared, so a key that
# exists in call_body_expected but is missing from call_body goes unchecked.
368 for key in call_body:
369 self.assertEqual(call_body_expected.get(key), call_body.get(key))
372 # Test redirecting stdin/stdout/stderr
# Submit a tool that sets stdin, stdout and stderr and check the body passed
# to container_requests().create.
# keepdocker: mock replacing arvados.commands.keepdocker.list_images_in_arv.
373 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
374 def test_redirects(self, keepdocker):
375 arv_docker_clear_cache()
377 runner = mock.MagicMock()
378 runner.ignore_docker_for_reuse = False
379 runner.intermediate_output_ttl = 0
380 runner.secret_store = cwltool.secrets.SecretStore()
382 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
383 runner.api.collections().get().execute.return_value = {
384 "portable_data_hash": "99999999999999999999999999999993+99"}
386 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
# Redirection settings of the tool under test.
392 "stdout": "stdout.txt",
393 "stderr": "stderr.txt",
394 "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
395 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
397 "class": "CommandLineTool"
400 loadingContext, runtimeContext = self.helper(runner)
401 runtimeContext.name = "test_run_redirect"
403 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
404 arvtool.formatgraph = None
405 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
406 j.run(runtimeContext)
# assert_called_with checks the most recent create() call against this body.
407 runner.api.container_requests().create.assert_called_with(
408 body=JsonDiffMatcher({
410 'HOME': '/var/spool/cwl',
413 'name': 'test_run_redirect',
414 'runtime_constraints': {
418 'use_existing': True,
421 '/tmp': {'kind': 'tmp',
422 "capacity": 1073741824 },
423 '/var/spool/cwl': {'kind': 'tmp',
424 "capacity": 1073741824 },
# The stderr.txt and stdout.txt paths sit under the output directory, and the
# collection hash matches the one in the tool's stdin path.
427 "path": "/var/spool/cwl/stderr.txt"
430 "kind": "collection",
432 "portable_data_hash": "99999999999999999999999999999996+99"
436 "path": "/var/spool/cwl/stdout.txt"
439 'state': 'Committed',
440 "output_name": "Output for step test_run_redirect",
441 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
442 'output_path': '/var/spool/cwl',
444 'container_image': '99999999999999999999999999999993+99',
445 'command': ['ls', '/var/spool/cwl'],
446 'cwd': '/var/spool/cwl',
447 'scheduling_parameters': {},
# Check the success path of an ArvadosContainer whose container lookup
# reports state "Complete": outputs are collected from "keep:abc+123", the
# output callback receives them with status "success", and the output
# collection uuid is registered as an intermediate output.
# col: mock replacing arvados.collection.Collection.
452 @mock.patch("arvados.collection.Collection")
453 def test_done(self, col):
454 api = mock.MagicMock()
456 runner = mock.MagicMock()
458 runner.num_retries = 0
459 runner.ignore_docker_for_reuse = False
460 runner.intermediate_output_ttl = 0
461 runner.secret_store = cwltool.secrets.SecretStore()
463 runner.api.containers().get().execute.return_value = {"state":"Complete",
# Opening anything on the patched Collection yields an empty sequence.
467 col().open.return_value = []
469 loadingContext, runtimeContext = self.helper(runner)
471 arvjob = arvados_cwl.ArvadosContainer(runner,
# Replace the callbacks with mocks so their invocations can be asserted on.
479 arvjob.output_callback = mock.MagicMock()
480 arvjob.collect_outputs = mock.MagicMock()
481 arvjob.successCodes = [0]
482 arvjob.outdir = "/var/spool/cwl"
483 arvjob.output_ttl = 3600
485 arvjob.collect_outputs.return_value = {"out": "stuff"}
489 "log_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
490 "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
491 "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
492 "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
493 "modified_at": "2017-05-26T12:01:22Z"
# NOTE(review): this inspects the local `api` mock; whether it is the same
# object as runner.api is not visible in this view — confirm the assertion is
# not vacuous.
496 self.assertFalse(api.collections().create.called)
497 self.assertFalse(runner.runtime_status_error.called)
499 arvjob.collect_outputs.assert_called_with("keep:abc+123")
500 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
# The registered uuid equals the "output_uuid" value above.
501 runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
# Check the failure path using a real ArvCwlExecutor: runtime_status_update is
# expected to be called with the error-log message and the output callback
# with status "permanentFail".
# Patch decorators apply bottom-up: col replaces arvados.collection.Collection,
# reader replaces arvados.collection.CollectionReader, gcc_mock replaces
# arvados_cwl.util.get_current_container.
503 @mock.patch("arvados_cwl.util.get_current_container")
504 @mock.patch("arvados.collection.CollectionReader")
505 @mock.patch("arvados.collection.Collection")
506 def test_child_failure(self, col, reader, gcc_mock):
507 api = mock.MagicMock()
# Work on a private copy of the discovery document with jobs.create removed;
# the assertion below then expects work_api to be 'containers'.
508 api._rootDesc = copy.deepcopy(get_rootDesc())
509 del api._rootDesc.get('resources')['jobs']['methods']['create']
511 # Set up runner with mocked runtime_status_update()
512 self.assertFalse(gcc_mock.called)
513 runtime_status_update = mock.MagicMock()
# NOTE(review): this overwrites the attribute on the ArvCwlExecutor class
# itself and no restore is visible in this block, so the mock may leak into
# other tests — consider mock.patch.object.
514 arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
515 runner = arvados_cwl.ArvCwlExecutor(api)
516 self.assertEqual(runner.work_api, 'containers')
518 # Make sure ArvCwlExecutor thinks it's running inside a container so it
519 # adds the logging handler that will call runtime_status_update() mock
# NOTE(review): return_value is assigned after ArvCwlExecutor(api) has been
# constructed, while the assertion below shows gcc_mock was already called
# by then — confirm the ordering is intended.
520 gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
521 self.assertTrue(gcc_mock.called)
522 root_logger = logging.getLogger('')
523 handlerClasses = [h.__class__ for h in root_logger.handlers]
524 self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
526 runner.num_retries = 0
527 runner.ignore_docker_for_reuse = False
528 runner.intermediate_output_ttl = 0
529 runner.secret_store = cwltool.secrets.SecretStore()
# Fixed label so the expected log message below is deterministic.
530 runner.label = mock.MagicMock()
531 runner.label.return_value = '[container testjob]'
533 runner.api.containers().get().execute.return_value = {
# Opening anything on the patched Collection yields an empty sequence.
540 col().open.return_value = []
542 loadingContext, runtimeContext = self.helper(runner)
544 arvjob = arvados_cwl.ArvadosContainer(runner,
552 arvjob.output_callback = mock.MagicMock()
553 arvjob.collect_outputs = mock.MagicMock()
554 arvjob.successCodes = [0]
555 arvjob.outdir = "/var/spool/cwl"
556 arvjob.output_ttl = 3600
557 arvjob.collect_outputs.return_value = {"out": "stuff"}
561 "log_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
562 "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
563 "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
564 "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
565 "modified_at": "2017-05-26T12:01:22Z"
# The expected message embeds the runner.label() value and the "uuid" above.
568 runtime_status_update.assert_called_with(
570 'arvados.cwl-runner: [container testjob] (zzzzz-xvhdp-zzzzzzzzzzzzzzz) error log:',
571 ' ** log is empty **'
573 arvjob.output_callback.assert_called_with({"out": "stuff"}, "permanentFail")
575 # The test passes no builder.resources
576 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Submit a job whose input is a keep-backed Directory and check that the
# request body contains a collection mount for it under /keep.
# keepdocker: mock replacing arvados.commands.keepdocker.list_images_in_arv.
577 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
578 def test_mounts(self, keepdocker):
579 arv_docker_clear_cache()
581 runner = mock.MagicMock()
582 runner.ignore_docker_for_reuse = False
583 runner.intermediate_output_ttl = 0
584 runner.secret_store = cwltool.secrets.SecretStore()
586 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
# The canned collection lookup also carries a manifest listing file1 and file2.
587 runner.api.collections().get().execute.return_value = {
588 "portable_data_hash": "99999999999999999999999999999994+99",
589 "manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
591 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
600 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
602 "class": "CommandLineTool"
605 loadingContext, runtimeContext = self.helper(runner)
606 runtimeContext.name = "test_run_mounts"
608 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
609 arvtool.formatgraph = None
# Job input: a Directory in keep together with entries for file1 and file2.
612 "class": "Directory",
613 "location": "keep:99999999999999999999999999999994+44",
617 "location": "keep:99999999999999999999999999999994+44/file1",
621 "location": "keep:99999999999999999999999999999994+44/file2",
626 for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
627 j.run(runtimeContext)
# assert_called_with checks the most recent create() call against this body.
628 runner.api.container_requests().create.assert_called_with(
629 body=JsonDiffMatcher({
631 'HOME': '/var/spool/cwl',
634 'name': 'test_run_mounts',
635 'runtime_constraints': {
639 'use_existing': True,
# The input directory's hash appears both in the mount path and as the
# mounted collection's portable_data_hash.
642 "/keep/99999999999999999999999999999994+44": {
643 "kind": "collection",
644 "portable_data_hash": "99999999999999999999999999999994+44"
646 '/tmp': {'kind': 'tmp',
647 "capacity": 1073741824 },
648 '/var/spool/cwl': {'kind': 'tmp',
649 "capacity": 1073741824 }
651 'state': 'Committed',
652 'output_name': 'Output for step test_run_mounts',
653 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
654 'output_path': '/var/spool/cwl',
656 'container_image': '99999999999999999999999999999994+99',
657 'command': ['ls', '/var/spool/cwl'],
658 'cwd': '/var/spool/cwl',
659 'scheduling_parameters': {},
664 # The test passes no builder.resources
665 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Submit a tool whose InitialWorkDirRequirement entry references a secret
# input and check that the request body carries the file with the secret's
# stored value substituted in.
# keepdocker: mock replacing arvados.commands.keepdocker.list_images_in_arv.
666 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
667 def test_secrets(self, keepdocker):
668 arv_docker_clear_cache()
670 runner = mock.MagicMock()
671 runner.ignore_docker_for_reuse = False
672 runner.intermediate_output_ttl = 0
673 runner.secret_store = cwltool.secrets.SecretStore()
675 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
676 runner.api.collections().get().execute.return_value = {
677 "portable_data_hash": "99999999999999999999999999999993+99"}
679 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
681 tool = cmap({"arguments": ["md5sum", "example.conf"],
682 "class": "CommandLineTool",
685 "class": "http://commonwl.org/cwltool#Secrets",
691 "id": "#secret_job.cwl",
694 "id": "#secret_job.cwl/pw",
702 "class": "InitialWorkDirRequirement",
# Template for example.conf; $(inputs.pw) refers to the "pw" input.
705 "entry": "username: user\npassword: $(inputs.pw)\n",
706 "entryname": "example.conf"
712 loadingContext, runtimeContext = self.helper(runner)
713 runtimeContext.name = "test_secrets"
715 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
716 arvtool.formatgraph = None
# Register "pw" with the runner's secret store before creating the job.
718 job_order = {"pw": "blorp"}
719 runner.secret_store.store(["pw"], job_order)
721 for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
722 j.run(runtimeContext)
# assert_called_with checks the most recent create() call against this body.
723 runner.api.container_requests().create.assert_called_with(
724 body=JsonDiffMatcher({
726 'HOME': '/var/spool/cwl',
729 'name': 'test_secrets',
730 'runtime_constraints': {
734 'use_existing': True,
737 '/tmp': {'kind': 'tmp',
738 "capacity": 1073741824
740 '/var/spool/cwl': {'kind': 'tmp',
741 "capacity": 1073741824 }
743 'state': 'Committed',
744 'output_name': 'Output for step test_secrets',
745 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
746 'output_path': '/var/spool/cwl',
748 'container_image': '99999999999999999999999999999993+99',
749 'command': ['md5sum', 'example.conf'],
750 'cwd': '/var/spool/cwl',
751 'scheduling_parameters': {},
# Expected file content: the entry template with the value "blorp" in place
# of $(inputs.pw).
754 "/var/spool/cwl/example.conf": {
755 "content": "username: user\npassword: blorp\n",
761 # The test passes no builder.resources
762 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
# Submit a tool carrying a cwltool TimeLimit entry and check that the request
# body's scheduling_parameters has max_run_time equal to 42.
# keepdocker: mock replacing arvados.commands.keepdocker.list_images_in_arv.
763 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
764 def test_timelimit(self, keepdocker):
765 arv_docker_clear_cache()
767 runner = mock.MagicMock()
768 runner.ignore_docker_for_reuse = False
769 runner.intermediate_output_ttl = 0
770 runner.secret_store = cwltool.secrets.SecretStore()
772 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
773 runner.api.collections().get().execute.return_value = {
774 "portable_data_hash": "99999999999999999999999999999993+99"}
780 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
782 "class": "CommandLineTool",
# NOTE(review): the limit value for this entry is not visible in this view;
# presumably it is the 42 asserted below — confirm.
785 "class": "http://commonwl.org/cwltool#TimeLimit",
791 loadingContext, runtimeContext = self.helper(runner)
792 runtimeContext.name = "test_timelimit"
794 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
795 arvtool.formatgraph = None
797 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
798 j.run(runtimeContext)
# Only the keyword arguments of the most recent create() call are inspected.
800 _, kwargs = runner.api.container_requests().create.call_args
801 self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))