still be displayed.)
""")
+run_opts.add_argument('--batch', action='store_true', default=False,  # Read back as args.batch and handed to ArvPutUploadJob as batch_mode.
+ help="""
+Retries with '--no-resume --no-cache' if cached state contains invalid/expired
+block signatures.
+""")
+
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
help="""
}
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
- name=None, owner_uuid=None, api_client=None,
+ name=None, owner_uuid=None, api_client=None, batch_mode=False,
ensure_unique_name=False, num_retries=None,
put_threads=None, replication_desired=None, filename=None,
update_time=60.0, update_collection=None, storage_classes=None,
self.paths = paths
self.resume = resume
self.use_cache = use_cache
+ self.batch_mode = batch_mode
self.update = False
self.reporter = reporter
# This will set to 0 before start counting, if no special files are going
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
if not self._cached_manifest_valid():
- raise ResumeCacheInvalidError()
+ if not self.batch_mode:
+ raise ResumeCacheInvalidError()
+ else:
+ self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+ self.use_cache = False # Don't overwrite preexisting cache file.
+ self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(
self._state['manifest'],
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
use_cache = args.use_cache,
+ batch_mode= args.batch,
filename = args.filename,
reporter = reporter,
api_client = api_client,
" or been created with another Arvados user's credentials.",
" Switch user or use one of the following options to restart upload:",
" --no-resume to start a new resume cache.",
- " --no-cache to disable resume cache."]))
+ " --no-cache to disable resume cache.",
+ " --batch to ignore the resume cache if invalid."]))
sys.exit(1)
except (CollectionUpdateError, PathDoesNotExistError) as error:
logger.error("\n".join([
r'INFO: Cache expired, starting from scratch.*')
self.assertEqual(p.returncode, 0)
- def test_invalid_signature_invalidates_cache(self):
- self.authorize_with('active')
- tmpdir = self.make_tmpdir()
- with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
- f.write('foo')
- # Upload a directory and get the cache file name
- p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=self.ENVIRON)
- (_, err) = p.communicate()
- self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
- self.assertEqual(p.returncode, 0)
- cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
- err.decode()).groups()[0]
- self.assertTrue(os.path.isfile(cache_filepath))
- # Load the cache file contents and modify the manifest to simulate
- # an invalid access token
- with open(cache_filepath, 'r') as c:
- cache = json.load(c)
- self.assertRegex(cache['manifest'], r'\+A\S+\@')
- cache['manifest'] = re.sub(
- r'\+A.*\@',
- "+Aabcdef0123456789abcdef0123456789abcdef01@",
- cache['manifest'])
- with open(cache_filepath, 'w') as c:
- c.write(json.dumps(cache))
- # Re-run the upload and expect to get an invalid cache message
- p = subprocess.Popen([sys.executable, arv_put.__file__, tmpdir],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=self.ENVIRON)
- (_, err) = p.communicate()
- self.assertRegex(
- err.decode(),
- r'ERROR: arv-put: Resume cache contains invalid signature.*')
- self.assertEqual(p.returncode, 1)
+ def test_invalid_signature_in_cache(self):
+ for batch_mode in [False, True]:  # Same scenario twice: first without --batch, then with it.
+ self.authorize_with('active')
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'somefile.txt'), 'w') as f:
+ f.write('foo')
+ # Upload a directory (with --batch prepended in batch mode) and get the cache file name from the log output
+ arv_put_args = [tmpdir]
+ if batch_mode:
+ arv_put_args = ['--batch'] + arv_put_args
+ p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
+ self.assertEqual(p.returncode, 0)
+ cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
+ err.decode()).groups()[0]
+ self.assertTrue(os.path.isfile(cache_filepath))
+ # Load the cache file contents and overwrite the '+A...@' part of the
+ # cached manifest with a fixed bogus value to simulate an invalid access token
+ with open(cache_filepath, 'r') as c:
+ cache = json.load(c)
+ self.assertRegex(cache['manifest'], r'\+A\S+\@')
+ cache['manifest'] = re.sub(
+ r'\+A.*\@',
+ "+Aabcdef0123456789abcdef0123456789abcdef01@",
+ cache['manifest'])
+ with open(cache_filepath, 'w') as c:
+ c.write(json.dumps(cache))
+ # Re-run the upload with the same arguments against the tampered cache; the expected outcome depends on batch_mode
+ p = subprocess.Popen([sys.executable, arv_put.__file__] + arv_put_args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ env=self.ENVIRON)
+ (_, err) = p.communicate()
+ if not batch_mode:  # Without --batch: the invalid-signature error is reported and the exit code is 1.
+ self.assertRegex(
+ err.decode(),
+ r'ERROR: arv-put: Resume cache contains invalid signature.*')
+ self.assertEqual(p.returncode, 1)
+ else:  # With --batch: the invalid cache is only logged and the run exits with code 0.
+ self.assertRegex(
+ err.decode(),
+ r'Invalid signatures on cache file \'.*\' while being run in \'batch mode\' -- continuing anyways.*')
+ self.assertEqual(p.returncode, 0)
def test_single_expired_signature_reuploads_file(self):
self.authorize_with('active')