14008: Merge branch 'master' into 14008-containers-index
[arvados.git] / sdk / cwl / tests / test_job.py
index 1841fd3f89ea7a89aa1ae246f4f0dd5d305c9532..20efe1513981585b3c699f73d0dbba6994f7c682 100644 (file)
@@ -10,7 +10,6 @@ import os
 import unittest
 import copy
 import StringIO
-import datetime
 
 import arvados
 import arvados_cwl
@@ -20,19 +19,35 @@ from schema_salad.ref_resolver import Loader
 from schema_salad.sourceline import cmap
 from .mock_discovery import get_rootDesc
 from .matcher import JsonDiffMatcher, StripYAMLComments
+from .test_container import CollectionMock
 
 if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
-class MockDateTime(datetime.datetime):
-    @classmethod
-    def now(cls):
-        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+class TestJob(unittest.TestCase):
 
-datetime.datetime = MockDateTime
+    def helper(self, runner, enable_reuse=True):
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
-class TestJob(unittest.TestCase):
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+        loadingContext = arvados_cwl.context.ArvLoadingContext(
+            {"avsc_names": avsc_names,
+             "basedir": "",
+             "make_fs_access": make_fs_access,
+             "loader": Loader({}),
+             "metadata": {"cwlVersion": "v1.0"},
+             "makeTool": runner.arv_make_tool})
+        runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+            {"work_api": "jobs",
+             "basedir": "",
+             "name": "test_run_job_"+str(enable_reuse),
+             "make_fs_access": make_fs_access,
+             "enable_reuse": enable_reuse,
+             "priority": 500})
+
+        return loadingContext, runtimeContext
 
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
@@ -43,7 +58,6 @@ class TestJob(unittest.TestCase):
             runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
             runner.ignore_docker_for_reuse = False
             runner.num_retries = 0
-            document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
             list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
             runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
@@ -64,14 +78,13 @@ class TestJob(unittest.TestCase):
                 "id": "#",
                 "class": "CommandLineTool"
             })
-            make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-            arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                     basedir="", make_fs_access=make_fs_access, loader=Loader({}),
-                                                     metadata={"cwlVersion": "v1.0"})
+
+            loadingContext, runtimeContext = self.helper(runner, enable_reuse)
+
+            arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
             arvtool.formatgraph = None
-            for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-                j.run(enable_reuse=enable_reuse)
+            for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+                j.run(runtimeContext)
                 runner.api.jobs().create.assert_called_with(
                     body=JsonDiffMatcher({
                         'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
@@ -113,7 +126,7 @@ class TestJob(unittest.TestCase):
                     runner.api.links().create.side_effect = ApiError(
                         mock.MagicMock(return_value={'status': 403}),
                         'Permission denied')
-                    j.run(enable_reuse=enable_reuse)
+                    j.run(runtimeContext)
                 else:
                     assert not runner.api.links().create.called
 
@@ -130,9 +143,6 @@ class TestJob(unittest.TestCase):
         list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
         runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
 
-        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
-
-
         tool = {
             "inputs": [],
             "outputs": [],
@@ -156,14 +166,13 @@ class TestJob(unittest.TestCase):
             "id": "#",
             "class": "CommandLineTool"
         }
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                                 make_fs_access=make_fs_access, loader=Loader({}),
-                                                 metadata={"cwlVersion": "v1.0"})
+
+        loadingContext, runtimeContext = self.helper(runner)
+
+        arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, loadingContext)
         arvtool.formatgraph = None
-        for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
-            j.run(enable_reuse=True)
+        for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
+            j.run(runtimeContext)
         runner.api.jobs().create.assert_called_with(
             body=JsonDiffMatcher({
                 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
@@ -213,9 +222,13 @@ class TestJob(unittest.TestCase):
                                                         {"items": []},
                                                         {"items": [{"manifest_text": "ABC"}]})
 
-        arvjob = arvados_cwl.ArvadosJob(runner)
-        arvjob.name = "testjob"
-        arvjob.builder = mock.MagicMock()
+        arvjob = arvados_cwl.ArvadosJob(runner,
+                                        mock.MagicMock(),
+                                        {},
+                                        None,
+                                        [],
+                                        [],
+                                        "testjob")
         arvjob.output_callback = mock.MagicMock()
         arvjob.collect_outputs = mock.MagicMock()
         arvjob.collect_outputs.return_value = {"out": "stuff"}
@@ -283,9 +296,13 @@ class TestJob(unittest.TestCase):
             {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2"}]},
         )
 
-        arvjob = arvados_cwl.ArvadosJob(runner)
-        arvjob.name = "testjob"
-        arvjob.builder = mock.MagicMock()
+        arvjob = arvados_cwl.ArvadosJob(runner,
+                                        mock.MagicMock(),
+                                        {},
+                                        None,
+                                        [],
+                                        [],
+                                        "testjob")
         arvjob.output_callback = mock.MagicMock()
         arvjob.collect_outputs = mock.MagicMock()
         arvjob.collect_outputs.return_value = {"out": "stuff"}
@@ -315,22 +332,36 @@ class TestJob(unittest.TestCase):
 
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
 
-    def test_get_intermediate_collection_info(self):
-        arvrunner = mock.MagicMock()
-        arvrunner.intermediate_output_ttl = 60
-        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
 
-        job = arvados_cwl.ArvadosJob(arvrunner)
-
-        info = job._get_intermediate_collection_info()
+class TestWorkflow(unittest.TestCase):
+    def helper(self, runner, enable_reuse=True):
+        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
-        self.assertEqual(info["name"], "Intermediate collection")
-        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
-        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
 
+        document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+        document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+        document_loader.fetch_text = document_loader.fetcher.fetch_text
+        document_loader.check_exists = document_loader.fetcher.check_exists
 
+        loadingContext = arvados_cwl.context.ArvLoadingContext(
+            {"avsc_names": avsc_names,
+             "basedir": "",
+             "make_fs_access": make_fs_access,
+             "loader": document_loader,
+             "metadata": {"cwlVersion": "v1.0"},
+             "construct_tool_object": runner.arv_make_tool})
+        runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+            {"work_api": "jobs",
+             "basedir": "",
+             "name": "test_run_wf_"+str(enable_reuse),
+             "make_fs_access": make_fs_access,
+             "enable_reuse": enable_reuse,
+             "priority": 500})
+
+        return loadingContext, runtimeContext
 
-class TestWorkflow(unittest.TestCase):
     # The test passes no builder.resources
     # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
     @mock.patch("arvados.collection.CollectionReader")
@@ -352,27 +383,22 @@ class TestWorkflow(unittest.TestCase):
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
         runner.num_retries = 0
-        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-        document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
-        document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
-        document_loader.fetch_text = document_loader.fetcher.fetch_text
-        document_loader.check_exists = document_loader.fetcher.check_exists
+        loadingContext, runtimeContext = self.helper(runner)
 
-        tool, metadata = document_loader.resolve_ref("tests/wf/scatter2.cwl")
+        tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
         metadata["cwlVersion"] = tool["cwlVersion"]
 
-        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+        mockc = mock.MagicMock()
+        mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+        mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
 
-        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                              makeTool=runner.arv_make_tool, metadata=metadata)
+        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
         arvtool.formatgraph = None
-        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
-        it.next().run()
-        it.next().run()
+        it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+        it.next().run(runtimeContext)
+        it.next().run(runtimeContext)
 
         with open("tests/wf/scatter2_subwf.cwl") as f:
             subwf = StripYAMLComments(f.read())
@@ -388,8 +414,8 @@ class TestWorkflow(unittest.TestCase):
                         'HOME': '$(task.outdir)',
                         'TMPDIR': '$(task.tmpdir)'},
                                'task.vwd': {
-                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
-                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
+                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
                                },
                     'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                     'task.stdout': 'cwl.output.json'}]},
@@ -406,13 +432,14 @@ class TestWorkflow(unittest.TestCase):
                      ['docker_image_locator', 'in docker', 'arvados/jobs']],
             find_or_create=True)
 
-        mockcollection().open().__enter__().write.assert_has_calls([mock.call(subwf)])
-        mockcollection().open().__enter__().write.assert_has_calls([mock.call(
+        mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
+        mockc.open().__enter__().write.assert_has_calls([mock.call(
 '''{
   "fileblub": {
     "basename": "token.txt",
     "class": "File",
-    "location": "/keep/99999999999999999999999999999999+118/token.txt"
+    "location": "/keep/99999999999999999999999999999999+118/token.txt",
+    "size": 0
   },
   "sleeptime": 5
 }''')])
@@ -438,27 +465,19 @@ class TestWorkflow(unittest.TestCase):
         runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
         runner.ignore_docker_for_reuse = False
         runner.num_retries = 0
-        document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
 
-        make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
-                                         collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
-        document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=api, fs_access=make_fs_access(""))
-        document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
-        document_loader.fetch_text = document_loader.fetcher.fetch_text
-        document_loader.check_exists = document_loader.fetcher.check_exists
+        loadingContext, runtimeContext = self.helper(runner)
 
-        tool, metadata = document_loader.resolve_ref("tests/wf/echo-wf.cwl")
+        tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
         metadata["cwlVersion"] = tool["cwlVersion"]
 
-        mockcollection().portable_data_hash.return_value = "99999999999999999999999999999999+118"
+        mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
 
-        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, work_api="jobs", avsc_names=avsc_names,
-                                              basedir="", make_fs_access=make_fs_access, loader=document_loader,
-                                              makeTool=runner.arv_make_tool, metadata=metadata)
+        arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
         arvtool.formatgraph = None
-        it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
-        it.next().run()
-        it.next().run()
+        it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+        it.next().run(runtimeContext)
+        it.next().run(runtimeContext)
 
         with open("tests/wf/echo-subwf.cwl") as f:
             subwf = StripYAMLComments(f.read())
@@ -474,8 +493,8 @@ class TestWorkflow(unittest.TestCase):
                         'HOME': '$(task.outdir)',
                         'TMPDIR': '$(task.tmpdir)'},
                                'task.vwd': {
-                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999999+118/workflow.cwl',
-                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999999+118/cwl.input.yml'
+                                   'workflow.cwl': '$(task.keep)/99999999999999999999999999999996+99/workflow.cwl',
+                                   'cwl.input.yml': '$(task.keep)/99999999999999999999999999999996+99/cwl.input.yml'
                                },
                     'command': [u'cwltool', u'--no-container', u'--move-outputs', u'--preserve-entire-environment', u'workflow.cwl#main', u'cwl.input.yml'],
                     'task.stdout': 'cwl.output.json'}]},