Do not continue interrupted uploads from cached state.
""")
+_group = run_opts.add_mutually_exclusive_group()
+_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
+ help="""
+Save upload state in a cache file for resuming (default).
+""")
+_group.add_argument('--no-cache', action='store_false', dest='use_cache',
+ help="""
+Do not save upload state in a cache file for resuming.
+""")
+
arg_parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.',
parents=[upload_opts, run_opts, arv_cmd.retry_opt])
and os.isatty(sys.stderr.fileno())):
args.progress = True
+ # Turn off --resume (default) if --no-cache is used.
+ if not args.use_cache:
+ args.resume = False
+
if args.paths == ['-']:
if args.update_collection:
arg_parser.error("""
--update-collection cannot be used when reading from stdin.
""")
args.resume = False
+ args.use_cache = False
if not args.filename:
args.filename = 'stdin'
'files' : {} # Previous run file list: {path : {size, mtime}}
}
- def __init__(self, paths, resume=True, reporter=None, bytes_expected=None,
- name=None, owner_uuid=None, ensure_unique_name=False,
- num_retries=None, replication_desired=None,
+ def __init__(self, paths, resume=True, use_cache=True, reporter=None,
+ bytes_expected=None, name=None, owner_uuid=None,
+ ensure_unique_name=False, num_retries=None, replication_desired=None,
filename=None, update_time=1.0, update_collection=None):
self.paths = paths
self.resume = resume
+ self.use_cache = use_cache
self.update = False
self.reporter = reporter
self.bytes_expected = bytes_expected
self._update_task_time = update_time # How many seconds wait between update runs
self.logger = logging.getLogger('arvados.arv_put')
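+ # Resuming relies on the upload state cache, so these settings can't conflict.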
+ if not self.use_cache and self.resume:
+ raise ValueError('resume cannot be True when use_cache is False')
+
# Load cached data if any and if needed
self._setup_state(update_collection)
if path == '-':
self._write_stdin(self.filename or 'stdin')
elif os.path.isdir(path):
- if path == '.' or path == './' or os.path.dirname(path) == '':
- dirname = ''
- else:
- dirname = os.path.dirname(path) + '/'
+ # Use absolute paths in the cache index so the CWD doesn't interfere
+ # with the caching logic.
+ prefixdir = path = os.path.abspath(path)
+ if prefixdir != '/':
+ prefixdir += '/'
for root, dirs, files in os.walk(path):
# Make os.walk()'s dir traversing order deterministic
dirs.sort()
files.sort()
for f in files:
self._write_file(os.path.join(root, f),
- os.path.join(root[len(dirname):], f))
+ os.path.join(root[len(prefixdir):], f))
else:
- self._write_file(path, self.filename or os.path.basename(path))
+ self._write_file(os.path.abspath(path),
+ self.filename or os.path.basename(path))
finally:
# Stop the thread before doing anything else
self._stop_checkpointer.set()
self._checkpointer.join()
- # Commit all & one last _update()
- self.manifest_text()
+ # Commit all pending blocks & one last _update()
+ self._local_collection.manifest_text()
+ self._update(final=True)
+ if self.use_cache:
+ self._cache_file.close()
if save_collection:
self.save_collection()
- self._update()
- self._cache_file.close()
# Correct the final written bytes count
self.bytes_written -= self.bytes_skipped
while not self._stop_checkpointer.wait(self._update_task_time):
self._update()
- def _update(self):
+ def _update(self, final=False):
"""
Update cached manifest text and report progress.
"""
with self._collection_lock:
self.bytes_written = self._collection_size(self._local_collection)
- # Update cache, if resume enabled
- with self._state_lock:
- # Get the manifest text without comitting pending blocks
- self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
- self._save_state()
+ if self.use_cache:
+ # Update cache
+ with self._state_lock:
+ if final:
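+ # On the final pass, commit all pending blocks and store the full manifest.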
+ self._state['manifest'] = self._local_collection.manifest_text()
+ else:
+ # Get the manifest text without committing pending blocks
+ self._state['manifest'] = self._local_collection._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+ self._save_state()
# Call the reporter, if any
self.report_progress()
output.close(flush=False)
def _write(self, source_fd, output):
- first_read = True
while True:
data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
- # Allow an empty file to be written
- if not data and not first_read:
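+ # Opening the output already created the file, so it's safe to stop on the first empty read.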
+ if not data:
break
- if first_read:
- first_read = False
output.write(data)
def _my_collection(self):
# Collection locator provided, but unknown format
raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))
- # Set up cache file name from input paths.
- md5 = hashlib.md5()
- md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
- realpaths = sorted(os.path.realpath(path) for path in self.paths)
- md5.update('\0'.join(realpaths))
- if self.filename:
- md5.update(self.filename)
- cache_filename = md5.hexdigest()
- self._cache_file = open(os.path.join(
- arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
- cache_filename), 'a+')
- self._cache_filename = self._cache_file.name
- self._lock_file(self._cache_file)
- self._cache_file.seek(0)
+ if self.use_cache:
+ # Set up cache file name from input paths.
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
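+ # Sort the real paths so the cache key doesn't depend on argument order.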
+ realpaths = sorted(os.path.realpath(path) for path in self.paths)
+ md5.update('\0'.join(realpaths))
+ if self.filename:
+ md5.update(self.filename)
+ cache_filename = md5.hexdigest()
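+ # Open with 'a+' to create the cache file if needed without truncating an existing one.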
+ self._cache_file = open(os.path.join(
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
+ self._lock_file(self._cache_file)
+ self._cache_file.seek(0)
+
with self._state_lock:
- try:
- self._state = json.load(self._cache_file)
- if not set(['manifest', 'files']).issubset(set(self._state.keys())):
- # Cache at least partially incomplete, set up new cache
+ if self.use_cache:
+ try:
+ self._state = json.load(self._cache_file)
+ if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ # Cache is missing required fields, set up a new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ except ValueError:
+ # Cache file is empty or malformed, set up a new cache
self._state = copy.deepcopy(self.EMPTY_STATE)
- except ValueError:
- # Cache file empty, set up new cache
+ else:
+ # No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
-
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
# Load how many bytes were uploaded on previous run
"""
try:
with self._state_lock:
- state = self._state
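+ # Take a deep copy so the snapshot can't change while it's being written out.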
+ state = copy.deepcopy(self._state)
new_cache_fd, new_cache_name = tempfile.mkstemp(
dir=os.path.dirname(self._cache_filename))
self._lock_file(new_cache_fd)
try:
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
+ use_cache = args.use_cache,
filename = args.filename,
reporter = reporter,
bytes_expected = bytes_expected,
except ResumeCacheConflict:
print >>stderr, "\n".join([
"arv-put: Another process is already uploading this data.",
- " Use --no-resume if this is really what you want."])
+ " Use --no-cache if this is really what you want."])
sys.exit(1)
except CollectionUpdateError as error:
print >>stderr, "\n".join([