X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/516d43a18765674c2a5d0f0bc2a4a4a789d4c61c..586c0409bf9496bae169c2d51b04806b82c342a9:/sdk/python/arvados/collection.py

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 7b38ccd180..e7b26017d6 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -171,7 +171,7 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -194,11 +194,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-        self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -256,12 +251,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -280,7 +275,6 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -347,8 +341,25 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
-
+        # Send the stripped manifest to Keep, to ensure that we use the
+        # same UUID regardless of what hints are used on the collection.
+        return Keep.put(self.stripped_manifest())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
+
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
@@ -385,8 +396,14 @@ class ResumableCollectionWriter(CollectionWriter):
         super(ResumableCollectionWriter, self).__init__()
 
     @classmethod
-    def from_state(cls, state):
-        writer = cls()
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
             attr_class = getattr(writer, attr_name).__class__
@@ -409,7 +426,6 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer._do_queued_work()
         return writer
 
     def check_dependencies(self):
@@ -443,15 +459,22 @@ class ResumableCollectionWriter(CollectionWriter):
             raise errors.AssertionError("{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
-        except OSError as error:
-            raise errors.AssertionError(
-                "could not stat {}: {}".format(source, error))
+        except OSError as stat_error:
+            path_stat = None
         super(ResumableCollectionWriter, self)._queue_file(source, filename)
         fd_stat = os.fstat(self._queued_file.fileno())
-        if path_stat.st_ino != fd_stat.st_ino:
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
                 "{} changed between open and stat calls".format(source))
-        self._dependencies[src_path] = tuple(fd_stat)
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
 
     def write(self, data):
         if self._queued_file is None:
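
A minimal caller-side sketch of the resume flow after this change, not part of the diff itself. It assumes arvados.errors exposes StaleWriterStateError (as referenced in the hunks above); the file names writer_state.json and input.dat, and the JSON serialization of the state dict, are hypothetical placeholders. Note that checkpoint_state() is removed here, so persisting state between runs is now the application's or a subclass's responsibility, and from_state() no longer resumes queued work on its own.

import json

from arvados.collection import ResumableCollectionWriter
from arvados.errors import StaleWriterStateError

# Hypothetical: state previously serialized by application code (the built-in
# checkpoint_state() hook no longer exists after this change).
with open('writer_state.json') as f:
    saved_state = json.load(f)

try:
    writer = ResumableCollectionWriter.from_state(saved_state)
except StaleWriterStateError:
    # Source files changed or disappeared since the state was saved; start over.
    writer = ResumableCollectionWriter()
    writer.write_file('input.dat')  # hypothetical input path

# from_state() no longer resumes work itself; the caller must do it explicitly.
writer.do_queued_work()

# finish() now stores the stripped manifest (permission hints removed from the
# locators), so the resulting locator does not depend on which hints were used.
locator = writer.finish()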