Remove _get_intermediate_collection_info method and create function
authorFuad Muhic <fmuhic@capeannenterprises.com>
Mon, 9 Jul 2018 13:08:59 +0000 (15:08 +0200)
committerFuad Muhic <fmuhic@capeannenterprises.com>
Mon, 9 Jul 2018 13:11:14 +0000 (15:11 +0200)
with same name

Arvados-DCO-1.1-Signed-off-by: Fuad Muhic <fmuhic@capeannenterprises.com>

sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/context.py
sdk/cwl/arvados_cwl/pathmapper.py
sdk/cwl/arvados_cwl/util.py [new file with mode: 0644]
sdk/cwl/tests/test_container.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_pathmapper.py
sdk/cwl/tests/test_util.py [new file with mode: 0644]

index 03f5a5eb678201f9610cf5fd76e085260695b742..2618ddecacac0db113e857cc0bd5440f5cd46472 100644 (file)
@@ -11,6 +11,7 @@ import datetime
 import ciso8601
 import uuid
 
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
 import ruamel.yaml as yaml
 
 from cwltool.errors import WorkflowException
@@ -21,7 +22,6 @@ from cwltool.job import JobBase
 
 import arvados.collection
 
-from arvados.errors import ApiError
 from .arvdocker import arv_docker_get_image
 from . import done
 from .runner import Runner, arvados_jobs_image, packed_workflow, trim_anonymous_location, remove_redundant_fields
@@ -166,7 +166,9 @@ class ArvadosContainer(JobBase):
 
                 keepemptydirs(vwd)
 
-                info = self._get_intermediate_collection_info()
+                if not runtimeContext.current_container:
+                    runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = get_intermediate_collection_info(runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                 vwd.save_new(name=info["name"],
                              ensure_unique_name=True,
                              trash_at=info["trash_at"],
@@ -341,26 +343,6 @@ class ArvadosContainer(JobBase):
         finally:
             self.output_callback(outputs, processStatus)
 
-    def _get_intermediate_collection_info(self):
-            trash_time = None
-            if self.arvrunner.intermediate_output_ttl > 0:
-                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
-
-            current_container_uuid = None
-            try:
-                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
-                current_container_uuid = current_container['uuid']
-            except ApiError as e:
-                # Status code 404 just means we're not running in a container.
-                if e.resp.status != 404:
-                    logger.info("Getting current container: %s", e)
-            props = {"type": "Intermediate",
-                          "container": current_container_uuid}
-
-            return {"name" : "Intermediate collection",
-                    "trash_at" : trash_time,
-                    "properties" : props}
-
 
 class RunnerContainer(Runner):
     """Submit and manage a container that runs arvados-cwl-runner."""
index 2d112c87a7c5affa2b6d490e526ae1e2d2aea28b..36a047213ac66339088239068a8fb829124a7cc9 100644 (file)
@@ -6,7 +6,6 @@ import logging
 import re
 import copy
 import json
-import datetime
 import time
 
 from cwltool.process import shortname, UnsupportedRequirement
@@ -19,6 +18,7 @@ from cwltool.job import JobBase
 
 from schema_salad.sourceline import SourceLine
 
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
 import ruamel.yaml as yaml
 
 import arvados.collection
@@ -77,7 +77,9 @@ class ArvadosJob(JobBase):
 
                 if vwd:
                     with Perf(metrics, "generatefiles.save_new %s" % self.name):
-                        info = self._get_intermediate_collection_info()
+                        if not runtimeContext.current_container:
+                            runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                        info = get_intermediate_collection_info(runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
                         vwd.save_new(name=info["name"],
                                      ensure_unique_name=True,
                                      trash_at=info["trash_at"],
@@ -282,26 +284,6 @@ class ArvadosJob(JobBase):
         finally:
             self.output_callback(outputs, processStatus)
 
-    def _get_intermediate_collection_info(self):
-            trash_time = None
-            if self.arvrunner.intermediate_output_ttl > 0:
-                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
-
-            current_container_uuid = None
-            try:
-                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
-                current_container_uuid = current_container['uuid']
-            except ApiError as e:
-                # Status code 404 just means we're not running in a container.
-                if e.resp.status != 404:
-                    logger.info("Getting current container: %s", e)
-            props = {"type": "Intermediate",
-                          "container": current_container_uuid}
-
-            return {"name" : "Intermediate collection",
-                    "trash_at" : trash_time,
-                    "properties" : props}
-
 
 class RunnerJob(Runner):
     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
index cf0c1fb7e4576f9ef9f6d0f809e05b47e1186586..81e256ed545adbdf5e3be00eccdd108f65be26d4 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from cwltool.context import LoadingContext, RuntimeContext
 
 class ArvLoadingContext(LoadingContext):
@@ -24,5 +28,6 @@ class ArvRuntimeContext(RuntimeContext):
         self.wait = True
         self.cwl_runner_job = None
         self.storage_classes = "default"
+        self.current_container = None
 
         super(ArvRuntimeContext, self).__init__(kwargs)
index 9a76f81da54bbd1131ade82be8adda9e5d21d2a5..a3b6a4dd9b956abd58c963230cee937c6ed8d110 100644 (file)
@@ -7,8 +7,8 @@ import logging
 import uuid
 import os
 import urllib
-import datetime
 
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
 import arvados.commands.run
 import arvados.collection
 
@@ -155,7 +155,8 @@ class ArvPathMapper(PathMapper):
                 for l in srcobj.get("listing", []):
                     self.addentry(l, c, ".", remap)
 
-                info = self._get_intermediate_collection_info()
+                container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = get_intermediate_collection_info(container, self.arvrunner.intermediate_output_ttl)
 
                 c.save_new(name=info["name"],
                            owner_uuid=self.arvrunner.project_uuid,
@@ -173,7 +174,8 @@ class ArvPathMapper(PathMapper):
                                                   num_retries=self.arvrunner.num_retries                                                  )
                 self.addentry(srcobj, c, ".", remap)
 
-                info = self._get_intermediate_collection_info()
+                container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+                info = get_intermediate_collection_info(container, self.arvrunner.intermediate_output_ttl)
 
                 c.save_new(name=info["name"],
                            owner_uuid=self.arvrunner.project_uuid,
@@ -212,26 +214,6 @@ class ArvPathMapper(PathMapper):
         else:
             return None
 
-    def _get_intermediate_collection_info(self):
-            trash_time = None
-            if self.arvrunner.intermediate_output_ttl > 0:
-                trash_time = datetime.datetime.now() + datetime.timedelta(seconds=self.arvrunner.intermediate_output_ttl)
-
-            current_container_uuid = None
-            try:
-                current_container = self.arvrunner.api.containers().current().execute(num_retries=self.arvrunner.num_retries)
-                current_container_uuid = current_container['uuid']
-            except ApiError as e:
-                # Status code 404 just means we're not running in a container.
-                if e.resp.status != 404:
-                    logger.info("Getting current container: %s", e)
-            props = {"type": "Intermediate",
-                          "container": current_container_uuid}
-
-            return {"name" : "Intermediate collection",
-                    "trash_at" : trash_time,
-                    "properties" : props}
-
 
 class StagingPathMapper(PathMapper):
     _follow_dirs = True
diff --git a/sdk/cwl/arvados_cwl/util.py b/sdk/cwl/arvados_cwl/util.py
new file mode 100644 (file)
index 0000000..7dafeda
--- /dev/null
@@ -0,0 +1,50 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+"""Helpers for intermediate collections, shared by arvcontainer, arvjob and pathmapper."""
+
+import datetime
+from arvados.errors import ApiError
+
+
+def get_intermediate_collection_info(current_container, intermediate_output_ttl):
+    """Return name, trash time and properties for an intermediate collection.
+
+    current_container -- container record (a dict with a 'uuid' key), or
+        None/empty when not running inside a container.
+    intermediate_output_ttl -- seconds until the collection is trashed; a
+        value that is not positive (or None) leaves trash_at unset.
+
+    Returns a dict with keys "name", "trash_at" (naive UTC datetime or
+    None) and "properties" ({"type": "intermediate", "container": uuid}).
+    """
+    trash_time = None
+    # Test for a falsy TTL first: `None > 0` raises TypeError on Python 3.
+    if intermediate_output_ttl and intermediate_output_ttl > 0:
+        trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl)
+
+    container_uuid = current_container['uuid'] if current_container else None
+    props = {"type": "intermediate", "container": container_uuid}
+
+    return {"name": "Intermediate collection", "trash_at": trash_time, "properties": props}
+
+
+def get_current_container(api, num_retries=0, logger=None):
+    """Return the container record for the current process, or None.
+
+    api -- Arvados API client; api.containers().current() is queried.
+    num_retries -- passed through to execute().
+    logger -- optional; if given, ApiErrors other than 404 are logged at
+        INFO level.
+
+    ApiError is never re-raised: any API error results in None.
+    """
+    current_container = None
+    try:
+        current_container = api.containers().current().execute(num_retries=num_retries)
+    except ApiError as e:
+        # Status code 404 just means we're not running in a container.
+        if e.resp.status != 404 and logger:
+            logger.info("Getting current container: %s", e)
+    return current_container
index bd5b1a1b613850995ecdc36cb741d81c8b13766e..e7ce9c37092084a171b8e0f0010d9cb9fa44277a 100644 (file)
@@ -9,7 +9,6 @@ import logging
 import mock
 import unittest
 import os
-import datetime
 import functools
 import cwltool.process
 import cwltool.secrets
@@ -22,13 +21,6 @@ if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
-class MockDateTime(datetime.datetime):
-    @classmethod
-    def now(cls):
-        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
-
-datetime.datetime = MockDateTime
-
 class TestContainer(unittest.TestCase):
 
     def helper(self, runner, enable_reuse=True):
@@ -698,17 +690,3 @@ class TestContainer(unittest.TestCase):
 
         _, kwargs = runner.api.container_requests().create.call_args
         self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
-
-
-    def test_get_intermediate_collection_info(self):
-        arvrunner = mock.MagicMock()
-        arvrunner.intermediate_output_ttl = 60
-        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
-
-        container = arvados_cwl.ArvadosContainer(arvrunner)
-
-        info = container._get_intermediate_collection_info()
-
-        self.assertEqual(info["name"], "Intermediate collection")
-        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
-        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
index b9f1c396172b6d32e0b22423878ed2b0c78696cf..c110bc5d53cd4634656d93fab2937954be973d07 100644 (file)
@@ -10,7 +10,6 @@ import os
 import unittest
 import copy
 import StringIO
-import datetime
 
 import arvados
 import arvados_cwl
@@ -25,13 +24,6 @@ if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
     logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
 
-class MockDateTime(datetime.datetime):
-    @classmethod
-    def now(cls):
-        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
-
-datetime.datetime = MockDateTime
-
 class TestJob(unittest.TestCase):
 
     def helper(self, runner, enable_reuse=True):
@@ -339,20 +331,6 @@ class TestJob(unittest.TestCase):
 
         arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
 
-    def test_get_intermediate_collection_info(self):
-        arvrunner = mock.MagicMock()
-        arvrunner.intermediate_output_ttl = 60
-        arvrunner.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
-
-        job = arvados_cwl.ArvadosJob(arvrunner)
-
-        info = job._get_intermediate_collection_info()
-
-        self.assertEqual(info["name"], "Intermediate collection")
-        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
-        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
-
-
 
 class TestWorkflow(unittest.TestCase):
     def helper(self, runner, enable_reuse=True):
index e5326147d7737d9f0f0d2e52fda8120a54770ae0..eaa57114222233d6bcbd02ff2674c89f5169b168 100644 (file)
@@ -9,7 +9,6 @@ import unittest
 import json
 import logging
 import os
-import datetime
 
 import arvados
 import arvados.keep
@@ -27,13 +26,6 @@ def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPatter
         c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
         c.fn = fnPattern % (pdh, os.path.basename(c.fn))
 
-class MockDateTime(datetime.datetime):
-    @classmethod
-    def now(cls):
-        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
-
-datetime.datetime = MockDateTime
-
 class TestPathmap(unittest.TestCase):
     def setUp(self):
         self.api = mock.MagicMock()
@@ -109,19 +101,3 @@ class TestPathmap(unittest.TestCase):
                 "class": "File",
                 "location": "file:tests/hw.py"
             }], "", "/test/%s", "/test/%s/%s")
-
-    def test_get_intermediate_collection_info(self):
-        self.api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
-        arvrunner = arvados_cwl.ArvCwlRunner(self.api)
-        arvrunner.intermediate_output_ttl = 60
-
-        path_mapper = ArvPathMapper(arvrunner, [{
-            "class": "File",
-            "location": "keep:99999999999999999999999999999991+99/hw.py"
-        }], "", "/test/%s", "/test/%s/%s")
-
-        info = path_mapper._get_intermediate_collection_info()
-
-        self.assertEqual(info["name"], "Intermediate collection")
-        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 1))
-        self.assertEqual(info["properties"], {"type" : "Intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
diff --git a/sdk/cwl/tests/test_util.py b/sdk/cwl/tests/test_util.py
new file mode 100644 (file)
index 0000000..e42197e
--- /dev/null
@@ -0,0 +1,67 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import unittest
+import mock
+import datetime
+import httplib2
+
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
+from arvados.errors import ApiError
+
+
+class MockDateTime(datetime.datetime):
+    """datetime subclass whose utcnow() is pinned to 2018-01-01 00:00:00."""
+
+    @classmethod
+    def utcnow(cls):
+        return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+
+
+class TestUtil(unittest.TestCase):
+    # Patch datetime.datetime only for this test; assigning it at module
+    # import time would leak the fake clock into every other test module.
+    @mock.patch("datetime.datetime", MockDateTime)
+    def test_get_intermediate_collection_info(self):
+        current_container = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+        intermediate_output_ttl = 120
+
+        info = get_intermediate_collection_info(current_container, intermediate_output_ttl)
+
+        self.assertEqual(info["name"], "Intermediate collection")
+        self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 2, 0, 0))
+        self.assertEqual(info["properties"], {"type" : "intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+    def test_get_intermediate_collection_info_no_ttl_no_container(self):
+        info = get_intermediate_collection_info(None, 0)
+
+        self.assertIsNone(info["trash_at"])
+        self.assertEqual(info["properties"], {"type" : "intermediate", "container" : None})
+
+    def test_get_current_container_success(self):
+        api = mock.MagicMock()
+        api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+
+        current_container = get_current_container(api)
+
+        self.assertEqual(current_container, {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+    def test_get_current_container_error(self):
+        api = mock.MagicMock()
+        api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 300}), "")
+        logger = mock.MagicMock()
+
+        # get_current_container logs and swallows non-404 ApiErrors, so
+        # check that contract instead of expecting the error to propagate.
+        self.assertIsNone(get_current_container(api, num_retries=0, logger=logger))
+        self.assertEqual(logger.info.call_count, 1)
+
+    def test_get_current_container_not_found(self):
+        api = mock.MagicMock()
+        api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 404}), "")
+        logger = mock.MagicMock()
+
+        # 404 means "not running in a container": not an error, nothing logged.
+        self.assertIsNone(get_current_container(api, num_retries=0, logger=logger))
+        self.assertFalse(logger.info.called)