self._file_paths = [] # Files to be updated in remote collection
self._stop_checkpointer = threading.Event()
self._checkpointer = threading.Thread(target=self._update_task)
+ self._checkpointer.daemon = True
self._update_task_time = update_time # How many seconds wait between update runs
self._files_to_upload = FileUploadList(dry_run=dry_run)
self.logger = logger
"""
Start supporting thread & file uploading
"""
- self._checkpointer.daemon = True
- self._checkpointer.start()
+ if not self.dry_run:
+ self._checkpointer.start()
try:
for path in self.paths:
# Test for stdin first, in case some file named '-' exist
# there aren't any file to upload.
if self.dry_run:
raise ArvPutUploadNotPending()
+ # Remove local_collection's files that don't exist locally anymore, so the
+ # bytes_written count is correct.
+ for f in self.collection_file_paths(self._local_collection,
+ path_prefix=""):
+ if f != 'stdin' and f != self.filename and not f in self._file_paths:
+ self._local_collection.remove(f)
# Update bytes_written from current local collection and
# report initial progress.
self._update()
# Actual file upload
self._upload_files()
finally:
- # Stop the thread before doing anything else
- self._stop_checkpointer.set()
- self._checkpointer.join()
- # Commit all pending blocks & one last _update()
- self._local_collection.manifest_text()
- self._update(final=True)
+ if not self.dry_run:
+ # Stop the thread before doing anything else
+ self._stop_checkpointer.set()
+ self._checkpointer.join()
+ # Commit all pending blocks & one last _update()
+ self._local_collection.manifest_text()
+ self._update(final=True)
+ if save_collection:
+ self.save_collection()
if self.use_cache:
self._cache_file.close()
- if save_collection:
- self.save_collection()
def save_collection(self):
if self.update:
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
+ def collection_file_paths(self, col, path_prefix='.'):
+ """Return a list of file paths by recursively go through the entire collection `col`"""
+ file_paths = []
+ for name, item in col.items():
+ if isinstance(item, arvados.arvfile.ArvadosFile):
+ file_paths.append(os.path.join(path_prefix, name))
+ elif isinstance(item, arvados.collection.Subcollection):
+ new_prefix = os.path.join(path_prefix, name)
+ file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
+ return file_paths
+
def _lock_file(self, fileobj):
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
if args.stream or args.raw:
logger.error("Cannot use --name with --stream or --raw")
sys.exit(1)
+ elif args.update_collection:
+ logger.error("Cannot use --name with --update-collection")
+ sys.exit(1)
collection_name = args.name
else:
collection_name = "Saved at {} by {}@{}".format(
owner_uuid = project_uuid,
ensure_unique_name = True,
update_collection = args.update_collection,
- logger=logger)
+ logger=logger,
+ dry_run=args.dry_run)
except ResumeCacheConflict:
logger.error("\n".join([
"arv-put: Another process is already uploading this data.",
orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
for sigcode in CAUGHT_SIGNALS}
- if not args.update_collection and args.resume and writer.bytes_written > 0:
+ if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
logger.warning("\n".join([
"arv-put: Resuming previous upload from last checkpoint.",
" Use the --no-resume option to start over."]))
- writer.report_progress()
+ if not args.dry_run:
+ writer.report_progress()
output = None
try:
writer.start(save_collection=not(args.stream or args.raw))
logger.error("\n".join([
"arv-put: %s" % str(error)]))
sys.exit(1)
+ except ArvPutUploadIsPending:
+ # Dry run check successful, return proper exit code.
+ sys.exit(2)
+ except ArvPutUploadNotPending:
+ # No files pending for upload
+ sys.exit(0)
if args.progress: # Print newline to split stderr from stdout for humans.
- logger.error("\n")
+ logger.info("\n")
if args.stream:
if args.normalize: