def flush_data(self):
    """Flush buffered data to Keep, updating progress and the resume cache.

    Wraps the superclass flush_data(): measures how many buffered bytes
    actually went out, reports progress, and checkpoints resume state
    whenever at least one more full Keep block has been written.
    """
    start_buffer_len = self._data_buffer_len
    # NOTE(review): relies on Python 2 integer division to count whole Keep
    # blocks written so far -- confirm before porting to Python 3 (use //).
    start_block_count = self.bytes_written / arvados.config.KEEP_BLOCK_SIZE
    super(ArvPutCollectionWriter, self).flush_data()
    if self._data_buffer_len < start_buffer_len:  # We actually PUT data.
        self.bytes_written += (start_buffer_len - self._data_buffer_len)
        self.report_progress()
        # Only rewrite the cache when a full block boundary was crossed,
        # so small flushes don't churn the cache file.
        if (self.bytes_written / arvados.config.KEEP_BLOCK_SIZE) > start_block_count:
            self.cache_state()
-
- def _record_new_input(self, input_type, source_name, dest_name):
- # The key needs to be a list because that's what we'll get back
- # from JSON deserialization.
- key = [input_type, source_name, dest_name]
- if key in self._seen_inputs:
- return False
- self._seen_inputs.append(key)
- return True
-
def write_file(self, source, filename=None):
    """Write a file into the collection unless this exact input was already seen."""
    is_new = self._record_new_input('file', source, filename)
    if is_new:
        super(ArvPutCollectionWriter, self).write_file(source, filename)
-
def write_directory_tree(self, path, stream_name='.', max_manifest_depth=-1):
    """Write a directory tree into the collection, skipping repeated inputs."""
    if not self._record_new_input('directory', path, stream_name):
        return
    super(ArvPutCollectionWriter, self).write_directory_tree(
        path, stream_name, max_manifest_depth)
+ def _write_stdin(self, filename):
+ output = self._local_collection.open(filename, 'w')
+ self._write(sys.stdin, output)
+ output.close()
+
def _write_file(self, source, filename):
    """Upload one local file into the local collection, resuming if possible.

    source: path of the file on local disk.
    filename: destination path inside the collection.

    Uses cached state (mtime/size) from a previous run to decide whether
    to skip the file, resume a partial upload, or (re-)upload it fully.
    """
    resume_offset = 0
    should_upload = False
    new_file_in_cache = False

    # Record file path for updating the remote collection before exiting
    self._file_paths.append(filename)

    with self._state_lock:
        # If no previous cached data on this file, store it for an eventual
        # repeated run.
        if source not in self._state['files']:
            self._state['files'][source] = {
                'mtime': os.path.getmtime(source),
                'size': os.path.getsize(source)
            }
            new_file_in_cache = True
        cached_file_data = self._state['files'][source]

    # Check if file was already uploaded (at least partially)
    file_in_local_collection = self._local_collection.find(filename)

    # If not resuming, upload the full file.
    if not self.resume:
        should_upload = True
    # New file detected from last run, upload it.
    elif new_file_in_cache:
        should_upload = True
    # Local file didn't change from last run.
    elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
        if not file_in_local_collection:
            # File not uploaded yet, upload it completely
            should_upload = True
        elif cached_file_data['size'] == file_in_local_collection.size():
            # File already there, skip it.
            self.bytes_skipped += cached_file_data['size']
        elif cached_file_data['size'] > file_in_local_collection.size():
            # File partially uploaded, resume!
            resume_offset = file_in_local_collection.size()
            should_upload = True
        else:
            # Inconsistent cache, re-upload the file
            should_upload = True
            self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
    # Local file differs from cached data, re-upload it.
    else:
        should_upload = True

    if should_upload:
        # Open in binary mode: source files are arbitrary (possibly binary)
        # data, and text mode would corrupt them via newline translation on
        # platforms where 'r' implies it. Byte counts must also match
        # os.path.getsize() for resume offsets to be correct.
        with open(source, 'rb') as source_fd:
            with self._state_lock:
                self._state['files'][source]['mtime'] = os.path.getmtime(source)
                self._state['files'][source]['size'] = os.path.getsize(source)
            if resume_offset > 0:
                # Start upload where we left off
                output = self._local_collection.open(filename, 'a')
                source_fd.seek(resume_offset)
                self.bytes_skipped += resume_offset
            else:
                # Start from scratch
                output = self._local_collection.open(filename, 'w')
            self._write(source_fd, output)
            # Data is flushed to Keep lazily; periodic state checkpoints
            # persist the manifest, so no flush is forced here.
            output.close(flush=False)
+
def _write(self, source_fd, output):
    """Copy source_fd to output in Keep-block-sized chunks.

    The first chunk is written even when it is empty, so that
    zero-length files still get recorded in the collection.
    """
    wrote_first_chunk = False
    while True:
        chunk = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
        if not chunk and wrote_first_chunk:
            break
        wrote_first_chunk = True
        output.write(chunk)
+
+ def _my_collection(self):
+ return self._remote_collection if self.update else self._local_collection
+
def _setup_state(self, update_collection):
    """
    Create a new cache file or load a previously existing one.

    If update_collection is a collection UUID, also load that collection
    for in-place update (self.update is set on success).

    Raises CollectionUpdateError when the locator is unreadable or has an
    unknown format.
    """
    # Load an already existing collection for update
    if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                      update_collection):
        try:
            self._remote_collection = arvados.collection.Collection(update_collection)
        except arvados.errors.ApiError as error:
            raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
        else:
            self.update = True
    elif update_collection:
        # Collection locator provided, but unknown format
        raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

    # Set up cache file name from input paths: the API host, the sorted
    # real paths, and the single-file name (if any) uniquely identify a
    # resumable upload session.
    # NOTE(review): md5.update() is fed str values directly, which assumes
    # Python 2 byte-strings -- confirm encoding before a Python 3 port.
    md5 = hashlib.md5()
    md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
    realpaths = sorted(os.path.realpath(path) for path in self.paths)
    md5.update('\0'.join(realpaths))
    if self.filename:
        md5.update(self.filename)
    cache_filename = md5.hexdigest()
    # 'a+' creates the cache file if missing without truncating an
    # existing one; seek(0) below rewinds for reading.
    self._cache_file = open(os.path.join(
        arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
        cache_filename), 'a+')
    self._cache_filename = self._cache_file.name
    self._lock_file(self._cache_file)
    self._cache_file.seek(0)
    with self._state_lock:
        try:
            self._state = json.load(self._cache_file)
            if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                # Cache at least partially incomplete, set up new cache
                self._state = copy.deepcopy(self.EMPTY_STATE)
        except ValueError:
            # Cache file empty, set up new cache
            self._state = copy.deepcopy(self.EMPTY_STATE)

    # Load the previous manifest so we can check if files were modified remotely.
    self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
    # Load how many bytes were uploaded on previous run
    with self._collection_lock:
        self.bytes_written = self._collection_size(self._local_collection)
+
+ def _lock_file(self, fileobj):
+ try:
+ fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ raise ResumeCacheConflict("{} locked".format(fileobj.name))
+
def _save_state(self):
    """
    Atomically save current state into cache.

    Writes a snapshot of self._state to a fresh temp file in the cache
    directory, fsyncs it, and renames it over the old cache file so
    readers never observe a partially-written state. On failure the temp
    file is removed and the error is logged (best-effort, no raise).
    """
    try:
        with self._state_lock:
            # Snapshot under the lock: json.dump below runs outside the
            # lock, and serializing the live dict while uploader threads
            # mutate it could raise or write a corrupt cache.
            state = copy.deepcopy(self._state)
        new_cache_fd, new_cache_name = tempfile.mkstemp(
            dir=os.path.dirname(self._cache_filename))
        self._lock_file(new_cache_fd)
        new_cache = os.fdopen(new_cache_fd, 'r+')
        json.dump(state, new_cache)
        new_cache.flush()
        os.fsync(new_cache)
        # Atomic replace: readers see either the old complete state or
        # the new complete state, never a torn write.
        os.rename(new_cache_name, self._cache_filename)
    except (IOError, OSError, ResumeCacheConflict) as error:
        self.logger.error("There was a problem while saving the cache file: {}".format(error))
        try:
            os.unlink(new_cache_name)
        except NameError:  # mkstemp failed.
            pass
    else:
        # Keep the (still flock-ed) new file as the active cache handle.
        self._cache_file.close()
        self._cache_file = new_cache
+
def collection_name(self):
    """Return the collection's name from its API response, or None if absent."""
    if self._my_collection().api_response():
        return self._my_collection().api_response()['name']
    return None
+
def manifest_locator(self):
    """Return the active collection's manifest locator."""
    collection = self._my_collection()
    return collection.manifest_locator()
+
def portable_data_hash(self):
    """Return the active collection's portable data hash."""
    collection = self._my_collection()
    return collection.portable_data_hash()
+
def manifest_text(self, stream_name=".", strip=False, normalize=False):
    """Return the active collection's manifest text (delegates to Collection)."""
    collection = self._my_collection()
    return collection.manifest_text(stream_name, strip, normalize)
+
def _datablocks_on_item(self, item):
    """
    Return a list of datablock locators, recursively navigating
    through subcollections; None for unrecognized item types.
    """
    if isinstance(item, arvados.arvfile.ArvadosFile):
        if item.size() == 0:
            # Empty file locator
            return ["d41d8cd98f00b204e9800998ecf8427e+0"]
        return [segment.locator for segment in item.segments()]
    if isinstance(item, arvados.collection.Collection):
        # Flatten the per-child locator lists into a single list.
        per_child = (self._datablocks_on_item(child) for child in item.values())
        return [locator for child_locators in per_child for locator in child_locators]
    return None
+
def data_locators(self):
    """Return every datablock locator in the active collection."""
    with self._collection_lock:
        # Rendering the manifest forces any pending datablocks to be
        # flushed before the locators are gathered.
        self._my_collection().manifest_text()
        return self._datablocks_on_item(self._my_collection())