X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ae4db6299b2d255bbc08a4c6fd3e77abcf030fb8..426417d8de4f288f6b360006cfe61c591b5e10a1:/sdk/python/arvados/collection.py

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 814bd75a1e..e420a679e2 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -41,6 +41,9 @@ def normalize_stream(s, stream):
                 blocks[b[arvados.LOCATOR]] = streamoffset
                 streamoffset += b[arvados.BLOCKSIZE]
 
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
     for f in sortedfiles:
         current_span = None
         fout = f.replace(' ', '\\040')
@@ -143,9 +146,13 @@ class CollectionReader(object):
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
+    def manifest_text(self, strip=False):
         self._populate()
-        return self._manifest_text
+        if strip:
+            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            return m
+        else:
+            return self._manifest_text
 
 class CollectionWriter(object):
     KEEP_BLOCK_SIZE = 2**26
@@ -171,7 +178,7 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -194,11 +201,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-        self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -233,7 +235,11 @@ class CollectionWriter(object):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
         make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                         else os.listdir)
-        self._queue_dirents(stream_name, make_dirents(path))
+        d = make_dirents(path)
+        if len(d) > 0:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
 
     def _queue_file(self, source, filename=None):
         assert (self._queued_file is None), "tried to queue more than one file"
@@ -256,12 +262,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -280,7 +286,6 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -347,7 +352,24 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
+        # Send the stripped manifest to Keep, to ensure that we use the
+        # same UUID regardless of what hints are used on the collection.
+        return Keep.put(self.stripped_manifest())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -361,10 +383,10 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        #print 'writer',manifest
-        #print 'after reader',CollectionReader(manifest).manifest_text()
-
-        return CollectionReader(manifest).manifest_text()
+        if len(manifest) > 0:
+            return CollectionReader(manifest).manifest_text()
+        else:
+            return ""
 
     def data_locators(self):
         ret = []
@@ -385,8 +407,14 @@ class ResumableCollectionWriter(CollectionWriter):
         super(ResumableCollectionWriter, self).__init__()
 
     @classmethod
-    def from_state(cls, state):
-        writer = cls()
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
             attr_class = getattr(writer, attr_name).__class__
@@ -396,6 +424,10 @@ class ResumableCollectionWriter(CollectionWriter):
                 attr_value = attr_class(attr_value)
             setattr(writer, attr_name, attr_value)
         # Check dependencies before we try to resume anything.
+        if any(KeepLocator(ls).permission_expired()
+               for ls in writer._current_stream_locators):
+            raise errors.StaleWriterStateError(
+                "locators include expired permission hint")
         writer.check_dependencies()
         if state['_current_file'] is not None:
             path, pos = state['_current_file']
@@ -405,7 +437,6 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer._do_queued_work()
         return writer
 
     def check_dependencies(self):
@@ -439,15 +470,22 @@ class ResumableCollectionWriter(CollectionWriter):
             raise errors.AssertionError("{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
-        except OSError as error:
-            raise errors.AssertionError(
-                "could not stat {}: {}".format(source, error))
+        except OSError as stat_error:
+            path_stat = None
         super(ResumableCollectionWriter, self)._queue_file(source, filename)
         fd_stat = os.fstat(self._queued_file.fileno())
-        if path_stat.st_ino != fd_stat.st_ino:
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
                 "{} changed between open and stat calls".format(source))
-        self._dependencies[src_path] = tuple(fd_stat)
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
 
     def write(self, data):
         if self._queued_file is None: