14012: Do HEAD request on the oldest non-expired signature for cache validation
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 6 Dec 2018 17:57:48 +0000 (14:57 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 6 Dec 2018 17:57:48 +0000 (14:57 -0300)
* Re-added single file expiration check for re-uploads
* Added tests for all cases

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

index a69dee735acf89cccb5ae76a83db32f9a3f97eff..556927b2bcfd194e73a71cbb3f729be109255fa9 100644 (file)
@@ -739,6 +739,12 @@ class ArvPutUploadJob(object):
             if not file_in_local_collection:
                 # File not uploaded yet, upload it completely
                 should_upload = True
+            elif file_in_local_collection.permission_expired():
+                # Permission token expired, re-upload file. This will change whenever
+                # we have an API for refreshing tokens.
+                self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
+                should_upload = True
+                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
@@ -859,23 +865,41 @@ class ArvPutUploadJob(object):
 
     def _cached_manifest_valid(self):
         """
-        Validate first block's signature to check if cached manifest is usable.
-        Cached manifest could be invalid because:
-        * Block signature expiration
-        * Block signatures belonging to a different user
+        Validate the oldest non-expired block signature to check if the cached
+        manifest is usable, i.e. that it was not created with a different
+        Arvados account.
         """
         if self._state.get('manifest', None) is None:
             # No cached manifest yet, all good.
             return True
-        m = keep_locator_pattern.search(self._state['manifest'])
-        if m is None:
-            # No blocks found, no invalid block signatures.
+        now = datetime.datetime.utcnow()
+        oldest_exp = None
+        oldest_loc = None
+        block_found = False
+        for m in keep_locator_pattern.finditer(self._state['manifest']):
+            loc = m.group(0)
+            try:
+                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+            except IndexError:
+                # Locator without signature
+                continue
+            block_found = True
+            if exp > now and (oldest_exp is None or exp < oldest_exp):
+                oldest_exp = exp
+                oldest_loc = loc
+        if not block_found:
+            # No block signatures found => no invalid block signatures.
+            return True
+        if oldest_loc is None:
+            # Locator signatures found, but all have expired.
+            # Reset the cache and move on.
+            self.logger.info('Cache expired, starting from scratch.')
+            self._state['manifest'] = ''
             return True
-        loc = self._state['manifest'][m.start():m.end()]
         kc = arvados.KeepClient(api_client=self._api_client,
                                 num_retries=self.num_retries)
         try:
-            kc.head(loc)
+            kc.head(oldest_loc)
         except arvados.errors.KeepRequestError:
             # Something is wrong, cached manifest is not valid.
             return False
index 76144e8e38c07ee19c786a62b78494ebef4bf2b9..642f64ec5a54fd23d77ea37eb8901d056509c7d1 100644 (file)
@@ -938,7 +938,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1, len(collection_list))
         return collection_list[0]
 
-    def test_expired_token_invalidates_cache(self):
+    def test_all_expired_signatures_invalidates_cache(self):
         self.authorize_with('active')
         tmpdir = self.make_tmpdir()
         with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
@@ -972,11 +972,97 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
         (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'INFO: Cache expired, starting from scratch.*')
+        self.assertEqual(p.returncode, 0)
+
+    def test_invalid_signature_invalidates_cache(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an invalid access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        cache['manifest'] = re.sub(
+            r'\+A.*\@',
+            "+Aabcdef0123456789abcdef0123456789abcdef01@",
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
         self.assertRegex(
             err.decode(),
             r'ERROR: arv-put: Cache seems to contain invalid data.*')
         self.assertEqual(p.returncode, 1)
 
+    def test_single_expired_signature_reuploads_file(self):
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+            f.write('foo')
+        # Write a second file on its own subdir to force a new stream
+        os.mkdir(os.path.join(tmpdir, 'bar'))
+        with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+            f.write('bar')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an expired access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+        # Make one of the signatures appear to have expired
+        cache['manifest'] = re.sub(
+            r'\@.*? 3:3:barfile.txt',
+            "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect a warning that the expired file is re-uploaded
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
+        self.assertEqual(p.returncode, 0)
+        # Confirm that the resulting cache is different from the last run.
+        with open(cache_filepath, 'r') as c2:
+            new_cache = json.load(c2)
+        self.assertNotEqual(cache['manifest'], new_cache['manifest'])
+
     def test_put_collection_with_later_update(self):
         tmpdir = self.make_tmpdir()
         with open(os.path.join(tmpdir, 'file1'), 'w') as f: