Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>
return True
return False
+ @synchronized
+ def has_remote_blocks(self):
+ """Return True if any of this file's segment locators carries a +R (remote-cluster) signature."""
+ # +R hints appear in locators signed by a remote cluster; their presence
+ # means the blocks must be copied locally before the collection is saved.
+ for s in self._segments:
+ if '+R' in s.locator:
+ return True
+ return False
+
@synchronized
def segments(self):
# Shallow copy: callers get a snapshot list they cannot use to mutate
# the internal _segments container (taken under the object's lock).
return copy.copy(self._segments)
def __init__(self, parent=None):
self.parent = parent
self._committed = False
+ self._has_remote_blocks = False
self._callback = None
self._items = {}
def stream_name(self):
raise NotImplementedError()
+ @synchronized
+ def has_remote_blocks(self):
+ """Recursively check for a +R segment locator signature.
+
+ Returns True as soon as any child item (file or subcollection)
+ reports a remote (+R-signed) block; False otherwise.
+ """
+
+ for item in self:
+ # Child items implement has_remote_blocks() themselves, so this
+ # descends through nested subcollections.
+ if self[item].has_remote_blocks():
+ return True
+ return False
+
@must_be_writable
@synchronized
def find_or_create(self, path, create_type):
source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
target_dir.add(source_obj, target_name, overwrite, False)
+ if not self._has_remote_blocks and source_obj.has_remote_blocks():
+ self._has_remote_blocks = True
@must_be_writable
@synchronized
if not source_obj.writable():
raise IOError(errno.EROFS, "Source collection is read only", source)
target_dir.add(source_obj, target_name, overwrite, True)
+ if not self._has_remote_blocks and source_obj.has_remote_blocks():
+ self._has_remote_blocks = True
def portable_manifest_text(self, stream_name="."):
"""Get the manifest text for this collection, sub collections and files.
different subdirectories.
"""
- for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
- for s in self[filename].segments():
- if '+R' in s.locator:
- try:
- loc = remote_blocks[s.locator]
- except KeyError:
- loc = self._my_keep().refresh_signature(s.locator)
- remote_blocks[s.locator] = loc
- s.locator = loc
- self.set_committed(False)
- for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
- remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+ for item in self:
+ if isinstance(self[item], ArvadosFile):
+ for s in self[item].segments():
+ if '+R' in s.locator:
+ try:
+ loc = remote_blocks[s.locator]
+ except KeyError:
+ loc = self._my_keep().refresh_signature(s.locator)
+ remote_blocks[s.locator] = loc
+ s.locator = loc
+ self.set_committed(False)
+ elif isinstance(self[item], RichCollectionBase):
+ remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
return remote_blocks
@synchronized
self._manifest_locator = manifest_locator_or_text
elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
+ if not self._has_local_collection_uuid():
+ self._has_remote_blocks = True
elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
+ if '+R' in self._manifest_text:
+ self._has_remote_blocks = True
else:
raise errors.ArgumentError(
"Argument to CollectionReader is not a manifest or a collection UUID")
t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
body["trash_at"] = t
- # Copy any remote blocks to the local cluster.
- self._copy_remote_blocks(remote_blocks={})
-
if not self.committed():
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
elif not self._has_local_collection_uuid():
if trash_at and type(trash_at) is not datetime.datetime:
raise errors.ArgumentError("trash_at must be datetime type.")
- # Copy any remote blocks to the local cluster.
- self._copy_remote_blocks(remote_blocks={})
+ if self._has_remote_blocks:
+ # Copy any remote blocks to the local cluster.
+ self._copy_remote_blocks(remote_blocks={})
+ self._has_remote_blocks = False
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
curl.setopt(pycurl.SSL_VERIFYPEER, 0)
if method == "HEAD":
curl.setopt(pycurl.NOBODY, True)
- self._setcurltimeouts(curl, timeout)
+ self._setcurltimeouts(curl, timeout, method=="HEAD")
try:
curl.perform()
self.upload_counter.add(len(body))
return True
- def _setcurltimeouts(self, curl, timeouts):
+ def _setcurltimeouts(self, curl, timeouts, ignore_bandwidth=False):
if not timeouts:
return
elif isinstance(timeouts, tuple):
conn_t, xfer_t = (timeouts, timeouts)
bandwidth_bps = KeepClient.DEFAULT_TIMEOUT[2]
curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(conn_t*1000))
- curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
- curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
+ if not ignore_bandwidth:
+ curl.setopt(pycurl.LOW_SPEED_TIME, int(math.ceil(xfer_t)))
+ curl.setopt(pycurl.LOW_SPEED_LIMIT, int(math.ceil(bandwidth_bps)))
def _headerfunction(self, header_line):
if isinstance(header_line, bytes):
# Always cache the result, then return it if we succeeded.
if loop.success():
- if method == "HEAD":
- return blob or True
- else:
- return blob
+ return blob
finally:
if slot is not None:
slot.set(blob)
c = Collection(". " + remote_block_loc + " 0:3:foofile.txt\n")
self.assertEqual(
len(re.findall(self.remote_locator_re, c.manifest_text())), 1)
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, c.manifest_text())), 0)
c.save_new()
rs_mock.assert_called()
+ self.assertEqual(
+ len(re.findall(self.remote_locator_re, c.manifest_text())), 0)
self.assertEqual(
len(re.findall(self.local_locator_re, c.manifest_text())), 1)
len(re.findall(self.remote_locator_re, local_c.manifest_text())), 0)
# Copy remote file to local collection
local_c.copy('./foofile.txt', './copied/foofile.txt', remote_c)
+ self.assertEqual(
+ len(re.findall(self.local_locator_re, local_c.manifest_text())), 1)
self.assertEqual(
len(re.findall(self.remote_locator_re, local_c.manifest_text())), 1)
# Save local collection: remote block should be copied
local_c.save()
+ rs_mock.assert_called()
self.assertEqual(
len(re.findall(self.local_locator_re, local_c.manifest_text())), 2)
self.assertEqual(
int(arvados.KeepClient.DEFAULT_TIMEOUT[0]*1000))
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[1]))
+ None)
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
- int(arvados.KeepClient.DEFAULT_TIMEOUT[2]))
+ None)
def test_proxy_get_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[0]*1000))
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_TIME),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[1]))
+ None)
self.assertEqual(
mock.responses[0].getopt(pycurl.LOW_SPEED_LIMIT),
- int(arvados.KeepClient.DEFAULT_PROXY_TIMEOUT[2]))
+ None)
def test_proxy_put_timeout(self):
api_client = self.mock_keep_services(service_type='proxy', count=1)
self.assertEqual(1, req_mock.call_count)
+@tutil.skip_sleep
+class KeepClientCacheTestCase(unittest.TestCase, tutil.ApiClientMock):
+ """Check KeepClient's block-cache behavior for GET vs HEAD requests.
+
+ Patches KeepService.get (which presumably serves both GET and HEAD
+ traffic -- TODO confirm against KeepClient internals) and counts how
+ many times it is invoked.
+ """
+ def setUp(self):
+ self.api_client = self.mock_keep_services(count=2)
+ self.keep_client = arvados.KeepClient(api_client=self.api_client)
+ self.data = b'xyzzy'
+ self.locator = '1271ed5ef305aadabc605b1609e24c52'
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_get_request_cache(self, get_mock):
+ with tutil.mock_keep_responses(self.data, 200, 200):
+ self.keep_client.get(self.locator)
+ self.keep_client.get(self.locator)
+ # Request already cached, don't require more than one request
+ get_mock.assert_called_once()
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_head_request_cache(self, get_mock):
+ with tutil.mock_keep_responses(self.data, 200, 200):
+ self.keep_client.head(self.locator)
+ self.keep_client.head(self.locator)
+ # Don't cache HEAD requests so that they're not confused with GET reqs
+ self.assertEqual(2, get_mock.call_count)
+
+ @mock.patch('arvados.KeepClient.KeepService.get')
+ def test_head_and_then_get_return_different_responses(self, get_mock):
+ head_resp = None
+ get_resp = None
+ # side_effect: first call (HEAD) returns 'first response',
+ # second call (GET) returns 'second response'.
+ get_mock.side_effect = ['first response', 'second response']
+ with tutil.mock_keep_responses(self.data, 200, 200):
+ head_resp = self.keep_client.head(self.locator)
+ get_resp = self.keep_client.get(self.locator)
+ self.assertEqual('first response', head_resp)
+ # First response was not cached because it was from a HEAD request.
+ self.assertNotEqual(head_resp, get_resp)
+
+
+
@tutil.skip_sleep
class KeepXRequestIdTestCase(unittest.TestCase, tutil.ApiClientMock):
def setUp(self):
loc = kc.put(self.DATA, copies=1, num_retries=0)
self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
with self.assertTakesGreater(self.TIMEOUT_TIME):
- with self.assertRaises(arvados.errors.KeepReadError) as e:
+ with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepWriteError):
self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
self.server.setdelays(response=self.TIMEOUT_TIME)
with self.assertTakesGreater(self.TIMEOUT_TIME):
- with self.assertRaises(arvados.errors.KeepReadError) as e:
+ with self.assertRaises(arvados.errors.KeepReadError):
kc.get(loc, num_retries=0)
with self.assertTakesGreater(self.TIMEOUT_TIME):
with self.assertRaises(arvados.errors.KeepWriteError):
kc.put(self.DATA, copies=1, num_retries=0)
with self.assertTakesGreater(self.TIMEOUT_TIME):
- with self.assertRaises(arvados.errors.KeepReadError) as e:
- kc.head(loc, num_retries=0)
+ kc.head(loc, num_retries=0)
def test_low_bandwidth_with_server_mid_delay_failure(self):
kc = self.keepClient()