From: Lucas Di Pentima
Date: Thu, 13 Dec 2018 19:34:21 +0000 (-0300)
Subject: Merge branch '14012-arvput-check-cache'
X-Git-Tag: 1.4.0~195
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/8ff3fd06e165a275f53884d1d20287b68c1b32bd?hp=d1571f495b0e0e05c833d4666924bcb6a288b33d

Merge branch '14012-arvput-check-cache'

Closes #14012

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima
---

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index cba00c3c8c..61258632bd 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -31,6 +31,7 @@ import traceback
 
 from apiclient import errors as apiclient_errors
 from arvados._version import __version__
+from arvados.util import keep_locator_pattern
 
 import arvados.commands._util as arv_cmd
 
@@ -289,6 +290,9 @@ class ResumeCacheConflict(Exception):
     pass
 
 
+class ResumeCacheInvalidError(Exception):
+    pass
+
 class ArvPutArgumentConflict(Exception):
     pass
 
@@ -387,7 +391,7 @@ class ResumeCache(object):
             new_cache = os.fdopen(new_cache_fd, 'r+')
             json.dump(data, new_cache)
             os.rename(new_cache_name, self.filename)
-        except (IOError, OSError, ResumeCacheConflict) as error:
+        except (IOError, OSError, ResumeCacheConflict):
             try:
                 os.unlink(new_cache_name)
             except NameError:  # mkstemp failed.
@@ -482,8 +486,8 @@ class ArvPutUploadJob(object):
 
     def _build_upload_list(self):
         """
-        Scan the requested paths to count file sizes, excluding files & dirs if requested
-        and building the upload file list.
+        Scan the requested paths to count file sizes, excluding requested files
+        and dirs and building the upload file list.
         """
         # If there aren't special files to be read, reset total bytes count to zero
         # to start counting.
@@ -795,6 +799,20 @@ class ArvPutUploadJob(object):
     def _my_collection(self):
         return self._remote_collection if self.update else self._local_collection
 
+    def _get_cache_filepath(self):
+        # Set up cache file name from input paths.
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
+        if self.filename:
+            md5.update(self.filename.encode())
+        cache_filename = md5.hexdigest()
+        cache_filepath = os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename)
+        return cache_filepath
+
     def _setup_state(self, update_collection):
         """
         Create a new cache file or load a previously existing one.
@@ -814,17 +832,7 @@ class ArvPutUploadJob(object):
             raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
 
         if self.use_cache:
-            # Set up cache file name from input paths.
-            md5 = hashlib.md5()
-            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
-            realpaths = sorted(os.path.realpath(path) for path in self.paths)
-            md5.update(b'\0'.join([p.encode() for p in realpaths]))
-            if self.filename:
-                md5.update(self.filename.encode())
-            cache_filename = md5.hexdigest()
-            cache_filepath = os.path.join(
-                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
-                cache_filename)
+            cache_filepath = self._get_cache_filepath()
             if self.resume and os.path.exists(cache_filepath):
                 self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
                 self._cache_file = open(cache_filepath, 'a+')
@@ -850,6 +858,8 @@ class ArvPutUploadJob(object):
             self.logger.info("No cache usage requested for this run.")
             # No cache file, set empty state
             self._state = copy.deepcopy(self.EMPTY_STATE)
+        if not self._cached_manifest_valid():
+            raise ResumeCacheInvalidError()
         # Load the previous manifest so we can check if files were modified remotely.
         self._local_collection = arvados.collection.Collection(
             self._state['manifest'],
@@ -857,6 +867,48 @@ class ArvPutUploadJob(object):
             put_threads=self.put_threads,
             api_client=self._api_client)
 
+    def _cached_manifest_valid(self):
+        """
+        Validate the oldest non-expired block signature to check whether the
+        cached manifest is usable, i.e. that it was not created with a
+        different Arvados account's credentials.
+        """
+        if self._state.get('manifest', None) is None:
+            # No cached manifest yet, all good.
+            return True
+        now = datetime.datetime.utcnow()
+        oldest_exp = None
+        oldest_loc = None
+        block_found = False
+        for m in keep_locator_pattern.finditer(self._state['manifest']):
+            loc = m.group(0)
+            try:
+                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+            except IndexError:
+                # Locator without signature
+                continue
+            block_found = True
+            if exp > now and (oldest_exp is None or exp < oldest_exp):
+                oldest_exp = exp
+                oldest_loc = loc
+        if not block_found:
+            # No block signatures found => no invalid block signatures.
+            return True
+        if oldest_loc is None:
+            # Locator signatures found, but all have expired.
+            # Reset the cache and move on.
+            self.logger.info('Cache expired, starting from scratch.')
+            self._state['manifest'] = ''
+            return True
+        kc = arvados.KeepClient(api_client=self._api_client,
+                                num_retries=self.num_retries)
+        try:
+            kc.head(oldest_loc)
+        except arvados.errors.KeepRequestError:
+            # Something is wrong, the cached manifest is not valid.
+            return False
+        return True
+
     def collection_file_paths(self, col, path_prefix='.'):
         """Return a list of file paths by recursively going through the entire collection `col`."""
         file_paths = []
@@ -1131,6 +1183,14 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "arv-put: Another process is already uploading this data.",
             "         Use --no-cache if this is really what you want."]))
         sys.exit(1)
+    except ResumeCacheInvalidError:
+        logger.error("\n".join([
+            "arv-put: Resume cache contains invalid signature: it may have expired",
+            "         or been created with another Arvados user's credentials.",
+            "         Switch user or use one of the following options to restart upload:",
+            "         --no-resume to start a new resume cache.",
+            "         --no-cache to disable resume cache."]))
+        sys.exit(1)
     except CollectionUpdateError as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
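
For context on the hunks above: a Keep permission hint has the form "+A<signature>@<expiry>", where <expiry> is a Unix timestamp in hexadecimal, and _cached_manifest_valid() keys off that timestamp. Below is a minimal standalone sketch of just the parsing step, not the production code: the manifest line, block hash, and signature are made up, and the simplified regex stands in for the real arvados.util.keep_locator_pattern.

    import datetime
    import re

    # Simplified stand-in for arvados.util.keep_locator_pattern.
    locator_re = re.compile(r'[0-9a-f]{32}\+\d+(?:\+\S+)*')

    manifest = (". fdba98970961edb29f88241b9d99d890+3"
                "+A27f054c151398c7579b26b7d0a60a0f0c0a2e0d1@5c0e1b2f"
                " 0:3:foo.txt\n")

    for m in locator_re.finditer(manifest):
        loc = m.group(0)
        try:
            # "+A<sig>@<hex timestamp>": everything after '@' is the expiry.
            exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
        except IndexError:
            continue  # unsigned locator, nothing to check
        print(loc, 'expires at', exp)
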
diff --git a/sdk/python/arvados/keep.py b/sdk/python/arvados/keep.py
index 1b6376e9be..4354ced67d 100644
--- a/sdk/python/arvados/keep.py
+++ b/sdk/python/arvados/keep.py
@@ -791,6 +791,7 @@ class KeepClient(object):
 
         if local_store:
             self.local_store = local_store
+            self.head = self.local_store_head
             self.get = self.local_store_get
             self.put = self.local_store_put
         else:
@@ -1230,5 +1231,17 @@ class KeepClient(object):
         with open(os.path.join(self.local_store, locator.md5sum), 'rb') as f:
             return f.read()
 
+    def local_store_head(self, loc_s, num_retries=None):
+        """Companion to local_store_put()."""
+        try:
+            locator = KeepLocator(loc_s)
+        except ValueError:
+            raise arvados.errors.NotFoundError(
+                "Invalid data locator: '%s'" % loc_s)
+        if locator.md5sum == config.EMPTY_BLOCK_LOCATOR.split('+')[0]:
+            return True
+        if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
+            return True
+
     def is_cached(self, locator):
         return self.block_cache.reserve_cache(expect_hash)
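
With the first keep.py hunk in place, a KeepClient built with local_store exercises the new head() path without contacting any Keep server. A rough usage sketch (untested here; note that local_store_head as written above returns None, rather than raising NotFoundError, when the block file is absent):

    import tempfile

    import arvados

    store = tempfile.mkdtemp()
    kc = arvados.KeepClient(local_store=store)  # head/get/put now use the local store

    loc = kc.put(b'foo')             # writes a block file named by its md5 hash
    print(kc.head(loc))              # True: the block file exists
    print(kc.head('d41d8cd98f00b204e9800998ecf8427e+0'))  # True: empty-block special case
    print(kc.head('0' * 32 + '+3'))  # None: missing block, no exception raised
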
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index 93cfdc2a36..a41184d10f 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -8,6 +8,7 @@ from future import standard_library
 standard_library.install_aliases()
 from builtins import str
 from builtins import range
+from functools import partial
 import apiclient
 import datetime
 import hashlib
@@ -528,6 +529,85 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                           resume=False)
         del(self.writer)
 
+class CachedManifestValidationTest(ArvadosBaseTestCase):
+    class MockedPut(arv_put.ArvPutUploadJob):
+        def __init__(self, cached_manifest=None):
+            self._state = arv_put.ArvPutUploadJob.EMPTY_STATE
+            self._state['manifest'] = cached_manifest
+            self._api_client = mock.MagicMock()
+            self.logger = mock.MagicMock()
+            self.num_retries = 1
+
+    def datetime_to_hex(self, dt):
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def setUp(self):
+        super(CachedManifestValidationTest, self).setUp()
+        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
+        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
+        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
+
+    def test_empty_cached_manifest_is_valid(self):
+        put_mock = self.MockedPut()
+        self.assertEqual(None, put_mock._state.get('manifest'))
+        self.assertTrue(put_mock._cached_manifest_valid())
+        put_mock._state['manifest'] = ''
+        self.assertTrue(put_mock._cached_manifest_valid())
+
+    def test_signature_cases(self):
+        now = datetime.datetime.utcnow()
+        yesterday = now - datetime.timedelta(days=1)
+        lastweek = now - datetime.timedelta(days=7)
+        tomorrow = now + datetime.timedelta(days=1)
+        nextweek = now + datetime.timedelta(days=7)
+
+        def mocked_head(blocks={}, loc=None):
+            blk = loc.split('+', 1)[0]
+            if blocks.get(blk):
+                return True
+            raise arvados.errors.KeepRequestError("mocked error - block invalid")
+
+        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
+        cases = [
+            # All expired, reset cache - OK
+            (yesterday, lastweek, False, False, True),
+            (lastweek, yesterday, False, False, True),
+            # All non-expired valid blocks - OK
+            (tomorrow, nextweek, True, True, True),
+            (nextweek, tomorrow, True, True, True),
+            # All non-expired invalid blocks - Not OK
+            (tomorrow, nextweek, False, False, False),
+            (nextweek, tomorrow, False, False, False),
+            # One non-expired valid block - OK
+            (tomorrow, yesterday, True, False, True),
+            (yesterday, tomorrow, False, True, True),
+            # One non-expired invalid block - Not OK
+            (tomorrow, yesterday, False, False, False),
+            (yesterday, tomorrow, False, False, False),
+        ]
+        for case in cases:
+            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
+            head_responses = {
+                self.block1: b1_valid,
+                self.block2: b2_valid,
+            }
+            cached_manifest = self.template % (
+                self.datetime_to_hex(b1_expiration),
+                self.datetime_to_hex(b2_expiration),
+            )
+            arvput = self.MockedPut(cached_manifest)
+            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
+                head_mock.side_effect = partial(mocked_head, head_responses)
+                self.assertEqual(outcome, arvput._cached_manifest_valid(),
+                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
+                )
+            if b1_expiration > now or b2_expiration > now:
+                # A HEAD request should have been done
+                head_mock.assert_called_once()
+            else:
+                head_mock.assert_not_called()
+
+
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
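
To make the case table above concrete, here is the template from setUp() rendered outside the test harness (a standalone sketch; the block hashes and signature bodies are the test's own dummy values, not real md5s or signatures):

    import datetime
    import time

    def datetime_to_hex(dt):
        # Same conversion as the test helper: epoch seconds, in hex, no '0x'.
        return hex(int(time.mktime(dt.timetuple())))[2:]

    block1 = "fdba98970961edb29f88241b9d99d890"
    block2 = "37b51d194a7513e45b56f6524f2d51f2"
    template = (". " + block1 + "+3+Asignature@%s " + block2 +
                "+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n")

    tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    # One future and one past expiry: only the unexpired block gets a HEAD check.
    print(template % (datetime_to_hex(tomorrow), datetime_to_hex(yesterday)))
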
"+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n" + + def test_empty_cached_manifest_is_valid(self): + put_mock = self.MockedPut() + self.assertEqual(None, put_mock._state.get('manifest')) + self.assertTrue(put_mock._cached_manifest_valid()) + put_mock._state['manifest'] = '' + self.assertTrue(put_mock._cached_manifest_valid()) + + def test_signature_cases(self): + now = datetime.datetime.utcnow() + yesterday = now - datetime.timedelta(days=1) + lastweek = now - datetime.timedelta(days=7) + tomorrow = now + datetime.timedelta(days=1) + nextweek = now + datetime.timedelta(days=7) + + def mocked_head(blocks={}, loc=None): + blk = loc.split('+', 1)[0] + if blocks.get(blk): + return True + raise arvados.errors.KeepRequestError("mocked error - block invalid") + + # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation + cases = [ + # All expired, reset cache - OK + (yesterday, lastweek, False, False, True), + (lastweek, yesterday, False, False, True), + # All non-expired valid blocks - OK + (tomorrow, nextweek, True, True, True), + (nextweek, tomorrow, True, True, True), + # All non-expired invalid blocks - Not OK + (tomorrow, nextweek, False, False, False), + (nextweek, tomorrow, False, False, False), + # One non-expired valid block - OK + (tomorrow, yesterday, True, False, True), + (yesterday, tomorrow, False, True, True), + # One non-expired invalid block - Not OK + (tomorrow, yesterday, False, False, False), + (yesterday, tomorrow, False, False, False), + ] + for case in cases: + b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case + head_responses = { + self.block1: b1_valid, + self.block2: b2_valid, + } + cached_manifest = self.template % ( + self.datetime_to_hex(b1_expiration), + self.datetime_to_hex(b2_expiration), + ) + arvput = self.MockedPut(cached_manifest) + with mock.patch('arvados.collection.KeepClient.head') as head_mock: + head_mock.side_effect = partial(mocked_head, head_responses) + self.assertEqual(outcome, arvput._cached_manifest_valid(), + "Case '%s' should have produced outcome '%s'" % (case, outcome) + ) + if b1_expiration > now or b2_expiration > now: + # A HEAD request should have been done + head_mock.assert_called_once() + else: + head_mock.assert_not_called() + + class ArvadosExpectedBytesTest(ArvadosBaseTestCase): TEST_SIZE = os.path.getsize(__file__) @@ -549,7 +629,7 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase): writer.bytes_expected) def test_expected_bytes_for_device(self): - writer = arv_put.ArvPutUploadJob(['/dev/null']) + writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False) self.assertIsNone(writer.bytes_expected) writer = arv_put.ArvPutUploadJob([__file__, '/dev/null']) self.assertIsNone(writer.bytes_expected) @@ -938,7 +1018,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, self.assertEqual(1, len(collection_list)) return collection_list[0] - def test_expired_token_invalidates_cache(self): + def test_all_expired_signatures_invalidates_cache(self): self.authorize_with('active') tmpdir = self.make_tmpdir() with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f: @@ -974,7 +1054,89 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers, (out, err) = p.communicate() self.assertRegex( err.decode(), - r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch') + r'INFO: Cache expired, starting from scratch.*') + self.assertEqual(p.returncode, 0) + + def 
@@ -974,7 +1054,89 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         (out, err) = p.communicate()
         self.assertRegex(
             err.decode(),
-            r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+            r'INFO: Cache expired, starting from scratch.*')
+        self.assertEqual(p.returncode, 0)
+
+    def test_invalid_signature_invalidates_cache(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an invalid access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        cache['manifest'] = re.sub(
+            r'\+A.*\@',
+            "+Aabcdef0123456789abcdef0123456789abcdef01@",
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'ERROR: arv-put: Resume cache contains invalid signature.*')
+        self.assertEqual(p.returncode, 1)
+
+    def test_single_expired_signature_reuploads_file(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+            f.write('foo')
+        # Write a second file on its own subdir to force a new stream
+        os.mkdir(os.path.join(tmpdir, 'bar'))
+        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+            f.write('bar')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an expired access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+        # Make one of the signatures appear to have expired
+        cache['manifest'] = re.sub(
+            r'\@.*? 3:3:barfile.txt',
+            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect the file with the expired signature to be re-uploaded
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
         self.assertEqual(p.returncode, 0)
         # Confirm that the resulting cache is different from the last run.
         with open(cache_filepath, 'r') as c2: