X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7cf55dce1df774d2c3daa3e54c0d4f27c38d4b11..5be63dcb589e10fbfc11fdd85dcb382708852baa:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index ff29919f1b..e7b26017d6 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -18,10 +18,14 @@ import fcntl import time import threading +from collections import deque +from stat import * + from keep import * from stream import * import config import errors +import util def normalize_stream(s, stream): stream_tokens = [s] @@ -55,9 +59,9 @@ def normalize_stream(s, stream): stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)) if len(stream[f]) == 0: - stream_tokens.append("0:0:{0}".format(fout)) + stream_tokens.append("0:0:{0}".format(fout)) - return stream_tokens + return stream_tokens def normalize(collection): streams = {} @@ -156,6 +160,10 @@ class CollectionWriter(object): self._current_file_name = None self._current_file_pos = 0 self._finished_streams = [] + self._close_file = None + self._queued_file = None + self._queued_dirents = deque() + self._queued_trees = deque() def __enter__(self): pass @@ -163,30 +171,92 @@ class CollectionWriter(object): def __exit__(self): self.finish() - def write_directory_tree(self, - path, stream_name='.', max_manifest_depth=-1): - self.start_new_stream(stream_name) - todo = [] - if max_manifest_depth == 0: - dirents = sorted(util.listdir_recursive(path)) - else: - dirents = sorted(os.listdir(path)) - for dirent in dirents: + def do_queued_work(self): + # The work queue consists of three pieces: + # * _queued_file: The file object we're currently writing to the + # Collection. + # * _queued_dirents: Entries under the current directory + # (_queued_trees[0]) that we want to write or recurse through. + # This may contain files from subdirectories if + # max_manifest_depth == 0 for this directory. + # * _queued_trees: Directories that should be written as separate + # streams to the Collection. + # This function handles the smallest piece of work currently queued + # (current file, then current directory, then next directory) until + # no work remains. The _work_THING methods each do a unit of work on + # THING. _queue_THING methods add a THING to the work queue. + while True: + if self._queued_file: + self._work_file() + elif self._queued_dirents: + self._work_dirents() + elif self._queued_trees: + self._work_trees() + else: + break + + def _work_file(self): + while True: + buf = self._queued_file.read(self.KEEP_BLOCK_SIZE) + if not buf: + break + self.write(buf) + self.finish_current_file() + if self._close_file: + self._queued_file.close() + self._close_file = None + self._queued_file = None + + def _work_dirents(self): + path, stream_name, max_manifest_depth = self._queued_trees[0] + if stream_name != self.current_stream_name(): + self.start_new_stream(stream_name) + while self._queued_dirents: + dirent = self._queued_dirents.popleft() target = os.path.join(path, dirent) if os.path.isdir(target): - todo += [[target, - os.path.join(stream_name, dirent), - max_manifest_depth-1]] + self._queue_tree(target, + os.path.join(stream_name, dirent), + max_manifest_depth - 1) else: - self.start_new_file(dirent) - with open(target, 'rb') as f: - while True: - buf = f.read(2**26) - if len(buf) == 0: - break - self.write(buf) - self.finish_current_stream() - map(lambda x: self.write_directory_tree(*x), todo) + self._queue_file(target, dirent) + break + if not self._queued_dirents: + self._queued_trees.popleft() + + def _work_trees(self): + path, stream_name, max_manifest_depth = self._queued_trees[0] + make_dirents = (util.listdir_recursive if (max_manifest_depth == 0) + else os.listdir) + self._queue_dirents(stream_name, make_dirents(path)) + + def _queue_file(self, source, filename=None): + assert (self._queued_file is None), "tried to queue more than one file" + if not hasattr(source, 'read'): + source = open(source, 'rb') + self._close_file = True + else: + self._close_file = False + if filename is None: + filename = os.path.basename(source.name) + self.start_new_file(filename) + self._queued_file = source + + def _queue_dirents(self, stream_name, dirents): + assert (not self._queued_dirents), "tried to queue more than one tree" + self._queued_dirents = deque(sorted(dirents)) + + def _queue_tree(self, path, stream_name, max_manifest_depth): + self._queued_trees.append((path, stream_name, max_manifest_depth)) + + def write_file(self, source, filename=None): + self._queue_file(source, filename) + self.do_queued_work() + + def write_directory_tree(self, + path, stream_name='.', max_manifest_depth=-1): + self._queue_tree(path, stream_name, max_manifest_depth) + self.do_queued_work() def write(self, newdata): if hasattr(newdata, '__iter__'): @@ -271,8 +341,25 @@ class CollectionWriter(object): self._current_file_name = None def finish(self): - return Keep.put(self.manifest_text()) - + # Send the stripped manifest to Keep, to ensure that we use the + # same UUID regardless of what hints are used on the collection. + return Keep.put(self.stripped_manifest()) + + def stripped_manifest(self): + """ + Return the manifest for the current collection with all permission + hints removed from the locators in the manifest. + """ + raw = self.manifest_text() + clean = '' + for line in raw.split("\n"): + fields = line.split() + if len(fields) > 0: + locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x) + for x in fields[1:-1] ] + clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n" + return clean + def manifest_text(self): self.finish_current_stream() manifest = '' @@ -284,7 +371,7 @@ class CollectionWriter(object): manifest += ' ' + ' '.join(stream[1]) manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) manifest += "\n" - + #print 'writer',manifest #print 'after reader',CollectionReader(manifest).manifest_text() @@ -295,3 +382,102 @@ class CollectionWriter(object): for name, locators, files in self._finished_streams: ret += locators return ret + + +class ResumableCollectionWriter(CollectionWriter): + STATE_PROPS = ['_current_stream_files', '_current_stream_length', + '_current_stream_locators', '_current_stream_name', + '_current_file_name', '_current_file_pos', '_close_file', + '_data_buffer', '_dependencies', '_finished_streams', + '_queued_dirents', '_queued_trees'] + + def __init__(self): + self._dependencies = {} + super(ResumableCollectionWriter, self).__init__() + + @classmethod + def from_state(cls, state, *init_args, **init_kwargs): + # Try to build a new writer from scratch with the given state. + # If the state is not suitable to resume (because files have changed, + # been deleted, aren't predictable, etc.), raise a + # StaleWriterStateError. Otherwise, return the initialized writer. + # The caller is responsible for calling writer.do_queued_work() + # appropriately after it's returned. + writer = cls(*init_args, **init_kwargs) + for attr_name in cls.STATE_PROPS: + attr_value = state[attr_name] + attr_class = getattr(writer, attr_name).__class__ + # Coerce the value into the same type as the initial value, if + # needed. + if attr_class not in (type(None), attr_value.__class__): + attr_value = attr_class(attr_value) + setattr(writer, attr_name, attr_value) + # Check dependencies before we try to resume anything. + if any(KeepLocator(ls).permission_expired() + for ls in writer._current_stream_locators): + raise errors.StaleWriterStateError( + "locators include expired permission hint") + writer.check_dependencies() + if state['_current_file'] is not None: + path, pos = state['_current_file'] + try: + writer._queued_file = open(path, 'rb') + writer._queued_file.seek(pos) + except IOError as error: + raise errors.StaleWriterStateError( + "failed to reopen active file {}: {}".format(path, error)) + return writer + + def check_dependencies(self): + for path, orig_stat in self._dependencies.items(): + if not S_ISREG(orig_stat[ST_MODE]): + raise errors.StaleWriterStateError("{} not file".format(path)) + try: + now_stat = tuple(os.stat(path)) + except OSError as error: + raise errors.StaleWriterStateError( + "failed to stat {}: {}".format(path, error)) + if ((not S_ISREG(now_stat[ST_MODE])) or + (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or + (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): + raise errors.StaleWriterStateError("{} changed".format(path)) + + def dump_state(self, copy_func=lambda x: x): + state = {attr: copy_func(getattr(self, attr)) + for attr in self.STATE_PROPS} + if self._queued_file is None: + state['_current_file'] = None + else: + state['_current_file'] = (os.path.realpath(self._queued_file.name), + self._queued_file.tell()) + return state + + def _queue_file(self, source, filename=None): + try: + src_path = os.path.realpath(source) + except Exception: + raise errors.AssertionError("{} not a file path".format(source)) + try: + path_stat = os.stat(src_path) + except OSError as stat_error: + path_stat = None + super(ResumableCollectionWriter, self)._queue_file(source, filename) + fd_stat = os.fstat(self._queued_file.fileno()) + if not S_ISREG(fd_stat.st_mode): + # We won't be able to resume from this cache anyway, so don't + # worry about further checks. + self._dependencies[source] = tuple(fd_stat) + elif path_stat is None: + raise errors.AssertionError( + "could not stat {}: {}".format(source, stat_error)) + elif path_stat.st_ino != fd_stat.st_ino: + raise errors.AssertionError( + "{} changed between open and stat calls".format(source)) + else: + self._dependencies[src_path] = tuple(fd_stat) + + def write(self, data): + if self._queued_file is None: + raise errors.AssertionError( + "resumable writer can't accept unsourced data") + return super(ResumableCollectionWriter, self).write(data)