+ self.bytes_written = 0
+ self.bytes_skipped = 0
+ self.name = name
+ self.owner_uuid = owner_uuid
+ self.ensure_unique_name = ensure_unique_name
+ self.num_retries = num_retries
+ self.replication_desired = replication_desired
+ self.put_threads = put_threads
+ self.filename = filename
+ self._state_lock = threading.Lock()
+ self._state = None # Previous run state (file list & manifest)
+ self._current_files = [] # Current run file list
+ self._cache_file = None
+ self._collection_lock = threading.Lock()
+ self._remote_collection = None # Collection being updated (if asked)
+ self._local_collection = None # Collection from previous run manifest
+ self._file_paths = [] # Files to be updated in remote collection
+ self._stop_checkpointer = threading.Event()
+ self._checkpointer = threading.Thread(target=self._update_task)
+ self._checkpointer.daemon = True
+ self._update_task_time = update_time # How many seconds wait between update runs
+ self._files_to_upload = FileUploadList(dry_run=dry_run)
+ self.logger = logger
+ self.dry_run = dry_run
+
+ if not self.use_cache and self.resume:
+ raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
+
+ # Check for obvious dry-run responses
+ if self.dry_run and (not self.use_cache or not self.resume):
+ raise ArvPutUploadIsPending()
+
+ # Load cached data if any and if needed
+ self._setup_state(update_collection)
+
def start(self, save_collection):
    """
    Start the supporting checkpointer thread and upload every requested path.

    Walks each entry in self.paths ('-' means stdin), registers every file
    for upload, prunes files that no longer exist locally from the cached
    local collection so bytes_written stays accurate, performs the upload,
    and finally stops the checkpointer thread and optionally saves the
    collection.

    Arguments:
    * save_collection: when True, save the collection (new or updated)
      after the upload finishes.

    Raises:
    * ArvPutUploadIsPending: in dry-run mode, when there is work pending
      (stdin requested).
    * ArvPutUploadNotPending: in dry-run mode, when nothing is left to
      upload.
    """
    if not self.dry_run:
        self._checkpointer.start()
    try:
        for path in self.paths:
            # Test for stdin first, in case some file named '-' exists.
            if path == '-':
                if self.dry_run:
                    raise ArvPutUploadIsPending()
                self._write_stdin(self.filename or 'stdin')
            elif os.path.isdir(path):
                # Use absolute paths on the cache index so CWD doesn't
                # interfere with the caching logic.
                prefixdir = path = os.path.abspath(path)
                if prefixdir != '/':
                    prefixdir += '/'
                for root, dirs, files in os.walk(path):
                    # Make os.walk()'s directory traversal order deterministic.
                    dirs.sort()
                    files.sort()
                    for f in files:
                        self._check_file(os.path.join(root, f),
                                         os.path.join(root[len(prefixdir):], f))
            else:
                self._check_file(os.path.abspath(path),
                                 self.filename or os.path.basename(path))
        # If dry-run mode is on and we got up to this point, there are no
        # files pending upload, so notify the caller.
        if self.dry_run:
            raise ArvPutUploadNotPending()
        # Remove local_collection's files that don't exist locally anymore,
        # so the bytes_written count stays correct.
        for f in self.collection_file_paths(self._local_collection,
                                            path_prefix=""):
            # Idiomatic membership test: 'f not in' instead of 'not f in'.
            if f != 'stdin' and f != self.filename and f not in self._file_paths:
                self._local_collection.remove(f)
        # Update bytes_written from the current local collection and
        # report initial progress.
        self._update()
        # Actual file upload.
        self._upload_files()
    finally:
        if not self.dry_run:
            # Stop the checkpointer thread before doing anything else.
            self._stop_checkpointer.set()
            self._checkpointer.join()
            # Commit all pending blocks & run one last _update().
            self._local_collection.manifest_text()
            self._update(final=True)
            if save_collection:
                self.save_collection()
        if self.use_cache:
            self._cache_file.close()
+
def save_collection(self):
    """
    Commit the upload's result.

    When updating an existing remote collection, sync every uploaded file
    into it and save; otherwise register a brand new collection.
    """
    if not self.update:
        # Not updating an existing collection: register a new one.
        self._local_collection.save_new(
            name=self.name, owner_uuid=self.owner_uuid,
            ensure_unique_name=self.ensure_unique_name,
            num_retries=self.num_retries)
        return
    # Bring the remote collection up to date with the files we uploaded.
    for file_path in self._file_paths:
        remote_file = self._remote_collection.find(file_path)
        if not remote_file:
            # The file doesn't exist on the remote collection: copy it.
            self._remote_collection.copy(file_path, file_path,
                                         self._local_collection)
        elif remote_file != self._local_collection.find(file_path):
            # A different file exists on the remote collection: overwrite it.
            self._remote_collection.copy(file_path, file_path,
                                         self._local_collection,
                                         overwrite=True)
        # Identical files are left untouched.
    self._remote_collection.save(num_retries=self.num_retries)