9849: Cache docker lookup.
[arvados.git] / sdk / cwl / tests / test_job.py
1 import functools
2 import json
3 import logging
4 import mock
5 import os
6 import unittest
7 import copy
8
9 import arvados
10 import arvados_cwl
11 import cwltool.process
12 from schema_salad.ref_resolver import Loader
13 from .mock_discovery import get_rootDesc
14
# Unless ARVADOS_DEBUG is set in the environment, raise the threshold of the
# runner's loggers to WARN so test runs are not flooded with output.
if not os.getenv('ARVADOS_DEBUG'):
    for _quiet_logger_name in ('arvados.cwl-runner', 'arvados.arv-run'):
        logging.getLogger(_quiet_logger_name).setLevel(logging.WARN)
18
class TestJob(unittest.TestCase):
    """Tests for ArvadosCommandTool / ArvadosJob using the "jobs" work API.

    The runner and the API client are MagicMock objects, so each test checks
    only the arguments passed to the mocked API calls.
    """

    # The test passes no builder.resources
    # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
    def test_run(self, list_images_in_arv):
        # Run once with reuse enabled and once disabled; the expected
        # find_or_create argument must follow enable_reuse.
        for enable_reuse in (True, False):
            runner = mock.MagicMock()
            runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
            runner.ignore_docker_for_reuse = False
            runner.num_retries = 0
            document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")

            # Pretend the Docker image is already present in Arvados.
            list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
            # NOTE: this must be spelled "return_value"; a misspelled
            # attribute on a MagicMock is silently accepted and has no effect.
            runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}

            tool = {
                "inputs": [],
                "outputs": [],
                "baseCommand": "ls",
                "arguments": [{"valueFrom": "$(runtime.outdir)"}]
            }
            make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
            arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}))
            arvtool.formatgraph = None
            for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
                j.run(enable_reuse=enable_reuse)
                runner.api.jobs().create.assert_called_with(
                    body={
                        'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                        'script_parameters': {
                            'tasks': [{
                                'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
                                'command': ['ls', '$(task.outdir)']
                            }],
                        },
                        'script_version': 'master',
                        'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
                        'repository': 'arvados',
                        'script': 'crunchrunner',
                        'runtime_constraints': {
                            'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
                            'min_cores_per_node': 1,
                            'min_ram_mb_per_node': 1024,
                            'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
                        }
                    },
                    find_or_create=enable_reuse,
                    filters=[['repository', '=', 'arvados'],
                             ['script', '=', 'crunchrunner'],
                             ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
                             ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
                )

    # The test passes some fields in builder.resources
    # For the remaining fields, the defaults will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
    def test_resource_requirements(self, list_images_in_arv):
        runner = mock.MagicMock()
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.ignore_docker_for_reuse = False
        runner.num_retries = 0
        arvados_cwl.add_arv_hints()

        # Pretend the Docker image is already present in Arvados.
        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
        # NOTE: this must be spelled "return_value"; a misspelled attribute
        # on a MagicMock is silently accepted and has no effect.
        runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}

        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")

        # The hints cover a standard ResourceRequirement plus two
        # Arvados-specific hint classes.
        tool = {
            "inputs": [],
            "outputs": [],
            "hints": [{
                "class": "ResourceRequirement",
                "coresMin": 3,
                "ramMin": 3000,
                "tmpdirMin": 4000
            }, {
                "class": "http://arvados.org/cwl#RuntimeConstraints",
                "keep_cache": 512,
                "outputDirType": "keep_output_dir"
            }, {
                "class": "http://arvados.org/cwl#APIRequirement",
            }],
            "baseCommand": "ls"
        }
        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                                 make_fs_access=make_fs_access, loader=Loader({}))
        arvtool.formatgraph = None
        for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
            j.run()
        # The values given in the hints must appear in the submitted job's
        # script_parameters and runtime_constraints.
        runner.api.jobs().create.assert_called_with(
            body={
                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                'script_parameters': {
                    'tasks': [{
                        'task.env': {'HOME': '$(task.outdir)', 'TMPDIR': '$(task.tmpdir)'},
                        'task.keepTmpOutput': True,
                        'command': ['ls']
                    }]
                },
                'script_version': 'master',
                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
                'repository': 'arvados',
                'script': 'crunchrunner',
                'runtime_constraints': {
                    'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
                    'min_cores_per_node': 3,
                    'min_ram_mb_per_node': 3000,
                    'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
                    'keep_cache_mb_per_task': 512
                }
            },
            find_or_create=True,
            filters=[['repository', '=', 'arvados'],
                     ['script', '=', 'crunchrunner'],
                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
                     ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]])

    # No collection matching the output is found by the first lookup, so
    # done() is expected to fetch the manifest text and create a collection.
    @mock.patch("arvados.collection.CollectionReader")
    def test_done(self, reader):
        api = mock.MagicMock()

        runner = mock.MagicMock()
        runner.api = api
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.num_retries = 0
        runner.ignore_docker_for_reuse = False

        reader().open.return_value = []
        # First list() call: no existing named output collection.
        # Second list() call: the manifest text for the output hash.
        api.collections().list().execute.side_effect = ({"items": []},
                                                        {"items": [{"manifest_text": "XYZ"}]})

        arvjob = arvados_cwl.ArvadosJob(runner)
        arvjob.name = "testjob"
        arvjob.builder = mock.MagicMock()
        arvjob.output_callback = mock.MagicMock()
        arvjob.collect_outputs = mock.MagicMock()

        arvjob.done({
            "state": "Complete",
            "output": "99999999999999999999999999999993+99",
            "log": "99999999999999999999999999999994+99",
            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        })

        # The leading mock.call() is the bare list() made above while
        # configuring side_effect.
        api.collections().list.assert_has_calls([
            mock.call(),
            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
                               ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
                               ['name', '=', 'Output 9999999 of testjob']]),
            mock.call().execute(num_retries=0),
            mock.call(limit=1, filters=[['portable_data_hash', '=', '99999999999999999999999999999993+99']],
                      select=['manifest_text']),
            mock.call().execute(num_retries=0)])

        api.collections().create.assert_called_with(
            ensure_unique_name=True,
            body={'portable_data_hash': '99999999999999999999999999999993+99',
                  'manifest_text': 'XYZ',
                  'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                  'name': 'Output 9999999 of testjob'})

    # The first lookup already returns a matching collection, so done() is
    # expected to reuse it and never call collections().create.
    @mock.patch("arvados.collection.CollectionReader")
    def test_done_use_existing_collection(self, reader):
        api = mock.MagicMock()

        runner = mock.MagicMock()
        runner.api = api
        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.num_retries = 0

        reader().open.return_value = []
        api.collections().list().execute.side_effect = ({"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},)

        arvjob = arvados_cwl.ArvadosJob(runner)
        arvjob.name = "testjob"
        arvjob.builder = mock.MagicMock()
        arvjob.output_callback = mock.MagicMock()
        arvjob.collect_outputs = mock.MagicMock()

        arvjob.done({
            "state": "Complete",
            "output": "99999999999999999999999999999993+99",
            "log": "99999999999999999999999999999994+99",
            "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        })

        # The leading mock.call() is the bare list() made above while
        # configuring side_effect.
        api.collections().list.assert_has_calls([
            mock.call(),
            mock.call(filters=[['owner_uuid', '=', 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'],
                               ['portable_data_hash', '=', '99999999999999999999999999999993+99'],
                               ['name', '=', 'Output 9999999 of testjob']]),
            mock.call().execute(num_retries=0)])

        self.assertFalse(api.collections().create.called)
220
221
class TestWorkflow(unittest.TestCase):
    """Tests for ArvadosWorkflow job submission and ArvCwlRunner API selection."""

    # The test passes no builder.resources
    # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
    @mock.patch("arvados.collection.Collection")
    @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
    def test_run(self, list_images_in_arv, mockcollection):
        arvados_cwl.add_arv_hints()

        api = mock.MagicMock()
        api._rootDesc = get_rootDesc()

        runner = arvados_cwl.ArvCwlRunner(api)
        self.assertEqual(runner.work_api, 'jobs')

        # Pretend the Docker image is already present in Arvados.
        list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
        # NOTE: this must be spelled "return_value"; a misspelled attribute
        # on a MagicMock is silently accepted and has no effect.
        runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}

        runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
        runner.ignore_docker_for_reuse = False
        runner.num_retries = 0
        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")

        tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
        metadata["cwlVersion"] = tool["cwlVersion"]

        # This hash is expected to show up in the task.vwd paths below.
        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"

        make_fs_access = functools.partial(arvados_cwl.CollectionFsAccess, api_client=runner.api)
        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
                                              makeTool=runner.arv_make_tool, metadata=metadata)
        arvtool.formatgraph = None
        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
        # Use the next() builtin rather than the Python-2-only .next() method.
        next(it).run()
        next(it).run()

        with open("tests/wf/scatter2_subwf.cwl") as f:
            subwf = f.read()

        runner.api.jobs().create.assert_called_with(
            body={
                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
                'repository': 'arvados',
                'script_version': 'master',
                'script': 'crunchrunner',
                'script_parameters': {
                    'tasks': [{
                        'task.env': {
                            'HOME': '$(task.outdir)',
                            'TMPDIR': '$(task.tmpdir)'},
                        'task.vwd': {
                            'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
                            'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
                        },
                        'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                        'task.stdout': 'cwl.output.json'}]},
                'runtime_constraints': {
                    'min_scratch_mb_per_node': 2048,
                    'min_cores_per_node': 1,
                    'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
                    'min_ram_mb_per_node': 1024
                },
                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
            filters=[['repository', '=', 'arvados'],
                     ['script', '=', 'crunchrunner'],
                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
                     ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]],
            find_or_create=True)

        # Both the packed subworkflow text and its input object must have
        # been written through the mocked Collection.
        mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
        mockcollection().open().__enter__().write.assert_has_calls([mock.call('{sleeptime: 5}')])

    # With jobs.create removed from the discovery document, the runner is
    # expected to select the "containers" work API.
    def test_default_work_api(self):
        arvados_cwl.add_arv_hints()

        api = mock.MagicMock()
        # Deep-copy so the deletion below does not alter the object
        # returned by get_rootDesc().
        api._rootDesc = copy.deepcopy(get_rootDesc())
        del api._rootDesc.get('resources')['jobs']['methods']['create']
        runner = arvados_cwl.ArvCwlRunner(api)
        self.assertEqual(runner.work_api, 'containers')