From 5e991889ca6c18cdd901b98b5083c4cee7d260c9 Mon Sep 17 00:00:00 2001 From: radhika Date: Mon, 25 Apr 2016 09:36:19 -0400 Subject: [PATCH] 8937: invalidate cache and create new one if there are errors on head request during ResumeCache. --- sdk/python/arvados/commands/put.py | 24 ++++++++++++++++++-- sdk/python/tests/test_arv_put.py | 36 ++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 2 deletions(-) diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index 6fa26c672d..8fa1c8f66b 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -197,10 +197,25 @@ class ResumeCacheConflict(Exception): class ResumeCache(object): CACHE_DIR = '.cache/arvados/arv-put' - def __init__(self, file_spec): + def __init__(self, file_spec, api_client=None, num_retries=0): self.cache_file = open(file_spec, 'a+') self._lock_file(self.cache_file) self.filename = self.cache_file.name + try: + state = self.load() + locator = None + try: + if "_finished_streams" in state and len(state["_finished_streams"]) > 0: + locator = state["_finished_streams"][0][1][0] + elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0: + locator = state["_current_stream_locators"][0] + if locator is not None: + kc = arvados.keep.KeepClient(api_client=api_client) + kc.head(locator, num_retries=num_retries) + except Exception as e: + raise arvados.errors.KeepRequestError("Head request error for {}: {}".format(locator, e)) + except (ValueError): + pass @classmethod def make_path(cls, args): @@ -437,9 +452,14 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): resume_cache = None if args.resume: try: - resume_cache = ResumeCache(ResumeCache.make_path(args)) + cachepath = ResumeCache.make_path(args) + resume_cache = ResumeCache(cachepath, api_client=api_client, num_retries=args.retries) except (IOError, OSError, ValueError): pass # Couldn't open cache directory/file. Continue without it. + except arvados.errors.KeepRequestError: + # delete the cache and create a new one + shutil.rmtree(cachepath) + resume_cache = ResumeCache(cachepath) except ResumeCacheConflict: print >>stderr, "\n".join([ "arv-put: Another process is already uploading this data.", diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py index 896b880778..f1ed35a94a 100644 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -127,6 +127,42 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase): else: config['ARVADOS_API_HOST'] = orig_host + @mock.patch('arvados.keep.KeepClient.head') + def test_resume_cache_with_current_stream_locators(self, keep_client_head): + keep_client_head.side_effect = [True] + thing = {} + thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6'] + with tempfile.NamedTemporaryFile() as cachefile: + self.last_cache = arv_put.ResumeCache(cachefile.name) + self.last_cache.save(thing) + self.last_cache.close() + resume_cache = arv_put.ResumeCache(self.last_cache.filename) + self.assertNotEqual(None, resume_cache) + + @mock.patch('arvados.keep.KeepClient.head') + def test_resume_cache_with_finished_streams(self, keep_client_head): + keep_client_head.side_effect = [True] + thing = {} + thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]] + with tempfile.NamedTemporaryFile() as cachefile: + self.last_cache = arv_put.ResumeCache(cachefile.name) + self.last_cache.save(thing) + self.last_cache.close() + resume_cache = arv_put.ResumeCache(self.last_cache.filename) + self.assertNotEqual(None, resume_cache) + + @mock.patch('arvados.keep.KeepClient.head') + def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head): + keep_client_head.side_effect = Exception('Locator not found') + thing = {} + thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]] + with tempfile.NamedTemporaryFile() as cachefile: + self.last_cache = arv_put.ResumeCache(cachefile.name) + self.last_cache.save(thing) + self.last_cache.close() + with self.assertRaises(arvados.errors.KeepRequestError): + arv_put.ResumeCache(self.last_cache.filename) + def test_basic_cache_storage(self): thing = ['test', 'list'] with tempfile.NamedTemporaryFile() as cachefile: -- 2.30.2