X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cf7e30873ef4b92cc8ec099b2bb344391a070e93..112d6ce9132ea6749aef115e3787483958e858fd:/sdk/python/arvados/collection.py

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index fb3dea43ac..874c38e79e 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -18,12 +18,17 @@ import fcntl
 import time
 import threading
 
+from collections import deque
+from stat import *
+
 from keep import *
 from stream import *
 import config
 import errors
 import util
 
+_logger = logging.getLogger('arvados.collection')
+
 def normalize_stream(s, stream):
     stream_tokens = [s]
     sortedfiles = list(stream.keys())
@@ -38,6 +43,9 @@ def normalize_stream(s, stream):
                 blocks[b[arvados.LOCATOR]] = streamoffset
                 streamoffset += b[arvados.BLOCKSIZE]
 
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
     for f in sortedfiles:
         current_span = None
         fout = f.replace(' ', '\\040')
@@ -56,9 +64,9 @@ def normalize_stream(s, stream):
             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
         if len(stream[f]) == 0:
-            stream_tokens.append("0:0:{0}".format(fout)) 
+            stream_tokens.append("0:0:{0}".format(fout))
 
-    return stream_tokens 
+    return stream_tokens
 
 def normalize(collection):
     streams = {}
@@ -83,12 +91,52 @@ def normalize(collection):
     return normalized_streams
 
 
-class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text):
-        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+class CollectionBase(object):
+    def __enter__(self):
+        pass
+
+    def __exit__(self):
+        pass
+
+    def _my_keep(self):
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._api_client,
+                                           num_retries=self.num_retries)
+        return self._keep_client
+
+
+class CollectionReader(CollectionBase):
+    def __init__(self, manifest_locator_or_text, api_client=None,
+                 keep_client=None, num_retries=0):
+        """Instantiate a CollectionReader.
+
+        This class parses Collection manifests to provide a simple interface
+        to read its underlying files.
+
+        Arguments:
+        * manifest_locator_or_text: One of a Collection UUID, portable data
+          hash, or full manifest text.
+        * api_client: The API client to use to look up Collections. If not
+          provided, CollectionReader will build one from available Arvados
+          configuration.
+        * keep_client: The KeepClient to use to download Collection data.
+          If not provided, CollectionReader will build one from available
+          Arvados configuration.
+        * num_retries: The default number of times to retry failed
+          service requests. Default 0. You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        """
+        self._api_client = api_client
+        self._keep_client = keep_client
+        self.num_retries = num_retries
+        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
-        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
@@ -96,58 +144,98 @@ class CollectionReader(object):
                 "Argument to CollectionReader must be a manifest or a collection UUID")
         self._streams = None
 
-    def __enter__(self):
-        pass
-
-    def __exit__(self):
-        pass
-
     def _populate(self):
-        if self._streams != None:
+        if self._streams is not None:
             return
+        error_via_api = None
+        error_via_keep = None
+        should_try_keep = (not self._manifest_text and
+                           util.keep_locator_pattern.match(
+                self._manifest_locator))
+        if (not self._manifest_text and
+            util.signed_locator_pattern.match(self._manifest_locator)):
+            try:
+                self._populate_from_keep()
+            except e:
+                error_via_keep = e
         if not self._manifest_text:
             try:
-                c = arvados.api('v1').collections().get(
-                    uuid=self._manifest_locator).execute()
-                self._manifest_text = c['manifest_text']
+                self._populate_from_api_server()
             except Exception as e:
-                logging.warning("API lookup failed for collection %s (%s: %s)" %
-                                (self._manifest_locator, type(e), str(e)))
-                self._manifest_text = Keep.get(self._manifest_locator)
-        self._streams = []
-        for stream_line in self._manifest_text.split("\n"):
-            if stream_line != '':
-                stream_tokens = stream_line.split()
-                self._streams += [stream_tokens]
+                if not should_try_keep:
+                    raise
+                error_via_api = e
+        if (not self._manifest_text and
+            not error_via_keep and
+            should_try_keep):
+            # Looks like a keep locator, and we didn't already try keep above
+            try:
+                self._populate_from_keep()
+            except Exception as e:
+                error_via_keep = e
+        if not self._manifest_text:
+            # Nothing worked!
+            raise arvados.errors.NotFoundError(
+                ("Failed to retrieve collection '{}' " +
+                 "from either API server ({}) or Keep ({})."
+                 ).format(
+                    self._manifest_locator,
+                    error_via_api,
+                    error_via_keep))
+        self._streams = [sline.split()
+                         for sline in self._manifest_text.split("\n")
+                         if sline]
         self._streams = normalize(self)
 
         # now regenerate the manifest text based on the normalized stream
 
         #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
         #print "result", self._manifest_text
 
     def all_streams(self):
         self._populate()
-        resp = []
-        for s in self._streams:
-            resp.append(StreamReader(s))
-        return resp
+        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+                for s in self._streams]
 
     def all_files(self):
         for s in self.all_streams():
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
+    def manifest_text(self, strip=False):
         self._populate()
-        return self._manifest_text
+        if strip:
+            m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
+            return m
+        else:
+            return self._manifest_text
+
 
 
-class CollectionWriter(object):
+class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self):
+    def __init__(self, api_client=None, num_retries=0):
+        """Instantiate a CollectionWriter.
+
+        CollectionWriter lets you build a new Arvados Collection from scratch.
+        Write files to it. The CollectionWriter will upload data to Keep as
+        appropriate, and provide you with the Collection manifest text when
+        you're finished.
+
+        Arguments:
+        * api_client: The API client to use to look up Collections. If not
+          provided, CollectionReader will build one from available Arvados
+          configuration.
+        * num_retries: The default number of times to retry failed
+          service requests. Default 0. You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        """
+        self._api_client = api_client
+        self.num_retries = num_retries
+        self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._current_stream_files = []
@@ -157,44 +245,111 @@ class CollectionWriter(object):
         self._current_file_name = None
         self._current_file_pos = 0
         self._finished_streams = []
-
-    def __enter__(self):
-        pass
+        self._close_file = None
+        self._queued_file = None
+        self._queued_dirents = deque()
+        self._queued_trees = deque()
 
     def __exit__(self):
         self.finish()
 
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        self.start_new_stream(stream_name)
-        todo = []
-        if max_manifest_depth == 0:
-            dirents = sorted(util.listdir_recursive(path))
-        else:
-            dirents = sorted(os.listdir(path))
-        for dirent in dirents:
+    def do_queued_work(self):
+        # The work queue consists of three pieces:
+        # * _queued_file: The file object we're currently writing to the
+        #       Collection.
+        # * _queued_dirents: Entries under the current directory
+        #       (_queued_trees[0]) that we want to write or recurse through.
+        #       This may contain files from subdirectories if
+        #       max_manifest_depth == 0 for this directory.
+        # * _queued_trees: Directories that should be written as separate
+        #       streams to the Collection.
+        # This function handles the smallest piece of work currently queued
+        # (current file, then current directory, then next directory) until
+        # no work remains. The _work_THING methods each do a unit of work on
+        # THING. _queue_THING methods add a THING to the work queue.
+        while True:
+            if self._queued_file:
+                self._work_file()
+            elif self._queued_dirents:
+                self._work_dirents()
+            elif self._queued_trees:
+                self._work_trees()
+            else:
+                break
+
+    def _work_file(self):
+        while True:
+            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+            if not buf:
+                break
+            self.write(buf)
+        self.finish_current_file()
+        if self._close_file:
+            self._queued_file.close()
+        self._close_file = None
+        self._queued_file = None
+
+    def _work_dirents(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        if stream_name != self.current_stream_name():
+            self.start_new_stream(stream_name)
+        while self._queued_dirents:
+            dirent = self._queued_dirents.popleft()
             target = os.path.join(path, dirent)
             if os.path.isdir(target):
-                todo += [[target,
-                          os.path.join(stream_name, dirent),
-                          max_manifest_depth-1]]
+                self._queue_tree(target,
+                                 os.path.join(stream_name, dirent),
+                                 max_manifest_depth - 1)
             else:
-                self.start_new_file(dirent)
-                with open(target, 'rb') as f:
-                    while True:
-                        buf = f.read(2**26)
-                        if len(buf) == 0:
-                            break
-                        self.write(buf)
-        self.finish_current_stream()
-        map(lambda x: self.write_directory_tree(*x), todo)
+                self._queue_file(target, dirent)
+                break
+        if not self._queued_dirents:
+            self._queued_trees.popleft()
+
+    def _work_trees(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
+                        else os.listdir)
+        d = make_dirents(path)
+        if len(d) > 0:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
+
+    def _queue_file(self, source, filename=None):
+        assert (self._queued_file is None), "tried to queue more than one file"
+        if not hasattr(source, 'read'):
+            source = open(source, 'rb')
+            self._close_file = True
+        else:
+            self._close_file = False
+        if filename is None:
+            filename = os.path.basename(source.name)
+        self.start_new_file(filename)
+        self._queued_file = source
+
+    def _queue_dirents(self, stream_name, dirents):
+        assert (not self._queued_dirents), "tried to queue more than one tree"
+        self._queued_dirents = deque(sorted(dirents))
+
+    def _queue_tree(self, path, stream_name, max_manifest_depth):
+        self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+    def write_file(self, source, filename=None):
+        self._queue_file(source, filename)
+        self.do_queued_work()
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        self._queue_tree(path, stream_name, max_manifest_depth)
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
             for s in newdata:
                 self.write(s)
             return
-        self._data_buffer += [newdata]
+        self._data_buffer.append(newdata)
         self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
@@ -202,8 +357,9 @@ class CollectionWriter(object):
 
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
-        if data_buffer != '':
-            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+        if data_buffer:
+            self._current_stream_locators.append(
+                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -231,10 +387,12 @@ class CollectionWriter(object):
                 (self._current_stream_length - self._current_file_pos,
                  self._current_file_pos,
                  self._current_stream_name))
-        self._current_stream_files += [[self._current_file_pos,
-                                        self._current_stream_length - self._current_file_pos,
-                                        self._current_file_name]]
+        self._current_stream_files.append([
+            self._current_file_pos,
+            self._current_stream_length - self._current_file_pos,
+            self._current_file_name])
         self._current_file_pos = self._current_stream_length
+        self._current_file_name = None
 
     def start_new_stream(self, newstreamname='.'):
         self.finish_current_stream()
@@ -252,18 +410,18 @@ class CollectionWriter(object):
     def finish_current_stream(self):
         self.finish_current_file()
         self.flush_data()
-        if len(self._current_stream_files) == 0:
+        if not self._current_stream_files:
             pass
-        elif self._current_stream_name == None:
+        elif self._current_stream_name is None:
             raise errors.AssertionError(
                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
                 (self._current_stream_length, len(self._current_stream_files)))
         else:
-            if len(self._current_stream_locators) == 0:
-                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
-            self._finished_streams += [[self._current_stream_name,
-                                        self._current_stream_locators,
-                                        self._current_stream_files]]
+            if not self._current_stream_locators:
+                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+            self._finished_streams.append([self._current_stream_name,
+                                           self._current_stream_locators,
+                                           self._current_stream_files])
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -272,7 +430,23 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
+        # Store the manifest in Keep and return its locator.
+        return self._my_keep().put(self.manifest_text())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -285,14 +459,114 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join(stream[1])
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-
-        #print 'writer',manifest
-        #print 'after reader',CollectionReader(manifest).manifest_text()
-        return CollectionReader(manifest).manifest_text()
+        if manifest:
+            return CollectionReader(manifest, self._api_client).manifest_text()
+        else:
+            return ""
 
     def data_locators(self):
         ret = []
         for name, locators, files in self._finished_streams:
             ret += locators
         return ret
+
+
+class ResumableCollectionWriter(CollectionWriter):
+    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+                   '_current_stream_locators', '_current_stream_name',
+                   '_current_file_name', '_current_file_pos', '_close_file',
+                   '_data_buffer', '_dependencies', '_finished_streams',
+                   '_queued_dirents', '_queued_trees']
+
+    def __init__(self, api_client=None, num_retries=0):
+        self._dependencies = {}
+        super(ResumableCollectionWriter, self).__init__(
+            api_client, num_retries=num_retries)
+
+    @classmethod
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError. Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
+        for attr_name in cls.STATE_PROPS:
+            attr_value = state[attr_name]
+            attr_class = getattr(writer, attr_name).__class__
+            # Coerce the value into the same type as the initial value, if
+            # needed.
+            if attr_class not in (type(None), attr_value.__class__):
+                attr_value = attr_class(attr_value)
+            setattr(writer, attr_name, attr_value)
+        # Check dependencies before we try to resume anything.
+        if any(KeepLocator(ls).permission_expired()
+               for ls in writer._current_stream_locators):
+            raise errors.StaleWriterStateError(
+                "locators include expired permission hint")
+        writer.check_dependencies()
+        if state['_current_file'] is not None:
+            path, pos = state['_current_file']
+            try:
+                writer._queued_file = open(path, 'rb')
+                writer._queued_file.seek(pos)
+            except IOError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to reopen active file {}: {}".format(path, error))
+        return writer
+
+    def check_dependencies(self):
+        for path, orig_stat in self._dependencies.items():
+            if not S_ISREG(orig_stat[ST_MODE]):
+                raise errors.StaleWriterStateError("{} not file".format(path))
+            try:
+                now_stat = tuple(os.stat(path))
+            except OSError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to stat {}: {}".format(path, error))
+            if ((not S_ISREG(now_stat[ST_MODE])) or
+                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+                raise errors.StaleWriterStateError("{} changed".format(path))
+
+    def dump_state(self, copy_func=lambda x: x):
+        state = {attr: copy_func(getattr(self, attr))
+                 for attr in self.STATE_PROPS}
+        if self._queued_file is None:
+            state['_current_file'] = None
+        else:
+            state['_current_file'] = (os.path.realpath(self._queued_file.name),
+                                      self._queued_file.tell())
+        return state
+
+    def _queue_file(self, source, filename=None):
+        try:
+            src_path = os.path.realpath(source)
+        except Exception:
+            raise errors.AssertionError("{} not a file path".format(source))
+        try:
+            path_stat = os.stat(src_path)
+        except OSError as stat_error:
+            path_stat = None
+        super(ResumableCollectionWriter, self)._queue_file(source, filename)
+        fd_stat = os.fstat(self._queued_file.fileno())
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
+            raise errors.AssertionError(
+                "{} changed between open and stat calls".format(source))
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
+
+    def write(self, data):
+        if self._queued_file is None:
+            raise errors.AssertionError(
+                "resumable writer can't accept unsourced data")
+        return super(ResumableCollectionWriter, self).write(data)
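For context, a minimal usage sketch of the retry-aware constructors and methods added above. It is illustrative only and not part of the patch; the locator and the /tmp paths are placeholders.

from arvados.collection import CollectionReader, CollectionWriter

# Read an existing collection, retrying failed API/Keep requests twice.
reader = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+3', num_retries=2)
print reader.manifest_text(strip=True)    # manifest with permission hints removed

# Build a new collection and store its manifest in Keep.
writer = CollectionWriter(num_retries=2)
writer.write_file('/tmp/example.txt')                         # single file
writer.write_directory_tree('/tmp/data', stream_name='data')  # whole directory
print writer.finish()                                         # locator of the saved manifest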
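Similarly, a sketch of the checkpoint/resume cycle ResumableCollectionWriter is meant to support. Also illustrative only: the file paths are placeholders, and how the state dict is persisted between runs is left to the caller.

from arvados.collection import ResumableCollectionWriter

writer = ResumableCollectionWriter(num_retries=2)
writer.write_file('/tmp/part1.dat')
state = writer.dump_state()              # snapshot of buffers, locators, file dependencies

# Later (for example after a crash), rebuild the writer from the saved state.
resumed = ResumableCollectionWriter.from_state(state, num_retries=2)
resumed.do_queued_work()                 # finish any file that was open mid-write
resumed.write_file('/tmp/part2.dat')
print resumed.manifest_text()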