Merge branch '10172-crunch2-container-output' closes #10172
[arvados.git] / sdk / cwl / tests / test_job.py
1 import functools
2 import json
3 import logging
4 import mock
5 import os
6 import unittest
7
8 import arvados
9 import arvados_cwl
10 import cwltool.process
11 from schema_salad.ref_resolver import Loader
12
# Keep test output quiet: unless ARVADOS_DEBUG is set (to a non-empty value),
# raise the runner loggers' threshold to WARN.
if not os.getenv('ARVADOS_DEBUG'):
    for _quiet_logger in ('arvados.cwl-runner', 'arvados.arv-run'):
        logging.getLogger(_quiet_logger).setLevel(logging.WARN)
    del _quiet_logger
16
17
class TestJob(unittest.TestCase):
    """Tests for ArvadosCommandTool / ArvadosJob submitting through the jobs API."""

    # The test passes no builder.resources
    # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
    def test_run(self, list_images_in_arv):
        """An `ls $(runtime.outdir)` tool results in one crunchrunner jobs().create call."""
        runner = mock.MagicMock()
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.ignore_docker_for_reuse = False
        runner.num_retries = 0
        _, avsc_names, _, _ = cwltool.process.get_schema("v1.0")

        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
        # Must be spelled exactly `return_value`: MagicMock accepts any
        # misspelled attribute silently and would configure nothing.
        runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}

        tool = {
            "inputs": [],
            "outputs": [],
            "baseCommand": "ls",
            "arguments": [{"valueFrom": "$(runtime.outdir)"}]
        }
        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                 basedir="", make_fs_access=make_fs_access, loader=Loader({}))
        arvtool.formatgraph = None
        for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
            j.run()

        # Checked after the loop rather than inside it, so the test fails
        # (jobs().create never called) if job() yields nothing, instead of
        # passing vacuously.
        runner.api.jobs().create.assert_called_with(
            body={
                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                'script_parameters': {
                    'tasks': [{
                        'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
                        'command': ['ls', '$(task.outdir)']
                    }],
                },
                'script_version': 'master',
                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
                'repository': 'arvados',
                'script': 'crunchrunner',
                'runtime_constraints': {
                    'docker_image': 'arvados/jobs:' + arvados_cwl.__version__,
                    'min_cores_per_node': 1,
                    'min_ram_mb_per_node': 1024,
                    'min_scratch_mb_per_node': 2048  # tmpdirSize + outdirSize
                }
            },
            find_or_create=True,
            filters=[['repository', '=', 'arvados'],
                     ['script', '=', 'crunchrunner'],
                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
                     ['docker_image_locator', 'in docker', 'arvados/jobs:' + arvados_cwl.__version__]]
        )

    # The tool sets some resources through a ResourceRequirement hint.
    # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
    def test_resource_requirements(self, list_images_in_arv):
        """Resource and Arvados RuntimeConstraints hints map onto job runtime_constraints."""
        runner = mock.MagicMock()
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.ignore_docker_for_reuse = False
        runner.num_retries = 0
        arvados_cwl.add_arv_hints()

        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
        # Must be spelled exactly `return_value` (see test_run).
        runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}

        _, avsc_names, _, _ = cwltool.process.get_schema("v1.0")

        tool = {
            "inputs": [],
            "outputs": [],
            "hints": [{
                "class": "ResourceRequirement",
                "coresMin": 3,
                "ramMin": 3000,
                "tmpdirMin": 4000
            }, {
                "class": "http://arvados.org/cwl#RuntimeConstraints",
                "keep_cache": 512,
                "outputDirType": "keep_output_dir"
            }, {
                "class": "http://arvados.org/cwl#APIRequirement",
            }],
            "baseCommand": "ls"
        }
        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                 make_fs_access=make_fs_access, loader=Loader({}))
        arvtool.formatgraph = None
        for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
            j.run()

        runner.api.jobs().create.assert_called_with(
            body={
                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                'script_parameters': {
                    'tasks': [{
                        'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
                        # presumably driven by outputDirType: keep_output_dir — TODO confirm
                        'task.keepTmpOutput': True,
                        'command': ['ls']
                    }]
                },
                'script_version': 'master',
                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
                'repository': 'arvados',
                'script': 'crunchrunner',
                'runtime_constraints': {
                    'docker_image': 'arvados/jobs:' + arvados_cwl.__version__,
                    'min_cores_per_node': 3,
                    'min_ram_mb_per_node': 3000,
                    'min_scratch_mb_per_node': 5024,  # tmpdirMin (4000) + default outdirSize (1024)
                    'keep_cache_mb_per_task': 512
                }
            },
            find_or_create=True,
            filters=[['repository', '=', 'arvados'],
                     ['script', '=', 'crunchrunner'],
                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
                     ['docker_image_locator', 'in docker', 'arvados/jobs:' + arvados_cwl.__version__]])

    @mock.patch("arvados.collection.CollectionReader")
    def test_done(self, reader):
        """done() on a Complete job with no existing output collection creates one.

        The first lookup (by owner, portable data hash and name) finds nothing,
        so the manifest is fetched by portable data hash and a new collection
        named after the job is created with the runner's project as owner.
        """
        api = mock.MagicMock()

        runner = mock.MagicMock()
        runner.api = api
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.num_retries = 0
        runner.ignore_docker_for_reuse = False

        reader().open.return_value = []
        # Successive list().execute() results: no existing output collection,
        # then the manifest looked up by portable data hash.
        api.collections().list().execute.side_effect = ({"items": []},
                                                        {"items": [{"manifest_text": "XYZ"}]})

        arvjob = arvados_cwl.ArvadosJob(runner)
        arvjob.name = "testjob"
        arvjob.builder = mock.MagicMock()
        arvjob.output_callback = mock.MagicMock()
        arvjob.collect_outputs = mock.MagicMock()

        arvjob.done({
            "state": "Complete",
            "output": "99999999999999999999999999999993+99",
            "log": "99999999999999999999999999999994+99",
            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        })

        api.collections().list.assert_has_calls([
            # The bare call() is the list() made above while configuring side_effect.
            mock.call(),
            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
                               ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
                               ['name', '=', 'Output 9999999 of testjob']]),
            mock.call().execute(num_retries=0),
            mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
                      select=['manifest_text']),
            mock.call().execute(num_retries=0)])

        api.collections().create.assert_called_with(
            ensure_unique_name=True,
            body={'portable_data_hash': '99999999999999999999999999999993+99',
                  'manifest_text': 'XYZ',
                  'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                  'name': 'Output 9999999 of testjob'})

    @mock.patch("arvados.collection.CollectionReader")
    def test_done_use_existing_collection(self, reader):
        """done() creates no collection when a matching output collection already exists."""
        api = mock.MagicMock()

        runner = mock.MagicMock()
        runner.api = api
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.num_retries = 0

        reader().open.return_value = []
        # The first lookup finds an existing collection.
        api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)

        arvjob = arvados_cwl.ArvadosJob(runner)
        arvjob.name = "testjob"
        arvjob.builder = mock.MagicMock()
        arvjob.output_callback = mock.MagicMock()
        arvjob.collect_outputs = mock.MagicMock()

        arvjob.done({
            "state": "Complete",
            "output": "99999999999999999999999999999993+99",
            "log": "99999999999999999999999999999994+99",
            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        })

        api.collections().list.assert_has_calls([
            # The bare call() is the list() made above while configuring side_effect.
            mock.call(),
            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
                               ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
                               ['name', '=', 'Output 9999999 of testjob']]),
            mock.call().execute(num_retries=0)])

        self.assertFalse(api.collections().create.called)
218
219
class TestWorkflow(unittest.TestCase):
    """Tests for ArvadosWorkflow submission and ArvCwlRunner work API selection."""

    # The test passes no builder.resources
    # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
    @mock.patch("arvados.collection.Collection")
    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
    def test_run(self, list_images_in_arv, mockcollection):
        """A workflow with a subworkflow step is submitted as a crunchrunner job running cwltool."""
        arvados_cwl.add_arv_hints()

        api = mock.MagicMock()
        # NOTE(review): arvados.api('v1') presumably needs a reachable,
        # configured API server to supply the discovery document — confirm.
        api._rootDesc = arvados.api('v1')._rootDesc
        runner = arvados_cwl.ArvCwlRunner(api)
        self.assertEqual(runner.work_api, 'jobs')

        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
        # Must be spelled exactly `return_value`: MagicMock accepts any
        # misspelled attribute silently and would configure nothing.
        runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}

        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.ignore_docker_for_reuse = False
        runner.num_retries = 0
        document_loader, avsc_names, _, _ = cwltool.process.get_schema("v1.0")

        tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
        metadata["cwlVersion"] = tool["cwlVersion"]

        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"

        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
                                              makeTool=runner.arv_make_tool, metadata=metadata)
        arvtool.formatgraph = None
        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
        # The builtin next() works on Python 2 and 3; it.next() is Python-2-only.
        next(it).run()
        next(it).run()

        with open("tests/wf/scatter2_subwf.cwl") as f:
            subwf = f.read()

        # assert_called_with checks the most recent jobs().create call.
        runner.api.jobs().create.assert_called_with(
            body={
                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
                'repository': 'arvados',
                'script_version': 'master',
                'script': 'crunchrunner',
                'script_parameters': {
                    'tasks': [{'task.env': {
                        'HOME': '$(task.outdir)',
                        'TMPDIR': '$(task.tmpdir)'},
                               'task.vwd': {
                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
                               },
                    'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                    'task.stdout': 'cwl.output.json'}]},
                'runtime_constraints': {
                    'min_scratch_mb_per_node': 2048,
                    'min_cores_per_node': 1,
                    'docker_image': 'arvados/jobs:' + arvados_cwl.__version__,
                    'min_ram_mb_per_node': 1024
                },
                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
            filters=[['repository', '=', 'arvados'],
                     ['script', '=', 'crunchrunner'],
                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
                     ['docker_image_locator', 'in docker', 'arvados/jobs:' + arvados_cwl.__version__]],
            find_or_create=True)

        # The packed subworkflow and the job order are written into the mocked collection.
        mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
        mockcollection().open().__enter__().write.assert_has_calls([mock.call('{sleeptime: 5}')])

    def test_default_work_api(self):
        """Without jobs.create in the discovery document, the runner selects the containers API."""
        arvados_cwl.add_arv_hints()

        api = mock.MagicMock()
        api._rootDesc = arvados.api('v1')._rootDesc
        # Hide the jobs.create method. Subscript rather than .get() so that a
        # missing key raises a clear KeyError instead of a TypeError on None.
        del api._rootDesc['resources']['jobs']['methods']['create']
        runner = arvados_cwl.ArvCwlRunner(api)
        self.assertEqual(runner.work_api, 'containers')