X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1b82cd274ecebba9302e8a06f6c9e99eaf8ec717..fe85ec515483d95ac36e00e5a411da9c1f76f2de:/sdk/python/arvados/collection.py

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index b069e8d216..814bd75a1e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -18,10 +18,50 @@ import fcntl
 import time
 import threading
 
+from collections import deque
+from stat import *
+
 from keep import *
 from stream import *
 import config
 import errors
+import util
+
+def normalize_stream(s, stream):
+    stream_tokens = [s]
+    sortedfiles = list(stream.keys())
+    sortedfiles.sort()
+
+    blocks = {}
+    streamoffset = 0L
+    for f in sortedfiles:
+        for b in stream[f]:
+            if b[arvados.LOCATOR] not in blocks:
+                stream_tokens.append(b[arvados.LOCATOR])
+                blocks[b[arvados.LOCATOR]] = streamoffset
+                streamoffset += b[arvados.BLOCKSIZE]
+
+    for f in sortedfiles:
+        current_span = None
+        fout = f.replace(' ', '\\040')
+        for segment in stream[f]:
+            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
+            if current_span == None:
+                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+            else:
+                if segmentoffset == current_span[1]:
+                    current_span[1] += segment[arvados.SEGMENTSIZE]
+                else:
+                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
+
+        if current_span != None:
+            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
+
+        if len(stream[f]) == 0:
+            stream_tokens.append("0:0:{0}".format(fout))
+
+    return stream_tokens
 
 def normalize(collection):
     streams = {}
@@ -35,57 +75,28 @@ def normalize(collection):
                 streams[streamname] = {}
             if filename not in streams[streamname]:
                 streams[streamname][filename] = []
-            streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size()))
-
-    manifest = ""
+            for r in f.segments:
+                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
+
+    normalized_streams = []
     sortedstreams = list(streams.keys())
     sortedstreams.sort()
-    #import pprint
-    #pprint.pprint(streams)
     for s in sortedstreams:
-        stream = streams[s]
-        manifest += s.replace(' ', '\\040')
-        sortedfiles = list(stream.keys())
-        sortedfiles.sort()
-
-        blocks = {}
-        streamoffset = 0L
-        for f in sortedfiles:
-            for b in stream[f]:
-                if b[StreamReader.LOCATOR] not in blocks:
-                    manifest += " " + b[StreamReader.LOCATOR]
-                    blocks[b[StreamReader.LOCATOR]] = streamoffset
-                    streamoffset += b[StreamReader.BLOCKSIZE]
-
-        for f in sortedfiles:
-            current_span = None
-            fout = f.replace(' ', '\\040')
-            for chunk in stream[f]:
-                chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.CHUNKOFFSET]
-                if current_span == None:
-                    current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
-                else:
-                    if chunkoffset == current_span[1]:
-                        current_span[1] += chunk[StreamReader.CHUNKSIZE]
-                    else:
-                        manifest += " " + "{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)
-                        current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
+        normalized_streams.append(normalize_stream(s, streams[s]))
+    return normalized_streams
 
-            if current_span != None:
-                manifest += " " + "{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout)
-
-        manifest += "\n"
-    manifest = manifest
-    return manifest
 
 class CollectionReader(object):
     def __init__(self, manifest_locator_or_text):
-        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
             self._manifest_text = manifest_locator_or_text
             self._manifest_locator = None
         else:
-            self._manifest_locator = manifest_locator_or_text
-            self._manifest_text = None
+            raise errors.ArgumentError(
+                "Argument to CollectionReader must be a manifest or a collection UUID")
         self._streams = None
 
     def __enter__(self):
@@ -111,12 +122,20 @@ class CollectionReader(object):
             if stream_line != '':
                 stream_tokens = stream_line.split()
                 self._streams += [stream_tokens]
+        self._streams = normalize(self)
+
+        # now regenerate the manifest text based on the normalized stream
+
+        #print "normalizing", self._manifest_text
+        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+        #print "result", self._manifest_text
+
 
     def all_streams(self):
         self._populate()
         resp = []
         for s in self._streams:
-            resp += [StreamReader(s)]
+            resp.append(StreamReader(s))
         return resp
 
     def all_files(self):
@@ -141,6 +160,10 @@ class CollectionWriter(object):
         self._current_file_name = None
         self._current_file_pos = 0
         self._finished_streams = []
+        self._close_file = None
+        self._queued_file = None
+        self._queued_dirents = deque()
+        self._queued_trees = deque()
 
     def __enter__(self):
         pass
@@ -148,30 +171,97 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        self.start_new_stream(stream_name)
-        todo = []
-        if max_manifest_depth == 0:
-            dirents = sorted(util.listdir_recursive(path))
-        else:
-            dirents = sorted(os.listdir(path))
-        for dirent in dirents:
+    def _do_queued_work(self):
+        # The work queue consists of three pieces:
+        # * _queued_file: The file object we're currently writing to the
+        #   Collection.
+        # * _queued_dirents: Entries under the current directory
+        #   (_queued_trees[0]) that we want to write or recurse through.
+        #   This may contain files from subdirectories if
+        #   max_manifest_depth == 0 for this directory.
+        # * _queued_trees: Directories that should be written as separate
+        #   streams to the Collection.
+        # This function handles the smallest piece of work currently queued
+        # (current file, then current directory, then next directory) until
+        # no work remains.  The _work_THING methods each do a unit of work on
+        # THING.  _queue_THING methods add a THING to the work queue.
+        while True:
+            if self._queued_file:
+                self._work_file()
+            elif self._queued_dirents:
+                self._work_dirents()
+            elif self._queued_trees:
+                self._work_trees()
+            else:
+                break
+            self.checkpoint_state()
+
+    def checkpoint_state(self):
+        # Subclasses can implement this method to, e.g., report or record state.
+        pass
+
+    def _work_file(self):
+        while True:
+            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+            if not buf:
+                break
+            self.write(buf)
+        self.finish_current_file()
+        if self._close_file:
+            self._queued_file.close()
+        self._close_file = None
+        self._queued_file = None
+
+    def _work_dirents(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        if stream_name != self.current_stream_name():
+            self.start_new_stream(stream_name)
+        while self._queued_dirents:
+            dirent = self._queued_dirents.popleft()
             target = os.path.join(path, dirent)
             if os.path.isdir(target):
-                todo += [[target,
-                          os.path.join(stream_name, dirent),
-                          max_manifest_depth-1]]
+                self._queue_tree(target,
+                                 os.path.join(stream_name, dirent),
+                                 max_manifest_depth - 1)
             else:
-                self.start_new_file(dirent)
-                with open(target, 'rb') as f:
-                    while True:
-                        buf = f.read(2**26)
-                        if len(buf) == 0:
-                            break
-                        self.write(buf)
-        self.finish_current_stream()
-        map(lambda x: self.write_directory_tree(*x), todo)
+                self._queue_file(target, dirent)
+                break
+        if not self._queued_dirents:
+            self._queued_trees.popleft()
+
+    def _work_trees(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
+                        else os.listdir)
+        self._queue_dirents(stream_name, make_dirents(path))
+
+    def _queue_file(self, source, filename=None):
+        assert (self._queued_file is None), "tried to queue more than one file"
+        if not hasattr(source, 'read'):
+            source = open(source, 'rb')
+            self._close_file = True
+        else:
+            self._close_file = False
+        if filename is None:
+            filename = os.path.basename(source.name)
+        self.start_new_file(filename)
+        self._queued_file = source
+
+    def _queue_dirents(self, stream_name, dirents):
+        assert (not self._queued_dirents), "tried to queue more than one tree"
+        self._queued_dirents = deque(sorted(dirents))
+
+    def _queue_tree(self, path, stream_name, max_manifest_depth):
+        self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+    def write_file(self, source, filename=None):
+        self._queue_file(source, filename)
+        self._do_queued_work()
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        self._queue_tree(path, stream_name, max_manifest_depth)
+        self._do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -190,6 +280,7 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
+            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -261,19 +352,105 @@ class CollectionWriter(object):
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
+
         for stream in self._finished_streams:
             if not re.search(r'^\.(/.*)?$', stream[0]):
                 manifest += './'
             manifest += stream[0].replace(' ', '\\040')
-            for locator in stream[1]:
-                manifest += " %s" % locator
-            for sfile in stream[2]:
-                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
+            manifest += ' ' + ' '.join(stream[1])
+            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
-        return manifest
+
+        #print 'writer',manifest
+        #print 'after reader',CollectionReader(manifest).manifest_text()
+
+        return CollectionReader(manifest).manifest_text()
 
     def data_locators(self):
         ret = []
         for name, locators, files in self._finished_streams:
             ret += locators
         return ret
+
+
+class ResumableCollectionWriter(CollectionWriter):
+    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+                   '_current_stream_locators', '_current_stream_name',
+                   '_current_file_name', '_current_file_pos', '_close_file',
+                   '_data_buffer', '_dependencies', '_finished_streams',
+                   '_queued_dirents', '_queued_trees']
+
+    def __init__(self):
+        self._dependencies = {}
+        super(ResumableCollectionWriter, self).__init__()
+
+    @classmethod
+    def from_state(cls, state):
+        writer = cls()
+        for attr_name in cls.STATE_PROPS:
+            attr_value = state[attr_name]
+            attr_class = getattr(writer, attr_name).__class__
+            # Coerce the value into the same type as the initial value, if
+            # needed.
+            if attr_class not in (type(None), attr_value.__class__):
+                attr_value = attr_class(attr_value)
+            setattr(writer, attr_name, attr_value)
+        # Check dependencies before we try to resume anything.
+        writer.check_dependencies()
+        if state['_current_file'] is not None:
+            path, pos = state['_current_file']
+            try:
+                writer._queued_file = open(path, 'rb')
+                writer._queued_file.seek(pos)
+            except IOError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to reopen active file {}: {}".format(path, error))
+        writer._do_queued_work()
+        return writer
+
+    def check_dependencies(self):
+        for path, orig_stat in self._dependencies.items():
+            if not S_ISREG(orig_stat[ST_MODE]):
+                raise errors.StaleWriterStateError("{} not file".format(path))
+            try:
+                now_stat = tuple(os.stat(path))
+            except OSError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to stat {}: {}".format(path, error))
+            if ((not S_ISREG(now_stat[ST_MODE])) or
+                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+                raise errors.StaleWriterStateError("{} changed".format(path))
+
+    def dump_state(self, copy_func=lambda x: x):
+        state = {attr: copy_func(getattr(self, attr))
+                 for attr in self.STATE_PROPS}
+        if self._queued_file is None:
+            state['_current_file'] = None
+        else:
+            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
+        return state
+
+    def _queue_file(self, source, filename=None):
+        try:
+            src_path = os.path.realpath(source)
+        except Exception:
+            raise errors.AssertionError("{} not a file path".format(source))
+        try:
+            path_stat = os.stat(src_path)
+        except OSError as error:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, error))
+        super(ResumableCollectionWriter, self)._queue_file(source, filename)
+        fd_stat = os.fstat(self._queued_file.fileno())
+        if path_stat.st_ino != fd_stat.st_ino:
+            raise errors.AssertionError(
+                "{} changed between open and stat calls".format(source))
+        self._dependencies[src_path] = tuple(fd_stat)
+
+    def write(self, data):
+        if self._queued_file is None:
+            raise errors.AssertionError(
+                "resumable writer can't accept unsourced data")
+        return super(ResumableCollectionWriter, self).write(data)
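
Usage note: checkpoint_state() is a no-op hook on CollectionWriter, and ResumableCollectionWriter only defines the snapshot contract (dump_state() captures state, from_state() rebuilds a writer and re-verifies its file dependencies); where snapshots are stored is left to the caller. The sketch below shows one way a caller might wire these pieces together. The CheckpointingWriter subclass, the pickle-based storage, and the /data/input path are illustrative assumptions, not part of this change; the SDK at this revision targets Python 2, so the sketch uses Python 2 syntax.

    import copy
    import cPickle as pickle

    from arvados.collection import ResumableCollectionWriter

    class CheckpointingWriter(ResumableCollectionWriter):
        # Hypothetical subclass: persist a snapshot after every unit of
        # queued work and after every flushed data block.
        STATE_PATH = 'writer_state.pickle'  # illustrative path, not defined by the SDK

        def checkpoint_state(self):
            # dump_state() collects STATE_PROPS plus the path/offset of the
            # file currently being read; deepcopy keeps the snapshot
            # independent of the live writer.
            state = self.dump_state(copy.deepcopy)
            with open(self.STATE_PATH, 'wb') as f:
                pickle.dump(state, f)

    # First run: queue a directory tree; _do_queued_work() checkpoints as it goes.
    writer = CheckpointingWriter()
    writer.write_directory_tree('/data/input')

    # After an interruption, a later run resumes from the last snapshot.
    # from_state() re-checks each dependency's size and mtime
    # (check_dependencies) and raises StaleWriterStateError if a source file
    # changed underneath the writer.
    with open(CheckpointingWriter.STATE_PATH, 'rb') as f:
        resumed = CheckpointingWriter.from_state(pickle.load(f))
    print resumed.manifest_text()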