1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
6 import arvados_cwl.context
7 import arvados_cwl.util
8 from arvados_cwl.arvdocker import arv_docker_clear_cache
16 import cwltool.process
17 import cwltool.secrets
18 from schema_salad.ref_resolver import Loader
19 from schema_salad.sourceline import cmap
21 from .matcher import JsonDiffMatcher
22 from .mock_discovery import get_rootDesc
# Quiet the 'arvados.cwl-runner' and 'arvados.arv-run' loggers down to WARN
# when the ARVADOS_DEBUG environment variable is unset or empty.
# NOTE(review): indentation is lost in this copy; presumably both setLevel
# calls belong to the `if` body — confirm against the original file.
24 if not os.getenv('ARVADOS_DEBUG'):
25 logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
26 logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
28 class CollectionMock(object):
# Test double that forwards open() and copy() to a mock supplied by the caller.
# test_initial_work_dir installs it as the side effect of the patched
# arvados.collection.Collection constructor.
29 def __init__(self, vwdmock, *args, **kwargs):
# vwdmock: the mock object that receives the forwarded open()/copy() calls.
# Extra positional/keyword arguments are accepted; the visible lines do not use them.
30 self.vwdmock = vwdmock
33 def open(self, *args, **kwargs):
# Forward to the wrapped mock and hand back whatever it returns.
35 return self.vwdmock.open(*args, **kwargs)
37 def copy(self, *args, **kwargs):
# Forward to the wrapped mock; its result is not returned here.
39 self.vwdmock.copy(*args, **kwargs)
41 def save_new(self, *args, **kwargs):
# NOTE(review): no body for save_new is visible in this copy — confirm against the original file.
47 def portable_data_hash(self):
# NOTE(review): two returns appear back to back; the condition that selects
# between the empty-block locator and the fixed hash is not visible here — confirm.
49 return arvados.config.EMPTY_BLOCK_LOCATOR
51 return "99999999999999999999999999999996+99"
54 class TestContainer(unittest.TestCase):
56 def helper(self, runner, enable_reuse=True):
# Build the pair of contexts the tests pass to ArvadosCommandTool / job().
#
# runner: the (usually mocked) executor; its .api is given to CollectionCache.
# enable_reuse: stored under "enable_reuse" in the runtime context and also
#   appended to the default job name ("test_run_True" / "test_run_False").
# Returns: (loadingContext, runtimeContext).
# NOTE(review): some entries of both context dicts are not visible in this copy.
57 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
# Both contexts share one filesystem-access factory bound to a CollectionCache.
59 make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
60 collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
61 loadingContext = arvados_cwl.context.ArvLoadingContext(
62 {"avsc_names": avsc_names,
64 "make_fs_access": make_fs_access,
66 "metadata": {"cwlVersion": "v1.0"}})
67 runtimeContext = arvados_cwl.context.ArvRuntimeContext(
68 {"work_api": "containers",
70 "name": "test_run_"+str(enable_reuse),
71 "make_fs_access": make_fs_access,
73 "enable_reuse": enable_reuse,
75 "project_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
78 return loadingContext, runtimeContext
80 # The test passes no builder.resources
81 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
82 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
83 def test_run(self, keepdocker):
# Run a CommandLineTool job twice, once with enable_reuse=True and once with
# False, and check the body passed to container_requests().create each time.
# In particular 'use_existing' and the request name must follow enable_reuse.
#
# keepdocker: mock injected by the @mock.patch decorator for
#   arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): the tool definition and parts of the expected body are not
# visible in this copy.
84 for enable_reuse in (True, False):
85 arv_docker_clear_cache()
87 runner = mock.MagicMock()
88 runner.ignore_docker_for_reuse = False
89 runner.intermediate_output_ttl = 0
90 runner.secret_store = cwltool.secrets.SecretStore()
# Canned Docker image lookup: one image collection UUID, whose portable data
# hash is what the test expects as 'container_image'.
92 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
93 runner.api.collections().get().execute.return_value = {
94 "portable_data_hash": "99999999999999999999999999999993+99"}
100 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
102 "class": "CommandLineTool"
105 loadingContext, runtimeContext = self.helper(runner, enable_reuse)
107 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
108 arvtool.formatgraph = None
# Run every job yielded for an empty input object, then inspect the API call.
110 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
111 j.run(runtimeContext)
112 runner.api.container_requests().create.assert_called_with(
113 body=JsonDiffMatcher({
115 'HOME': '/var/spool/cwl',
118 'name': 'test_run_'+str(enable_reuse),
119 'runtime_constraints': {
123 'use_existing': enable_reuse,
# 1073741824 bytes = 1024 MiB for both the tmp and output directory mounts.
126 '/tmp': {'kind': 'tmp',
127 "capacity": 1073741824
129 '/var/spool/cwl': {'kind': 'tmp',
130 "capacity": 1073741824 }
132 'state': 'Committed',
133 'output_name': 'Output for step test_run_'+str(enable_reuse),
134 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
135 'output_path': '/var/spool/cwl',
137 'container_image': '99999999999999999999999999999993+99',
138 'command': ['ls', '/var/spool/cwl'],
139 'cwd': '/var/spool/cwl',
140 'scheduling_parameters': {},
145 # The test passes some fields in builder.resources
146 # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
147 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
148 def test_resource_requirements(self, keepdocker):
# Run a tool that carries ResourceRequirement plus the Arvados-specific
# RuntimeConstraints, APIRequirement, PartitionRequirement, IntermediateOutput
# and ReuseRequirement classes, then compare the container request body with
# the expected dict key by key.
#
# keepdocker: mock injected by the @mock.patch decorator for
#   arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): the values attached to each requirement are not visible in
# this copy.
149 arv_docker_clear_cache()
150 runner = mock.MagicMock()
151 runner.ignore_docker_for_reuse = False
152 runner.intermediate_output_ttl = 3600
153 runner.secret_store = cwltool.secrets.SecretStore()
155 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
156 runner.api.collections().get().execute.return_value = {
157 "portable_data_hash": "99999999999999999999999999999993+99"}
163 "class": "ResourceRequirement",
169 "class": "http://arvados.org/cwl#RuntimeConstraints",
172 "class": "http://arvados.org/cwl#APIRequirement",
174 "class": "http://arvados.org/cwl#PartitionRequirement",
177 "class": "http://arvados.org/cwl#IntermediateOutput",
180 "class": "http://arvados.org/cwl#ReuseRequirement",
185 "class": "CommandLineTool"
188 loadingContext, runtimeContext = self.helper(runner)
# Override the default name assigned by helper().
189 runtimeContext.name = "test_resource_requirements"
191 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
192 arvtool.formatgraph = None
193 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
194 j.run(runtimeContext)
# Capture the arguments of the most recent create() call for inspection.
196 call_args, call_kwargs = runner.api.container_requests().create.call_args
198 call_body_expected = {
200 'HOME': '/var/spool/cwl',
203 'name': 'test_resource_requirements',
204 'runtime_constraints': {
207 'keep_cache_ram': 536870912,
210 'use_existing': False,
213 '/tmp': {'kind': 'tmp',
214 "capacity": 4194304000 },
215 '/var/spool/cwl': {'kind': 'tmp',
216 "capacity": 5242880000 }
218 'state': 'Committed',
219 'output_name': 'Output for step test_resource_requirements',
220 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
221 'output_path': '/var/spool/cwl',
223 'container_image': '99999999999999999999999999999993+99',
225 'cwd': '/var/spool/cwl',
226 'scheduling_parameters': {
227 'partitions': ['blurb']
233 call_body = call_kwargs.get('body', None)
234 self.assertNotEqual(None, call_body)
# The loop walks the keys of the actual body, so a key that exists only in
# call_body_expected is never compared.
235 for key in call_body:
236 self.assertEqual(call_body_expected.get(key), call_body.get(key))
239 # The test passes no builder.resources
240 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
241 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
242 @mock.patch("arvados.collection.Collection")
243 def test_initial_work_dir(self, collection_mock, keepdocker):
# Run a tool with an InitialWorkDirRequirement whose entries live in Keep,
# check that each entry is copied into a staging collection, and check that
# the request body mounts the entries under /var/spool/cwl.
#
# collection_mock: mock for arvados.collection.Collection (inner decorator).
# keepdocker: mock for arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): most of the listing entries and parts of the expected body
# are not visible in this copy.
244 arv_docker_clear_cache()
245 runner = mock.MagicMock()
246 runner.ignore_docker_for_reuse = False
247 runner.intermediate_output_ttl = 0
248 runner.secret_store = cwltool.secrets.SecretStore()
250 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
251 runner.api.collections().get().execute.return_value = {
252 "portable_data_hash": "99999999999999999999999999999993+99"}
# sourcemock stands for the source collection returned for every lookup.
254 sourcemock = mock.MagicMock()
255 def get_collection_mock(p):
# Returns (collection, path inside it); the path is the text after the first
# "/" in p, or "" in the other branch.
# NOTE(review): the condition choosing between the two returns is not visible here.
257 return (sourcemock, p.split("/", 1)[1])
259 return (sourcemock, "")
260 runner.fs_access.get_collection.side_effect = get_collection_mock
# Every Collection(...) constructed by the code under test becomes a
# CollectionMock sharing this one vwdmock, so copy() calls can be asserted on it.
262 vwdmock = mock.MagicMock()
263 collection_mock.side_effect = lambda *args, **kwargs: CollectionMock(vwdmock, *args, **kwargs)
269 "class": "InitialWorkDirRequirement",
273 "location": "keep:99999999999999999999999999999995+99/bar"
276 "class": "Directory",
278 "location": "keep:99999999999999999999999999999995+99"
282 "basename": "filename",
283 "location": "keep:99999999999999999999999999999995+99/baz/filename"
286 "class": "Directory",
287 "basename": "subdir",
288 "location": "keep:99999999999999999999999999999995+99/subdir"
293 "class": "CommandLineTool"
296 loadingContext, runtimeContext = self.helper(runner)
297 runtimeContext.name = "test_initial_work_dir"
299 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
300 arvtool.formatgraph = None
301 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
302 j.run(runtimeContext)
304 call_args, call_kwargs = runner.api.container_requests().create.call_args
# Each call is copy(path in the source collection, target name, source_collection=...).
306 vwdmock.copy.assert_has_calls([mock.call('bar', 'foo', source_collection=sourcemock)])
307 vwdmock.copy.assert_has_calls([mock.call('', 'foo2', source_collection=sourcemock)])
308 vwdmock.copy.assert_has_calls([mock.call('baz/filename', 'filename', source_collection=sourcemock)])
309 vwdmock.copy.assert_has_calls([mock.call('subdir', 'subdir', source_collection=sourcemock)])
311 call_body_expected = {
313 'HOME': '/var/spool/cwl',
316 'name': 'test_initial_work_dir',
317 'runtime_constraints': {
321 'use_existing': True,
324 '/tmp': {'kind': 'tmp',
325 "capacity": 1073741824 },
326 '/var/spool/cwl': {'kind': 'tmp',
327 "capacity": 1073741824 },
# All four staged entries are expected to share the same portable data hash,
# the fixed value seen in CollectionMock.portable_data_hash().
328 '/var/spool/cwl/foo': {
329 'kind': 'collection',
331 'portable_data_hash': '99999999999999999999999999999996+99'
333 '/var/spool/cwl/foo2': {
334 'kind': 'collection',
336 'portable_data_hash': '99999999999999999999999999999996+99'
338 '/var/spool/cwl/filename': {
339 'kind': 'collection',
341 'portable_data_hash': '99999999999999999999999999999996+99'
343 '/var/spool/cwl/subdir': {
344 'kind': 'collection',
346 'portable_data_hash': '99999999999999999999999999999996+99'
349 'state': 'Committed',
350 'output_name': 'Output for step test_initial_work_dir',
351 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
352 'output_path': '/var/spool/cwl',
354 'container_image': '99999999999999999999999999999993+99',
356 'cwd': '/var/spool/cwl',
357 'scheduling_parameters': {
363 call_body = call_kwargs.get('body', None)
364 self.assertNotEqual(None, call_body)
# The loop walks the keys of the actual body, so a key that exists only in
# call_body_expected is never compared.
365 for key in call_body:
366 self.assertEqual(call_body_expected.get(key), call_body.get(key))
369 # Test redirecting stdin/stdout/stderr
370 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
371 def test_redirects(self, keepdocker):
# Run a tool that declares stdout, stderr and stdin, and check the request
# body: it must reference /var/spool/cwl/stdout.txt and stderr.txt, and
# include a collection entry for the Keep hash the stdin file comes from.
#
# keepdocker: mock injected by the @mock.patch decorator for
#   arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): the keys that hold those entries are not visible in this copy.
372 arv_docker_clear_cache()
374 runner = mock.MagicMock()
375 runner.ignore_docker_for_reuse = False
376 runner.intermediate_output_ttl = 0
377 runner.secret_store = cwltool.secrets.SecretStore()
379 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
380 runner.api.collections().get().execute.return_value = {
381 "portable_data_hash": "99999999999999999999999999999993+99"}
# None of the four names unpacked here is used again in the visible lines of this test.
383 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
389 "stdout": "stdout.txt",
390 "stderr": "stderr.txt",
391 "stdin": "/keep/99999999999999999999999999999996+99/file.txt",
392 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
394 "class": "CommandLineTool"
397 loadingContext, runtimeContext = self.helper(runner)
398 runtimeContext.name = "test_run_redirect"
400 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
401 arvtool.formatgraph = None
402 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
403 j.run(runtimeContext)
404 runner.api.container_requests().create.assert_called_with(
405 body=JsonDiffMatcher({
407 'HOME': '/var/spool/cwl',
410 'name': 'test_run_redirect',
411 'runtime_constraints': {
415 'use_existing': True,
418 '/tmp': {'kind': 'tmp',
419 "capacity": 1073741824 },
420 '/var/spool/cwl': {'kind': 'tmp',
421 "capacity": 1073741824 },
424 "path": "/var/spool/cwl/stderr.txt"
427 "kind": "collection",
429 "portable_data_hash": "99999999999999999999999999999996+99"
433 "path": "/var/spool/cwl/stdout.txt"
436 'state': 'Committed',
437 "output_name": "Output for step test_run_redirect",
438 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
439 'output_path': '/var/spool/cwl',
441 'container_image': '99999999999999999999999999999993+99',
442 'command': ['ls', '/var/spool/cwl'],
443 'cwd': '/var/spool/cwl',
444 'scheduling_parameters': {},
449 @mock.patch("arvados.collection.Collection")
450 def test_done(self, col):
# Check the success path of an ArvadosContainer: outputs are collected from
# "keep:abc+123", the output callback receives them with status "success",
# the output collection UUID is registered as an intermediate output, and no
# runtime status error is recorded.
#
# col: mock injected by the @mock.patch decorator for arvados.collection.Collection.
# NOTE(review): the ArvadosContainer constructor arguments and the call that
# feeds the container request record to arvjob are not visible in this copy.
451 api = mock.MagicMock()
453 runner = mock.MagicMock()
455 runner.num_retries = 0
456 runner.ignore_docker_for_reuse = False
457 runner.intermediate_output_ttl = 0
458 runner.secret_store = cwltool.secrets.SecretStore()
460 runner.api.containers().get().execute.return_value = {"state":"Complete",
# Opening anything on the patched Collection yields an empty list.
464 col().open.return_value = []
466 loadingContext, runtimeContext = self.helper(runner)
468 arvjob = arvados_cwl.ArvadosContainer(runner,
# Replace the callbacks with mocks so their arguments can be asserted below.
476 arvjob.output_callback = mock.MagicMock()
477 arvjob.collect_outputs = mock.MagicMock()
478 arvjob.successCodes = [0]
479 arvjob.outdir = "/var/spool/cwl"
480 arvjob.output_ttl = 3600
482 arvjob.collect_outputs.return_value = {"out": "stuff"}
486 "log_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
487 "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
488 "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
489 "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
490 "modified_at": "2017-05-26T12:01:22Z"
493 self.assertFalse(api.collections().create.called)
494 self.assertFalse(runner.runtime_status_error.called)
496 arvjob.collect_outputs.assert_called_with("keep:abc+123")
497 arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
498 runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
500 @mock.patch("arvados_cwl.util.get_current_container")
501 @mock.patch("arvados.collection.CollectionReader")
502 @mock.patch("arvados.collection.Collection")
503 def test_child_failure(self, col, reader, gcc_mock):
# Check the failure path with a real ArvCwlExecutor: runtime_status_update
# must be called with the "error log" / "log is empty" text, and the output
# callback must receive status "permanentFail".
#
# col: mock for arvados.collection.Collection.
# reader: mock for arvados.collection.CollectionReader.
# gcc_mock: mock for arvados_cwl.util.get_current_container.
# NOTE(review): the container record, the ArvadosContainer constructor
# arguments and the call that triggers completion are not visible in this copy.
504 api = mock.MagicMock()
# Use a private copy of the discovery document with the jobs 'create' method
# deleted; the assertion below then expects work_api == 'containers'.
505 api._rootDesc = copy.deepcopy(get_rootDesc())
506 del api._rootDesc.get('resources')['jobs']['methods']['create']
508 # Set up runner with mocked runtime_status_update()
509 self.assertFalse(gcc_mock.called)
510 runtime_status_update = mock.MagicMock()
# NOTE(review): this assigns on the ArvCwlExecutor class itself and no restore
# is visible in this method, so the mock may leak into later tests — confirm.
511 arvados_cwl.ArvCwlExecutor.runtime_status_update = runtime_status_update
512 runner = arvados_cwl.ArvCwlExecutor(api)
513 self.assertEqual(runner.work_api, 'containers')
515 # Make sure ArvCwlExecutor thinks it's running inside a container so it
516 # adds the logging handler that will call runtime_status_update() mock
517 gcc_mock.return_value = {"uuid" : "zzzzz-dz642-zzzzzzzzzzzzzzz"}
518 self.assertTrue(gcc_mock.called)
# The handler is expected on the root logger.
519 root_logger = logging.getLogger('')
520 handlerClasses = [h.__class__ for h in root_logger.handlers]
521 self.assertTrue(arvados_cwl.RuntimeStatusLoggingHandler in handlerClasses)
523 runner.num_retries = 0
524 runner.ignore_docker_for_reuse = False
525 runner.intermediate_output_ttl = 0
526 runner.secret_store = cwltool.secrets.SecretStore()
# Fix the label so the expected status message below is deterministic.
527 runner.label = mock.MagicMock()
528 runner.label.return_value = '[container testjob]'
530 runner.api.containers().get().execute.return_value = {
537 col().open.return_value = []
539 loadingContext, runtimeContext = self.helper(runner)
541 arvjob = arvados_cwl.ArvadosContainer(runner,
549 arvjob.output_callback = mock.MagicMock()
550 arvjob.collect_outputs = mock.MagicMock()
551 arvjob.successCodes = [0]
552 arvjob.outdir = "/var/spool/cwl"
553 arvjob.output_ttl = 3600
554 arvjob.collect_outputs.return_value = {"out": "stuff"}
558 "log_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
559 "output_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
560 "uuid": "zzzzz-xvhdp-zzzzzzzzzzzzzzz",
561 "container_uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
562 "modified_at": "2017-05-26T12:01:22Z"
565 runtime_status_update.assert_called_with(
567 'arvados.cwl-runner: [container testjob] (zzzzz-xvhdp-zzzzzzzzzzzzzzz) error log:',
568 ' ** log is empty **'
570 arvjob.output_callback.assert_called_with({"out": "stuff"}, "permanentFail")
572 # The test passes no builder.resources
573 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
574 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
575 def test_mounts(self, keepdocker):
# Run a tool whose job order has a Directory in Keep plus two files inside
# it, and check that the request body carries one collection entry keyed
# "/keep/<hash>" for that directory.
#
# keepdocker: mock injected by the @mock.patch decorator for
#   arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): the tool inputs and the rest of job_order are not visible in
# this copy.
576 arv_docker_clear_cache()
578 runner = mock.MagicMock()
579 runner.ignore_docker_for_reuse = False
580 runner.intermediate_output_ttl = 0
581 runner.secret_store = cwltool.secrets.SecretStore()
583 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
# The canned collection record also carries a manifest listing file1 and file2.
584 runner.api.collections().get().execute.return_value = {
585 "portable_data_hash": "99999999999999999999999999999994+99",
586 "manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
# None of the four names unpacked here is used again in the visible lines of this test.
588 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
597 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
599 "class": "CommandLineTool"
602 loadingContext, runtimeContext = self.helper(runner)
603 runtimeContext.name = "test_run_mounts"
605 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
606 arvtool.formatgraph = None
609 "class": "Directory",
610 "location": "keep:99999999999999999999999999999994+44",
614 "location": "keep:99999999999999999999999999999994+44/file1",
618 "location": "keep:99999999999999999999999999999994+44/file2",
623 for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
624 j.run(runtimeContext)
625 runner.api.container_requests().create.assert_called_with(
626 body=JsonDiffMatcher({
628 'HOME': '/var/spool/cwl',
631 'name': 'test_run_mounts',
632 'runtime_constraints': {
636 'use_existing': True,
# The input directory is expected as a collection entry keyed "/keep/<hash>".
639 "/keep/99999999999999999999999999999994+44": {
640 "kind": "collection",
641 "portable_data_hash": "99999999999999999999999999999994+44"
643 '/tmp': {'kind': 'tmp',
644 "capacity": 1073741824 },
645 '/var/spool/cwl': {'kind': 'tmp',
646 "capacity": 1073741824 }
648 'state': 'Committed',
649 'output_name': 'Output for step test_run_mounts',
650 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
651 'output_path': '/var/spool/cwl',
653 'container_image': '99999999999999999999999999999994+99',
654 'command': ['ls', '/var/spool/cwl'],
655 'cwd': '/var/spool/cwl',
656 'scheduling_parameters': {},
661 # The test passes no builder.resources
662 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
663 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
664 def test_secrets(self, keepdocker):
# Run a tool that uses the cwltool Secrets class and an
# InitialWorkDirRequirement entry templated on the secret input "pw", and
# check that the request body contains /var/spool/cwl/example.conf with the
# secret value substituted.
#
# keepdocker: mock injected by the @mock.patch decorator for
#   arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): the key of the request body that holds example.conf is not
# visible in this copy.
665 arv_docker_clear_cache()
667 runner = mock.MagicMock()
668 runner.ignore_docker_for_reuse = False
669 runner.intermediate_output_ttl = 0
670 runner.secret_store = cwltool.secrets.SecretStore()
672 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
673 runner.api.collections().get().execute.return_value = {
674 "portable_data_hash": "99999999999999999999999999999993+99"}
# None of the four names unpacked here is used again in the visible lines of this test.
676 document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
678 tool = cmap({"arguments": ["md5sum", "example.conf"],
679 "class": "CommandLineTool",
682 "class": "http://commonwl.org/cwltool#Secrets",
688 "id": "#secret_job.cwl",
691 "id": "#secret_job.cwl/pw",
699 "class": "InitialWorkDirRequirement",
702 "entry": "username: user\npassword: $(inputs.pw)\n",
703 "entryname": "example.conf"
709 loadingContext, runtimeContext = self.helper(runner)
710 runtimeContext.name = "test_secrets"
712 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
713 arvtool.formatgraph = None
# Register "pw" with the secret store before the job is built.
715 job_order = {"pw": "blorp"}
716 runner.secret_store.store(["pw"], job_order)
718 for j in arvtool.job(job_order, mock.MagicMock(), runtimeContext):
719 j.run(runtimeContext)
720 runner.api.container_requests().create.assert_called_with(
721 body=JsonDiffMatcher({
723 'HOME': '/var/spool/cwl',
726 'name': 'test_secrets',
727 'runtime_constraints': {
731 'use_existing': True,
734 '/tmp': {'kind': 'tmp',
735 "capacity": 1073741824
737 '/var/spool/cwl': {'kind': 'tmp',
738 "capacity": 1073741824 }
740 'state': 'Committed',
741 'output_name': 'Output for step test_secrets',
742 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
743 'output_path': '/var/spool/cwl',
745 'container_image': '99999999999999999999999999999993+99',
746 'command': ['md5sum', 'example.conf'],
747 'cwd': '/var/spool/cwl',
748 'scheduling_parameters': {},
# The expected file content has the plain secret value "blorp" in place of $(inputs.pw).
751 "/var/spool/cwl/example.conf": {
752 "content": "username: user\npassword: blorp\n",
758 # The test passes no builder.resources
759 # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
760 @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
761 def test_timelimit(self, keepdocker):
# Run a tool carrying the cwltool TimeLimit class and check that the request
# body has scheduling_parameters['max_run_time'] equal to 42.
#
# keepdocker: mock injected by the @mock.patch decorator for
#   arvados.commands.keepdocker.list_images_in_arv.
# NOTE(review): the time limit value set on the tool is not visible in this
# copy; presumably it is 42 — confirm.
762 arv_docker_clear_cache()
764 runner = mock.MagicMock()
765 runner.ignore_docker_for_reuse = False
766 runner.intermediate_output_ttl = 0
767 runner.secret_store = cwltool.secrets.SecretStore()
769 keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
770 runner.api.collections().get().execute.return_value = {
771 "portable_data_hash": "99999999999999999999999999999993+99"}
777 "arguments": [{"valueFrom": "$(runtime.outdir)"}],
779 "class": "CommandLineTool",
782 "class": "http://commonwl.org/cwltool#TimeLimit",
788 loadingContext, runtimeContext = self.helper(runner)
789 runtimeContext.name = "test_timelimit"
791 arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
792 arvtool.formatgraph = None
794 for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
795 j.run(runtimeContext)
# Only the keyword arguments of the last create() call are needed here.
797 _, kwargs = runner.api.container_requests().create.call_args
798 self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))