Merge branch 'master' into 14259-pysdk-remote-block-copy
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Tue, 6 Nov 2018 14:41:06 +0000 (11:41 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Tue, 6 Nov 2018 14:41:06 +0000 (11:41 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
sdk/python/tests/test_collections.py
sdk/python/tests/test_keep_client.py

index f4580f346bbed43a5642e974d54c8e5922c24efd..f58c882e24f05d7cf929e8a25c2455f67ffc5125 100644 (file)
@@ -900,6 +900,14 @@ class ArvadosFile(object):
                 return True
         return False
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Returns True if any of the segment's locators has a +R signature"""
+        for s in self._segments:
+            if '+R' in s.locator:
+                return True
+        return False
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
index 55797bdfebd53a49db6acd5bf6ea6f1443a317f3..d63e9424ee45a6e28d2d353e17a62a15aa8708b6 100644 (file)
@@ -520,6 +520,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
+        self._has_remote_blocks = False
         self._callback = None
         self._items = {}
 
@@ -544,6 +545,15 @@ class RichCollectionBase(CollectionBase):
     def stream_name(self):
         raise NotImplementedError()
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature."""
+
+        for item in self:
+            if self[item].has_remote_blocks():
+                return True
+        return False
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -901,6 +911,8 @@ class RichCollectionBase(CollectionBase):
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
         target_dir.add(source_obj, target_name, overwrite, False)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self._has_remote_blocks = True
 
     @must_be_writable
     @synchronized
@@ -927,6 +939,8 @@ class RichCollectionBase(CollectionBase):
         if not source_obj.writable():
             raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self._has_remote_blocks = True
 
     def portable_manifest_text(self, stream_name="."):
         """Get the manifest text for this collection, sub collections and files.
@@ -1037,18 +1051,19 @@ class RichCollectionBase(CollectionBase):
           different subdirectories.
 
         """
-        for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
-            for s in self[filename].segments():
-                if '+R' in s.locator:
-                    try:
-                        loc = remote_blocks[s.locator]
-                    except KeyError:
-                        loc = self._my_keep().refresh_signature(s.locator)
-                        remote_blocks[s.locator] = loc
-                    s.locator = loc
-                    self.set_committed(False)
-        for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
-            remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+        for item in self:
+            if isinstance(self[item], ArvadosFile):
+                for s in self[item].segments():
+                    if '+R' in s.locator:
+                        try:
+                            loc = remote_blocks[s.locator]
+                        except KeyError:
+                            loc = self._my_keep().refresh_signature(s.locator)
+                            remote_blocks[s.locator] = loc
+                        s.locator = loc
+                        self.set_committed(False)
+            elif isinstance(self[item], RichCollectionBase):
+                remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
         return remote_blocks
 
     @synchronized
@@ -1285,8 +1300,12 @@ class Collection(RichCollectionBase):
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
+                if not self._has_local_collection_uuid():
+                    self._has_remote_blocks = True
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
+                if '+R' in self._manifest_text:
+                    self._has_remote_blocks = True
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
@@ -1536,10 +1555,11 @@ class Collection(RichCollectionBase):
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
-
         if not self.committed():
+            if self._has_remote_blocks:
+                # Copy any remote blocks to the local cluster.
+                self._copy_remote_blocks(remote_blocks={})
+                self._has_remote_blocks = False
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
             elif not self._has_local_collection_uuid():
@@ -1628,8 +1648,10 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
+        if self._has_remote_blocks:
+            # Copy any remote blocks to the local cluster.
+            self._copy_remote_blocks(remote_blocks={})
+            self._has_remote_blocks = False
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
index 9618ee5ce9ac5551931093b24695514379e1376f..1b6376e9be1035dac69bb34da974c2c486477627 100644 (file)
@@ -377,7 +377,7 @@ class KeepClient(object):
                         curl.setopt(pycurl.SSL_VERIFYPEER, 0)
                     if method == "HEAD":
                         curl.setopt(pycurl.NOBODY, True)
-                    self._setcurltimeouts(curl, timeout)
+                    self._setcurltimeouts(curl, timeout, method=="HEAD")
 
                     try:
                         curl.perform()
@@ -516,7 +516,7 @@ class KeepClient(object):
                 self.upload_counter.add(len(body))
             return True
 
-        def _setcurltimeouts(self, curl, timeouts):
+        def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
             if not timeouts:
                 return
             elif isinstance(timeouts, tuple):
@@ -529,8 +529,9 @@ class KeepClient(object):
                 conn_t, xfer_t = (timeouts, timeouts)
                 bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
             curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
-            curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
-            curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+            if not ignore_bandwidth:
+                curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+                curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
 
         def _headerfunction(self, header_line):
             if isinstance(header_line, bytes):
@@ -1094,10 +1095,7 @@ class KeepClient(object):
 
             # Always cache the result, then return it if we succeeded.
             if loop.success():
-                if method == "HEAD":
-                    return blob or True
-                else:
-                    return blob
+                return blob
         finally:
             if slot is not None:
                 slot.set(blob)
index 491f95d9dba6e5e4115bab751753507e6c64e011..ac18c44c6844c2f54fbc54d91acf094778834b3c 100644 (file)
@@ -1198,8 +1198,12 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
         c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
         self.assertEqual(
             len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, c.manifest_text())), 0)
         c.save_new()
         rs_mock.assert_called()
+        self.assertEqual(
+            len(re.findall(self.remote_locator_re, c.manifest_text())), 0)
         self.assertEqual(
             len(re.findall(self.local_locator_re, c.manifest_text())), 1)
 
@@ -1223,10 +1227,13 @@ class NewCollectionTestCaseWithServersAndTokens(run_test_server.TestCaseWithServ
             len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
         # Copy remote file to local collection
         local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+        self.assertEqual(
+            len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
         self.assertEqual(
             len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
         # Save local collection: remote block should be copied
         local_c.save()
+        rs_mock.assert_called()
         self.assertEqual(
             len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
         self.assertEqual(
index 192a21cd66b4af3cc062b72345263b5afb7ccdef..d6b3a2a12dc6f4d4d8bb997c25f43cc21e8cda62 100644 (file)
@@ -412,10 +412,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+                None)
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
-                int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
+                None)
 
     def test_proxy_get_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -446,10 +446,10 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
                 int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+                None)
             self.assertEqual(
                 mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
-                int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
+                None)
 
     def test_proxy_put_timeout(self):
         api_client = self.mock_keep_services(service_type='proxy', count=1)
@@ -560,6 +560,43 @@ class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
         self.assertEqual(1, req_mock.call_count)
 
 
+@tutil.skip_sleep
+class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+    def setUp(self):
+        self.api_client = self.mock_keep_services(count=2)
+        self.keep_client = arvados.KeepClient(api_client=self.api_client)
+        self.data = b'xyzzy'
+        self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_get_request_cache(self, get_mock):
+        with tutil.mock_keep_responses(self.data, 200, 200):
+            self.keep_client.get(self.locator)
+            self.keep_client.get(self.locator)
+        # Request already cached, don't require more than one request
+        get_mock.assert_called_once()
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_head_request_cache(self, get_mock):
+        with tutil.mock_keep_responses(self.data, 200, 200):
+            self.keep_client.head(self.locator)
+            self.keep_client.head(self.locator)
+        # Don't cache HEAD requests so that they're not confused with GET reqs
+        self.assertEqual(2, get_mock.call_count)
+
+    @mock.patch('arvados.KeepClient.KeepService.get')
+    def test_head_and_then_get_return_different_responses(self, get_mock):
+        head_resp = None
+        get_resp = None
+        get_mock.side_effect = ['first response', 'second response']
+        with tutil.mock_keep_responses(self.data, 200, 200):
+            head_resp = self.keep_client.head(self.locator)
+            get_resp = self.keep_client.get(self.locator)
+        self.assertEqual('first response', head_resp)
+        # First response was not cached because it was from a HEAD request.
+        self.assertNotEqual(head_resp, get_resp)
+
+
 @tutil.skip_sleep
 class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
     def setUp(self):
@@ -849,7 +886,7 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
         loc = kc.put(self.DATA, copies=1, num_retries=0)
         self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
-            with self.assertRaises(arvados.errors.KeepReadError) as e:
+            with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
             with self.assertRaises(arvados.errors.KeepWriteError):
@@ -861,14 +898,13 @@ class KeepClientTimeout(keepstub.StubKeepServers, unittest.TestCase):
         self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
         self.server.setdelays(response=self.TIMEOUT_TIME)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
-            with self.assertRaises(arvados.errors.KeepReadError) as e:
+            with self.assertRaises(arvados.errors.KeepReadError):
                 kc.get(loc, num_retries=0)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 kc.put(self.DATA, copies=1, num_retries=0)
         with self.assertTakesGreater(self.TIMEOUT_TIME):
-            with self.assertRaises(arvados.errors.KeepReadError) as e:
-                kc.head(loc, num_retries=0)
+            kc.head(loc, num_retries=0)
 
     def test_low_bandwidth_with_server_mid_delay_failure(self):
         kc = self.keepClient()