from apiclient import errors as apiclient_errors
from arvados._version import __version__
+from arvados.util import keep_locator_pattern
import arvados.commands._util as arv_cmd
pass
+class ResumeCacheInvalidError(Exception):
+    """Raised when the resume cache's cached manifest can't be reused.
+
+    Signals that the cached manifest carries block signatures that are not
+    valid for the current credentials (e.g. written by a different Arvados
+    user), so resuming from it would fail.
+    """
+    pass
+
class ArvPutArgumentConflict(Exception):
    """Raised when mutually incompatible arv-put arguments are combined."""
    pass
new_cache = os.fdopen(new_cache_fd, 'r+')
json.dump(data, new_cache)
os.rename(new_cache_name, self.filename)
- except (IOError, OSError, ResumeCacheConflict) as error:
+ except (IOError, OSError, ResumeCacheConflict):
try:
os.unlink(new_cache_name)
except NameError: # mkstemp failed.
def _build_upload_list(self):
"""
- Scan the requested paths to count file sizes, excluding files & dirs if requested
- and building the upload file list.
+ Scan the requested paths to count file sizes, excluding requested files
+ and dirs and building the upload file list.
"""
# If there aren't special files to be read, reset total bytes count to zero
# to start counting.
def _my_collection(self):
return self._remote_collection if self.update else self._local_collection
+    def _get_cache_filepath(self):
+        """Return the resume-cache file path for this upload job.
+
+        The cache file name is the MD5 digest of the API host, the sorted
+        real paths of every input, and (when set) the output filename, so
+        repeating the same invocation maps to the same cache file.
+        """
+        # Set up cache file name from input paths.
+        md5 = hashlib.md5()
+        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
+        # realpath() so symlinked/relative spellings of the same inputs hash
+        # identically; sorted so argument order doesn't change the key.
+        realpaths = sorted(os.path.realpath(path) for path in self.paths)
+        md5.update(b'\0'.join([p.encode() for p in realpaths]))
+        if self.filename:
+            md5.update(self.filename.encode())
+        cache_filename = md5.hexdigest()
+        cache_filepath = os.path.join(
+            arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+            cache_filename)
+        return cache_filepath
+
def _setup_state(self, update_collection):
"""
Create a new cache file or load a previously existing one.
raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
if self.use_cache:
- # Set up cache file name from input paths.
- md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost').encode())
- realpaths = sorted(os.path.realpath(path) for path in self.paths)
- md5.update(b'\0'.join([p.encode() for p in realpaths]))
- if self.filename:
- md5.update(self.filename.encode())
- cache_filename = md5.hexdigest()
- cache_filepath = os.path.join(
- arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
- cache_filename)
+ cache_filepath = self._get_cache_filepath()
if self.resume and os.path.exists(cache_filepath):
self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
self.logger.info("No cache usage requested for this run.")
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
+ if not self._cached_manifest_valid():
+ raise ResumeCacheInvalidError()
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(
self._state['manifest'],
put_threads=self.put_threads,
api_client=self._api_client)
+    def _cached_manifest_valid(self):
+        """
+        Validate the oldest non-expired block signature to check if cached manifest
+        is usable: checking if the cached manifest was not created with a different
+        arvados account.
+
+        Returns True when the cache is usable (possibly after resetting an
+        all-expired manifest), False when a live signature fails a HEAD check.
+        """
+        if self._state.get('manifest', None) is None:
+            # No cached manifest yet, all good.
+            return True
+        # Naive UTC datetimes throughout; signature expirations are epoch-based.
+        now = datetime.datetime.utcnow()
+        oldest_exp = None
+        oldest_loc = None
+        block_found = False
+        # Scan every locator, tracking the non-expired signature closest to
+        # expiring; only that single locator is HEAD-checked below.
+        for m in keep_locator_pattern.finditer(self._state['manifest']):
+            loc = m.group(0)
+            try:
+                # Signed locators end in ...@<hex epoch expiry>.
+                exp = datetime.datetime.utcfromtimestamp(int(loc.split('@')[1], 16))
+            except IndexError:
+                # Locator without signature
+                continue
+            block_found = True
+            if exp > now and (oldest_exp is None or exp < oldest_exp):
+                oldest_exp = exp
+                oldest_loc = loc
+        if not block_found:
+            # No block signatures found => no invalid block signatures.
+            return True
+        if oldest_loc is None:
+            # Locator signatures found, but all have expired.
+            # Reset the cache and move on.
+            self.logger.info('Cache expired, starting from scratch.')
+            self._state['manifest'] = ''
+            return True
+        # Ask Keep whether this signature is actually honored for the current
+        # credentials; a failure means the manifest was signed by someone else.
+        kc = arvados.KeepClient(api_client=self._api_client,
+                                num_retries=self.num_retries)
+        try:
+            kc.head(oldest_loc)
+        except arvados.errors.KeepRequestError:
+            # Something is wrong, cached manifest is not valid.
+            return False
+        return True
+
def collection_file_paths(self, col, path_prefix='.'):
"""Return a list of file paths by recursively go through the entire collection `col`"""
file_paths = []
"arv-put: Another process is already uploading this data.",
" Use --no-cache if this is really what you want."]))
sys.exit(1)
+ except ResumeCacheInvalidError:
+ logger.error("\n".join([
+ "arv-put: Resume cache contains invalid signature: it may have expired",
+ " or been created with another Arvados user's credentials.",
+ " Switch user or use one of the following options to restart upload:",
+ " --no-resume to start a new resume cache.",
+ " --no-cache to disable resume cache."]))
+ sys.exit(1)
except CollectionUpdateError as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
standard_library.install_aliases()
from builtins import str
from builtins import range
+from functools import partial
import apiclient
import datetime
import hashlib
resume=False)
del(self.writer)
+class CachedManifestValidationTest(ArvadosBaseTestCase):
+    """Unit tests for ArvPutUploadJob._cached_manifest_valid()."""
+
+    class MockedPut(arv_put.ArvPutUploadJob):
+        # Minimal stand-in: bypasses ArvPutUploadJob.__init__ and sets up only
+        # the attributes _cached_manifest_valid() actually reads.
+        def __init__(self, cached_manifest=None):
+            # Shallow-copy EMPTY_STATE: assigning the class-level dict directly
+            # would make the 'manifest' write below mutate
+            # ArvPutUploadJob.EMPTY_STATE itself, leaking state across test
+            # cases and into any other user of EMPTY_STATE.
+            self._state = dict(arv_put.ArvPutUploadJob.EMPTY_STATE)
+            self._state['manifest'] = cached_manifest
+            self._api_client = mock.MagicMock()
+            self.logger = mock.MagicMock()
+            self.num_retries = 1
+
+    def datetime_to_hex(self, dt):
+        # Hex-encoded epoch timestamp, the format Keep signatures use for
+        # their expiration hint.
+        # NOTE(review): time.mktime() treats dt as local time while the code
+        # under test decodes with utcfromtimestamp(); the skew is at most a
+        # UTC offset, harmless for the +/- 1-day/1-week deltas used here.
+        return hex(int(time.mktime(dt.timetuple())))[2:]
+
+    def setUp(self):
+        super(CachedManifestValidationTest, self).setUp()
+        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
+        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
+        # Two signed blocks; %s placeholders receive the hex expirations.
+        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"
+
+    def test_empty_cached_manifest_is_valid(self):
+        put_mock = self.MockedPut()
+        self.assertEqual(None, put_mock._state.get('manifest'))
+        self.assertTrue(put_mock._cached_manifest_valid())
+        put_mock._state['manifest'] = ''
+        self.assertTrue(put_mock._cached_manifest_valid())
+
+    def test_signature_cases(self):
+        now = datetime.datetime.utcnow()
+        yesterday = now - datetime.timedelta(days=1)
+        lastweek = now - datetime.timedelta(days=7)
+        tomorrow = now + datetime.timedelta(days=1)
+        nextweek = now + datetime.timedelta(days=7)
+
+        def mocked_head(blocks={}, loc=None):
+            # Stand-in for KeepClient.head: succeed only for whitelisted blocks.
+            blk = loc.split('+', 1)[0]
+            if blocks.get(blk):
+                return True
+            raise arvados.errors.KeepRequestError("mocked error - block invalid")
+
+        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
+        cases = [
+            # All expired, reset cache - OK
+            (yesterday, lastweek, False, False, True),
+            (lastweek, yesterday, False, False, True),
+            # All non-expired valid blocks - OK
+            (tomorrow, nextweek, True, True, True),
+            (nextweek, tomorrow, True, True, True),
+            # All non-expired invalid blocks - Not OK
+            (tomorrow, nextweek, False, False, False),
+            (nextweek, tomorrow, False, False, False),
+            # One non-expired valid block - OK
+            (tomorrow, yesterday, True, False, True),
+            (yesterday, tomorrow, False, True, True),
+            # One non-expired invalid block - Not OK
+            (tomorrow, yesterday, False, False, False),
+            (yesterday, tomorrow, False, False, False),
+        ]
+        for case in cases:
+            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
+            head_responses = {
+                self.block1: b1_valid,
+                self.block2: b2_valid,
+            }
+            cached_manifest = self.template % (
+                self.datetime_to_hex(b1_expiration),
+                self.datetime_to_hex(b2_expiration),
+            )
+            arvput = self.MockedPut(cached_manifest)
+            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
+                head_mock.side_effect = partial(mocked_head, head_responses)
+                self.assertEqual(outcome, arvput._cached_manifest_valid(),
+                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
+                )
+            if b1_expiration > now or b2_expiration > now:
+                # A HEAD request should have been done
+                head_mock.assert_called_once()
+            else:
+                head_mock.assert_not_called()
+
+
+
class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
TEST_SIZE = os.path.getsize(__file__)
writer.bytes_expected)
    def test_expected_bytes_for_device(self):
-        writer = arv_put.ArvPutUploadJob(['/dev/null'])
+        # NOTE(review): cache/resume disabled here, presumably so this size
+        # probe on a device path doesn't create resume-cache state -- confirm.
+        writer = arv_put.ArvPutUploadJob(['/dev/null'], use_cache=False, resume=False)
        self.assertIsNone(writer.bytes_expected)
        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
        self.assertIsNone(writer.bytes_expected)
self.assertEqual(1, len(collection_list))
return collection_list[0]
- def test_expired_token_invalidates_cache(self):
+ def test_all_expired_signatures_invalidates_cache(self):
self.authorize_with('active')
tmpdir = self.make_tmpdir()
with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
(out, err) = p.communicate()
self.assertRegex(
err.decode(),
- r'WARNING: Uploaded file .* access token expired, will re-upload it from scratch')
+ r'INFO: Cache expired, starting from scratch.*')
+ self.assertEqual(p.returncode, 0)
+
+    def test_invalid_signature_invalidates_cache(self):
+        """A cached manifest signed with other credentials must abort the upload."""
+        self.authorize_with('active')
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+            f.write('foo')
+        # Upload a directory and get the cache file name
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+        self.assertEqual(p.returncode, 0)
+        cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+                                   err.decode()).groups()[0]
+        self.assertTrue(os.path.isfile(cache_filepath))
+        # Load the cache file contents and modify the manifest to simulate
+        # an invalid access token
+        with open(cache_filepath, 'r') as c:
+            cache = json.load(c)
+        # Sanity check: the cached manifest must actually carry +A signatures.
+        self.assertRegex(cache['manifest'], r'\+A\S+\@')
+        cache['manifest'] = re.sub(
+            r'\+A.*\@',
+            "+Aabcdef0123456789abcdef0123456789abcdef01@",
+            cache['manifest'])
+        with open(cache_filepath, 'w') as c:
+            c.write(json.dumps(cache))
+        # Re-run the upload and expect to get an invalid cache message
+        p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+                             stdout=subprocess.PIPE,
+                             stderr=subprocess.PIPE,
+                             env=self.ENVIRON)
+        (out, err) = p.communicate()
+        self.assertRegex(
+            err.decode(),
+            r'ERROR: arv-put: Resume cache contains invalid signature.*')
+        # Invalid signatures are a hard error, not a silent re-upload.
+        self.assertEqual(p.returncode, 1)
+
+
+ def test_single_expired_signature_reuploads_file(self):
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'foofile.txt'), 'w') as f:
+ f.write('foo')
+ # Write a second file on its own subdir to force a new stream
+ os.mkdir(os.path.join(tmpdir, 'bar'))
+ with open(os.path.join(tmpdir, 'bar', 'barfile.txt'), 'w') as f:
+ f.write('bar')
+ # Upload a directory and get the cache file name
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and modify the manifest to simulate
+ # an expired access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ a_month_ago = datetime.datetime.now() - datetime.timedelta(days=30)
+ # Make one of the signatures appear to have expired
+ cache['manifest'] = re.sub(
+ r'\@.*? 3:3:barfile.txt',
+ "@{} 3:3:barfile.txt".format(self.datetime_to_hex(a_month_ago)),
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload and expect to get an invalid cache message
+ p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (out, err) = p.communicate()
+ self.assertRegex(
+ err.decode(),
+ r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
self.assertEqual(p.returncode, 0)
# Confirm that the resulting cache is different from the last run.
with open(cache_filepath, 'r') as c2: