+ self.bytes_skipped = 0
+ self.name = name
+ self.owner_uuid = owner_uuid
+ self.ensure_unique_name = ensure_unique_name
+ self.num_retries = num_retries
+ self.replication_desired = replication_desired
+ self.filename = filename
+ self._state_lock = threading.Lock()
+ self._state = None # Previous run state (file list & manifest)
+ self._current_files = [] # Current run file list
+ self._cache_file = None
+ self._collection = None
+ self._collection_lock = threading.Lock()
+ self._stop_checkpointer = threading.Event()
+ self._checkpointer = threading.Thread(target=self._update_task)
+ self._update_task_time = update_time # How many seconds wait between update runs
+ self.logger = logging.getLogger('arvados.arv_put')
+ # Load cached data if any and if needed
+ self._setup_state()
+
    def start(self):
        """
        Start supporting thread & file uploading.

        Launches the checkpointer thread, uploads every path in self.paths
        (stdin, directory tree, or single file), and always stops/joins the
        thread before committing the final manifest and running one last
        state update.  On resume-enabled runs the cache file is closed and
        bytes_written is corrected by subtracting the bytes skipped because
        they were already uploaded on a previous run.
        """
        # Daemonize so this support thread cannot keep the process alive
        # if the main thread dies unexpectedly.
        self._checkpointer.daemon = True
        self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exist
                if path == '-':
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    self._write_directory_tree(path)
                else:
                    self._write_file(path, self.filename or os.path.basename(path))
        finally:
            # Stop the thread before doing anything else
            self._stop_checkpointer.set()
            self._checkpointer.join()
        # Commit all & one last _update()
        self.manifest_text()
        self._update()
        if self.resume:
            self._cache_file.close()
        # Correct the final written bytes count
        self.bytes_written -= self.bytes_skipped
+
+ def save_collection(self):
+ with self._collection_lock:
+ self._my_collection().save_new(
+ name=self.name, owner_uuid=self.owner_uuid,
+ ensure_unique_name=self.ensure_unique_name,
+ num_retries=self.num_retries)
+
+ def destroy_cache(self):
+ if self.resume:
+ try:
+ os.unlink(self._cache_filename)
+ except OSError as error:
+ # That's what we wanted anyway.
+ if error.errno != errno.ENOENT:
+ raise
+ self._cache_file.close()
+
+ def _collection_size(self, collection):
+ """
+ Recursively get the total size of the collection
+ """
+ size = 0
+ for item in collection.values():
+ if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
+ size += self._collection_size(item)
+ else:
+ size += item.size()
+ return size
+
+ def _update_task(self):
+ """
+ Periodically called support task. File uploading is
+ asynchronous so we poll status from the collection.
+ """
+ while not self._stop_checkpointer.wait(self._update_task_time):
+ self._update()
+
+ def _update(self):
+ """
+ Update cached manifest text and report progress.
+ """
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._my_collection())
+ # Update cache, if resume enabled
+ if self.resume:
+ with self._state_lock:
+ # Get the manifest text without comitting pending blocks
+ self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False, only_committed=True)
+ if self.resume:
+ self._save_state()
+ # Call the reporter, if any
+ self.report_progress()
+
+ def report_progress(self):
+ if self.reporter is not None:
+ self.reporter(self.bytes_written, self.bytes_expected)
+
+ def _write_directory_tree(self, path, stream_name="."):
+ # TODO: Check what happens when multiple directories are passed as
+ # arguments.
+ # If the code below is uncommented, integration test
+ # test_ArvPutSignedManifest (tests.test_arv_put.ArvPutIntegrationTest)
+ # fails, I suppose it is because the manifest_uuid changes because
+ # of the dir addition to stream_name.
+
+ # if stream_name == '.':
+ # stream_name = os.path.join('.', os.path.basename(path))
+ for item in os.listdir(path):
+ if os.path.isdir(os.path.join(path, item)):
+ self._write_directory_tree(os.path.join(path, item),
+ os.path.join(stream_name, item))
+ else:
+ self._write_file(os.path.join(path, item),
+ os.path.join(stream_name, item))
+
+ def _write_stdin(self, filename):
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'w')
+ self._write(sys.stdin, output)
+ output.close()
+
+ def _write_file(self, source, filename):
+ resume_offset = 0
+ if self.resume:
+ # Check if file was already uploaded (at least partially)
+ with self._collection_lock:
+ try:
+ file_in_collection = self._my_collection().find(filename)
+ except IOError:
+ # Not found
+ file_in_collection = None
+ # If no previous cached data on this file, store it for an eventual
+ # repeated run.
+ if source not in self._state['files']:
+ with self._state_lock:
+ self._state['files'][source] = {
+ 'mtime': os.path.getmtime(source),
+ 'size' : os.path.getsize(source)
+ }
+ with self._state_lock:
+ cached_file_data = self._state['files'][source]
+ # See if this file was already uploaded at least partially
+ if file_in_collection:
+ if cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
+ if cached_file_data['size'] == file_in_collection.size():
+ # File already there, skip it.
+ self.bytes_skipped += cached_file_data['size']
+ return
+ elif cached_file_data['size'] > file_in_collection.size():
+ # File partially uploaded, resume!
+ resume_offset = file_in_collection.size()
+ else:
+ # Inconsistent cache, re-upload the file
+ self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ else:
+ # Local file differs from cached data, re-upload it
+ pass
+ with open(source, 'r') as source_fd:
+ if resume_offset > 0:
+ # Start upload where we left off
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'a')
+ source_fd.seek(resume_offset)
+ self.bytes_skipped += resume_offset
+ else:
+ # Start from scratch
+ with self._collection_lock:
+ output = self._my_collection().open(filename, 'w')
+ self._write(source_fd, output)
+ output.close()
+
+ def _write(self, source_fd, output):
+ first_read = True
+ while True:
+ data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
+ # Allow an empty file to be written
+ if not data and not first_read:
+ break
+ if first_read:
+ first_read = False
+ output.write(data)
+
+ def _my_collection(self):
+ """
+ Create a new collection if none cached. Load it from cache otherwise.
+ """
+ if self._collection is None:
+ with self._state_lock:
+ manifest = self._state['manifest']
+ if self.resume and manifest is not None:
+ # Create collection from saved state
+ self._collection = arvados.collection.Collection(
+ manifest,
+ replication_desired=self.replication_desired)
+ else:
+ # Create new collection
+ self._collection = arvados.collection.Collection(
+ replication_desired=self.replication_desired)
+ return self._collection
+
+ def _setup_state(self):
+ """
+ Create a new cache file or load a previously existing one.
+ """
+ if self.resume:
+ md5 = hashlib.md5()
+ md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
+ realpaths = sorted(os.path.realpath(path) for path in self.paths)
+ md5.update('\0'.join(realpaths))
+ if self.filename:
+ md5.update(self.filename)
+ cache_filename = md5.hexdigest()
+ self._cache_file = open(os.path.join(
+ arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
+ cache_filename), 'a+')
+ self._cache_filename = self._cache_file.name
+ self._lock_file(self._cache_file)
+ self._cache_file.seek(0)
+ with self._state_lock:
+ try:
+ self._state = json.load(self._cache_file)
+ if not set(['manifest', 'files']).issubset(set(self._state.keys())):
+ # Cache at least partially incomplete, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ except ValueError:
+ # Cache file empty, set up new cache
+ self._state = copy.deepcopy(self.EMPTY_STATE)
+ # Load how many bytes were uploaded on previous run
+ with self._collection_lock:
+ self.bytes_written = self._collection_size(self._my_collection())
+ # No resume required