X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/2d6c425e78bc5712c63b4ebecb05077b0e30da1f..85ec07b3d4f0a14285a0d20db682c282e490e7e7:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index a44d42b6ac..d03790411a 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -1,6 +1,16 @@ # Copyright (C) The Arvados Authors. All rights reserved. # # SPDX-License-Identifier: Apache-2.0 +"""Tools to work with Arvados collections + +This module provides high-level interfaces to create, read, and update +Arvados collections. Most users will want to instantiate `Collection` +objects, and use methods like `Collection.open` and `Collection.mkdirs` to +read and write data in the collection. Refer to the Arvados Python SDK +cookbook for [an introduction to using the Collection class][cookbook]. + +[cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections +""" from __future__ import absolute_import from future.utils import listitems, listvalues, viewkeys @@ -35,30 +45,65 @@ import arvados.util import arvados.events as events from arvados.retry import retry_method -_logger = logging.getLogger('arvados.collection') - - -if sys.version_info >= (3, 0): - TextIOWrapper = io.TextIOWrapper +from typing import ( + Any, + Callable, + Dict, + IO, + Iterator, + List, + Mapping, + Optional, + Tuple, + Union, +) + +if sys.version_info < (3, 8): + from typing_extensions import Literal else: - class TextIOWrapper(io.TextIOWrapper): - """To maintain backward compatibility, cast str to unicode in - write('foo'). 
+ from typing import Literal - """ - def write(self, data): - if isinstance(data, basestring): - data = unicode(data) - return super(TextIOWrapper, self).write(data) +_logger = logging.getLogger('arvados.collection') +ADD = "add" +"""Argument value for `Collection` methods to represent an added item""" +DEL = "del" +"""Argument value for `Collection` methods to represent a removed item""" +MOD = "mod" +"""Argument value for `Collection` methods to represent a modified item""" +TOK = "tok" +"""Argument value for `Collection` methods to represent an item with token differences""" +FILE = "file" +"""`create_type` value for `Collection.find_or_create`""" +COLLECTION = "collection" +"""`create_type` value for `Collection.find_or_create`""" + +ChangeList = List[Union[ + Tuple[Literal[ADD, DEL], str, 'Collection'], + Tuple[Literal[MOD, TOK], str, 'Collection', 'Collection'], +]] +ChangeType = Literal[ADD, DEL, MOD, TOK] +CollectionItem = Union[ArvadosFile, 'Collection'] +ChangeCallback = Callable[[ChangeType, 'Collection', str, CollectionItem], object] +CreateType = Literal[COLLECTION, FILE] +Properties = Dict[str, Any] +StorageClasses = List[str] class CollectionBase(object): - """Abstract base class for Collection classes.""" + """Abstract base class for Collection classes + + .. ATTENTION:: Internal + This class is meant to be used by other parts of the SDK. User code + should instantiate or subclass `Collection` or one of its subclasses + directly. + """ def __enter__(self): + """Enter a context block with this collection instance""" return self def __exit__(self, exc_type, exc_value, traceback): + """Exit a context block with this collection instance""" pass def _my_keep(self): @@ -67,12 +112,13 @@ class CollectionBase(object): num_retries=self.num_retries) return self._keep_client - def stripped_manifest(self): - """Get the manifest with locator hints stripped. 
+ def stripped_manifest(self) -> str: + """Create a copy of the collection manifest with only size hints - Return the manifest for the current collection with all - non-portable hints (i.e., permission signatures and other - hints other than size hints) removed from the locators. + This method returns a string with the current collection's manifest + text with all non-portable locator hints like permission hints and + remote cluster hints removed. The only hints in the returned manifest + will be size hints. """ raw = self.manifest_text() clean = [] @@ -111,707 +157,379 @@ class _WriterFile(_FileLikeObjectBase): self.dest.flush_data() -class CollectionWriter(CollectionBase): - """Deprecated, use Collection instead.""" +class RichCollectionBase(CollectionBase): + """Base class for Collection classes - def __init__(self, api_client=None, num_retries=0, replication=None): - """Instantiate a CollectionWriter. + .. ATTENTION:: Internal + This class is meant to be used by other parts of the SDK. User code + should instantiate or subclass `Collection` or one of its subclasses + directly. + """ - CollectionWriter lets you build a new Arvados Collection from scratch. - Write files to it. The CollectionWriter will upload data to Keep as - appropriate, and provide you with the Collection manifest text when - you're finished. + def __init__(self, parent=None): + self.parent = parent + self._committed = False + self._has_remote_blocks = False + self._callback = None + self._items = {} - Arguments: - * api_client: The API client to use to look up Collections. If not - provided, CollectionReader will build one from available Arvados - configuration. - * num_retries: The default number of times to retry failed - service requests. Default 0. You may change this value - after instantiation, but note those changes may not - propagate to related objects like the Keep client. - * replication: The number of copies of each block to store. 
- If this argument is None or not supplied, replication is - the server-provided default if available, otherwise 2. - """ - self._api_client = api_client - self.num_retries = num_retries - self.replication = (2 if replication is None else replication) - self._keep_client = None - self._data_buffer = [] - self._data_buffer_len = 0 - self._current_stream_files = [] - self._current_stream_length = 0 - self._current_stream_locators = [] - self._current_stream_name = '.' - self._current_file_name = None - self._current_file_pos = 0 - self._finished_streams = [] - self._close_file = None - self._queued_file = None - self._queued_dirents = deque() - self._queued_trees = deque() - self._last_open = None + def _my_api(self): + raise NotImplementedError() - def __exit__(self, exc_type, exc_value, traceback): - if exc_type is None: - self.finish() + def _my_keep(self): + raise NotImplementedError() - def do_queued_work(self): - # The work queue consists of three pieces: - # * _queued_file: The file object we're currently writing to the - # Collection. - # * _queued_dirents: Entries under the current directory - # (_queued_trees[0]) that we want to write or recurse through. - # This may contain files from subdirectories if - # max_manifest_depth == 0 for this directory. - # * _queued_trees: Directories that should be written as separate - # streams to the Collection. - # This function handles the smallest piece of work currently queued - # (current file, then current directory, then next directory) until - # no work remains. The _work_THING methods each do a unit of work on - # THING. _queue_THING methods add a THING to the work queue. 
- while True: - if self._queued_file: - self._work_file() - elif self._queued_dirents: - self._work_dirents() - elif self._queued_trees: - self._work_trees() - else: - break + def _my_block_manager(self): + raise NotImplementedError() - def _work_file(self): - while True: - buf = self._queued_file.read(config.KEEP_BLOCK_SIZE) - if not buf: - break - self.write(buf) - self.finish_current_file() - if self._close_file: - self._queued_file.close() - self._close_file = None - self._queued_file = None + def writable(self) -> bool: + """Indicate whether this collection object can be modified - def _work_dirents(self): - path, stream_name, max_manifest_depth = self._queued_trees[0] - if stream_name != self.current_stream_name(): - self.start_new_stream(stream_name) - while self._queued_dirents: - dirent = self._queued_dirents.popleft() - target = os.path.join(path, dirent) - if os.path.isdir(target): - self._queue_tree(target, - os.path.join(stream_name, dirent), - max_manifest_depth - 1) - else: - self._queue_file(target, dirent) - break - if not self._queued_dirents: - self._queued_trees.popleft() + This method returns `False` if this object is a `CollectionReader`, + else `True`. 
+ """ + raise NotImplementedError() - def _work_trees(self): - path, stream_name, max_manifest_depth = self._queued_trees[0] - d = arvados.util.listdir_recursive( - path, max_depth = (None if max_manifest_depth == 0 else 0)) - if d: - self._queue_dirents(stream_name, d) - else: - self._queued_trees.popleft() + def root_collection(self) -> 'Collection': + """Get this collection's root collection object - def _queue_file(self, source, filename=None): - assert (self._queued_file is None), "tried to queue more than one file" - if not hasattr(source, 'read'): - source = open(source, 'rb') - self._close_file = True - else: - self._close_file = False - if filename is None: - filename = os.path.basename(source.name) - self.start_new_file(filename) - self._queued_file = source + If you open a subcollection with `Collection.find`, calling this method + on that subcollection returns the source Collection object. + """ + raise NotImplementedError() - def _queue_dirents(self, stream_name, dirents): - assert (not self._queued_dirents), "tried to queue more than one tree" - self._queued_dirents = deque(sorted(dirents)) + def stream_name(self) -> str: + """Get the name of the manifest stream represented by this collection - def _queue_tree(self, path, stream_name, max_manifest_depth): - self._queued_trees.append((path, stream_name, max_manifest_depth)) + If you open a subcollection with `Collection.find`, calling this method + on that subcollection returns the name of the stream you opened. 
+ """ + raise NotImplementedError() - def write_file(self, source, filename=None): - self._queue_file(source, filename) - self.do_queued_work() + @synchronized + def has_remote_blocks(self) -> bool: + """Indiciate whether the collection refers to remote data - def write_directory_tree(self, - path, stream_name='.', max_manifest_depth=-1): - self._queue_tree(path, stream_name, max_manifest_depth) - self.do_queued_work() + Returns `True` if the collection manifest includes any Keep locators + with a remote hint (`+R`), else `False`. + """ + if self._has_remote_blocks: + return True + for item in self: + if self[item].has_remote_blocks(): + return True + return False - def write(self, newdata): - if isinstance(newdata, bytes): - pass - elif isinstance(newdata, str): - newdata = newdata.encode() - elif hasattr(newdata, '__iter__'): - for s in newdata: - self.write(s) - return - self._data_buffer.append(newdata) - self._data_buffer_len += len(newdata) - self._current_stream_length += len(newdata) - while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: - self.flush_data() + @synchronized + def set_has_remote_blocks(self, val: bool) -> None: + """Cache whether this collection refers to remote blocks - def open(self, streampath, filename=None): - """open(streampath[, filename]) -> file-like object + .. ATTENTION:: Internal + This method is only meant to be used by other Collection methods. - Pass in the path of a file to write to the Collection, either as a - single string or as two separate stream name and file name arguments. - This method returns a file-like object you can write to add it to the - Collection. + Set this collection's cached "has remote blocks" flag to the given + value. + """ + self._has_remote_blocks = val + if self.parent: + self.parent.set_has_remote_blocks(val) - You may only have one file object from the Collection open at a time, - so be sure to close the object when you're done. 
Using the object in - a with statement makes that easy:: + @must_be_writable + @synchronized + def find_or_create( + self, + path: str, + create_type: CreateType, + ) -> CollectionItem: + """Get the item at the given path, creating it if necessary + + If `path` refers to a stream in this collection, returns a + corresponding `Subcollection` object. If `path` refers to a file in + this collection, returns a corresponding + `arvados.arvfile.ArvadosFile` object. If `path` does not exist in + this collection, then this method creates a new object and returns + it, creating parent streams as needed. The type of object created is + determined by the value of `create_type`. - with cwriter.open('./doc/page1.txt') as outfile: - outfile.write(page1_data) - with cwriter.open('./doc/page2.txt') as outfile: - outfile.write(page2_data) + Arguments: + + * path: str --- The path to find or create within this collection. + + * create_type: Literal[COLLECTION, FILE] --- The type of object to + create at `path` if one does not exist. Passing `COLLECTION` + creates a stream and returns the corresponding + `Subcollection`. Passing `FILE` creates a new file and returns the + corresponding `arvados.arvfile.ArvadosFile`. 
""" - if filename is None: - streampath, filename = split(streampath) - if self._last_open and not self._last_open.closed: - raise errors.AssertionError( - u"can't open '{}' when '{}' is still open".format( - filename, self._last_open.name)) - if streampath != self.current_stream_name(): - self.start_new_stream(streampath) - self.set_current_file_name(filename) - self._last_open = _WriterFile(self, filename) - return self._last_open + pathcomponents = path.split("/", 1) + if pathcomponents[0]: + item = self._items.get(pathcomponents[0]) + if len(pathcomponents) == 1: + if item is None: + # create new file + if create_type == COLLECTION: + item = Subcollection(self, pathcomponents[0]) + else: + item = ArvadosFile(self, pathcomponents[0]) + self._items[pathcomponents[0]] = item + self.set_committed(False) + self.notify(ADD, self, pathcomponents[0], item) + return item + else: + if item is None: + # create new collection + item = Subcollection(self, pathcomponents[0]) + self._items[pathcomponents[0]] = item + self.set_committed(False) + self.notify(ADD, self, pathcomponents[0], item) + if isinstance(item, RichCollectionBase): + return item.find_or_create(pathcomponents[1], create_type) + else: + raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) + else: + return self - def flush_data(self): - data_buffer = b''.join(self._data_buffer) - if data_buffer: - self._current_stream_locators.append( - self._my_keep().put( - data_buffer[0:config.KEEP_BLOCK_SIZE], - copies=self.replication)) - self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] - self._data_buffer_len = len(self._data_buffer[0]) + @synchronized + def find(self, path: str) -> CollectionItem: + """Get the item at the given path - def start_new_file(self, newfilename=None): - self.finish_current_file() - self.set_current_file_name(newfilename) + If `path` refers to a stream in this collection, returns a + corresponding `Subcollection` object. 
If `path` refers to a file in + this collection, returns a corresponding + `arvados.arvfile.ArvadosFile` object. If `path` does not exist in + this collection, then this method raises `NotADirectoryError`. - def set_current_file_name(self, newfilename): - if re.search(r'[\t\n]', newfilename): - raise errors.AssertionError( - "Manifest filenames cannot contain whitespace: %s" % - newfilename) - elif re.search(r'\x00', newfilename): - raise errors.AssertionError( - "Manifest filenames cannot contain NUL characters: %s" % - newfilename) - self._current_file_name = newfilename + Arguments: - def current_file_name(self): - return self._current_file_name + * path: str --- The path to find or create within this collection. + """ + if not path: + raise errors.ArgumentError("Parameter 'path' is empty.") - def finish_current_file(self): - if self._current_file_name is None: - if self._current_file_pos == self._current_stream_length: - return - raise errors.AssertionError( - "Cannot finish an unnamed file " + - "(%d bytes at offset %d in '%s' stream)" % - (self._current_stream_length - self._current_file_pos, - self._current_file_pos, - self._current_stream_name)) - self._current_stream_files.append([ - self._current_file_pos, - self._current_stream_length - self._current_file_pos, - self._current_file_name]) - self._current_file_pos = self._current_stream_length - self._current_file_name = None + pathcomponents = path.split("/", 1) + if pathcomponents[0] == '': + raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) - def start_new_stream(self, newstreamname='.'): - self.finish_current_stream() - self.set_current_stream_name(newstreamname) + item = self._items.get(pathcomponents[0]) + if item is None: + return None + elif len(pathcomponents) == 1: + return item + else: + if isinstance(item, RichCollectionBase): + if pathcomponents[1]: + return item.find(pathcomponents[1]) + else: + return item + else: + raise IOError(errno.ENOTDIR, "Not a directory", 
pathcomponents[0]) - def set_current_stream_name(self, newstreamname): - if re.search(r'[\t\n]', newstreamname): - raise errors.AssertionError( - "Manifest stream names cannot contain whitespace: '%s'" % - (newstreamname)) - self._current_stream_name = '.' if newstreamname=='' else newstreamname + @synchronized + def mkdirs(self, path: str) -> 'Subcollection': + """Create and return a subcollection at `path` - def current_stream_name(self): - return self._current_stream_name + If `path` exists within this collection, raises `FileExistsError`. + Otherwise, creates a stream at that path and returns the + corresponding `Subcollection`. + """ + if self.find(path) != None: + raise IOError(errno.EEXIST, "Directory or file exists", path) - def finish_current_stream(self): - self.finish_current_file() - self.flush_data() - if not self._current_stream_files: - pass - elif self._current_stream_name is None: - raise errors.AssertionError( - "Cannot finish an unnamed stream (%d bytes in %d files)" % - (self._current_stream_length, len(self._current_stream_files))) - else: - if not self._current_stream_locators: - self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) - self._finished_streams.append([self._current_stream_name, - self._current_stream_locators, - self._current_stream_files]) - self._current_stream_files = [] - self._current_stream_length = 0 - self._current_stream_locators = [] - self._current_stream_name = None - self._current_file_pos = 0 - self._current_file_name = None + return self.find_or_create(path, COLLECTION) - def finish(self): - """Store the manifest in Keep and return its locator. + def open( + self, + path: str, + mode: str="r", + encoding: Optional[str]=None, + ) -> IO: + """Open a file-like object within the collection - This is useful for storing manifest fragments (task outputs) - temporarily in Keep during a Crunch job. 
+ This method returns a file-like object that can read and/or write the + file located at `path` within the collection. If you attempt to write + a `path` that does not exist, the file is created with `find_or_create`. + If the file cannot be opened for any other reason, this method raises + `OSError` with an appropriate errno. - In other cases you should make a collection instead, by - sending manifest_text() to the API server's "create - collection" endpoint. - """ - return self._my_keep().put(self.manifest_text().encode(), - copies=self.replication) + Arguments: - def portable_data_hash(self): - stripped = self.stripped_manifest().encode() - return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) + * path: str --- The path of the file to open within this collection - def manifest_text(self): - self.finish_current_stream() - manifest = '' + * mode: str --- The mode to open this file. Supports all the same + values as `builtins.open`. - for stream in self._finished_streams: - if not re.search(r'^\.(/.*)?$', stream[0]): - manifest += './' - manifest += stream[0].replace(' ', '\\040') - manifest += ' ' + ' '.join(stream[1]) - manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) - manifest += "\n" + * encoding: str | None --- The text encoding of the file. Only used + when the file is opened in text mode. The default is + platform-dependent. 
+ """ + if not re.search(r'^[rwa][bt]?\+?$', mode): + raise errors.ArgumentError("Invalid mode {!r}".format(mode)) - return manifest + if mode[0] == 'r' and '+' not in mode: + fclass = ArvadosFileReader + arvfile = self.find(path) + elif not self.writable(): + raise IOError(errno.EROFS, "Collection is read only") + else: + fclass = ArvadosFileWriter + arvfile = self.find_or_create(path, FILE) - def data_locators(self): - ret = [] - for name, locators, files in self._finished_streams: - ret += locators - return ret + if arvfile is None: + raise IOError(errno.ENOENT, "File not found", path) + if not isinstance(arvfile, ArvadosFile): + raise IOError(errno.EISDIR, "Is a directory", path) - def save_new(self, name=None): - return self._api_client.collections().create( - ensure_unique_name=True, - body={ - 'name': name, - 'manifest_text': self.manifest_text(), - }).execute(num_retries=self.num_retries) + if mode[0] == 'w': + arvfile.truncate(0) + binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) + f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) + if 'b' not in mode: + bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader + f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) + return f -class ResumableCollectionWriter(CollectionWriter): - """Deprecated, use Collection instead.""" + def modified(self) -> bool: + """Indicate whether this collection has an API server record - STATE_PROPS = ['_current_stream_files', '_current_stream_length', - '_current_stream_locators', '_current_stream_name', - '_current_file_name', '_current_file_pos', '_close_file', - '_data_buffer', '_dependencies', '_finished_streams', - '_queued_dirents', '_queued_trees'] + Returns `False` if this collection corresponds to a record loaded from + the API server, `True` otherwise. 
+ """ + return not self.committed() - def __init__(self, api_client=None, **kwargs): - self._dependencies = {} - super(ResumableCollectionWriter, self).__init__(api_client, **kwargs) + @synchronized + def committed(self): + """Indicate whether this collection has an API server record - @classmethod - def from_state(cls, state, *init_args, **init_kwargs): - # Try to build a new writer from scratch with the given state. - # If the state is not suitable to resume (because files have changed, - # been deleted, aren't predictable, etc.), raise a - # StaleWriterStateError. Otherwise, return the initialized writer. - # The caller is responsible for calling writer.do_queued_work() - # appropriately after it's returned. - writer = cls(*init_args, **init_kwargs) - for attr_name in cls.STATE_PROPS: - attr_value = state[attr_name] - attr_class = getattr(writer, attr_name).__class__ - # Coerce the value into the same type as the initial value, if - # needed. - if attr_class not in (type(None), attr_value.__class__): - attr_value = attr_class(attr_value) - setattr(writer, attr_name, attr_value) - # Check dependencies before we try to resume anything. - if any(KeepLocator(ls).permission_expired() - for ls in writer._current_stream_locators): - raise errors.StaleWriterStateError( - "locators include expired permission hint") - writer.check_dependencies() - if state['_current_file'] is not None: - path, pos = state['_current_file'] - try: - writer._queued_file = open(path, 'rb') - writer._queued_file.seek(pos) - except IOError as error: - raise errors.StaleWriterStateError( - u"failed to reopen active file {}: {}".format(path, error)) - return writer + Returns `True` if this collection corresponds to a record loaded from + the API server, `False` otherwise. 
+ """ + return self._committed - def check_dependencies(self): - for path, orig_stat in listitems(self._dependencies): - if not S_ISREG(orig_stat[ST_MODE]): - raise errors.StaleWriterStateError(u"{} not file".format(path)) - try: - now_stat = tuple(os.stat(path)) - except OSError as error: - raise errors.StaleWriterStateError( - u"failed to stat {}: {}".format(path, error)) - if ((not S_ISREG(now_stat[ST_MODE])) or - (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or - (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): - raise errors.StaleWriterStateError(u"{} changed".format(path)) + @synchronized + def set_committed(self, value: bool=True): + """Cache whether this collection has an API server record - def dump_state(self, copy_func=lambda x: x): - state = {attr: copy_func(getattr(self, attr)) - for attr in self.STATE_PROPS} - if self._queued_file is None: - state['_current_file'] = None - else: - state['_current_file'] = (os.path.realpath(self._queued_file.name), - self._queued_file.tell()) - return state + .. ATTENTION:: Internal + This method is only meant to be used by other Collection methods. - def _queue_file(self, source, filename=None): - try: - src_path = os.path.realpath(source) - except Exception: - raise errors.AssertionError(u"{} not a file path".format(source)) - try: - path_stat = os.stat(src_path) - except OSError as stat_error: - path_stat = None - super(ResumableCollectionWriter, self)._queue_file(source, filename) - fd_stat = os.fstat(self._queued_file.fileno()) - if not S_ISREG(fd_stat.st_mode): - # We won't be able to resume from this cache anyway, so don't - # worry about further checks. 
- self._dependencies[source] = tuple(fd_stat) - elif path_stat is None: - raise errors.AssertionError( - u"could not stat {}: {}".format(source, stat_error)) - elif path_stat.st_ino != fd_stat.st_ino: - raise errors.AssertionError( - u"{} changed between open and stat calls".format(source)) + Set this collection's cached "committed" flag to the given + value and propagates it as needed. + """ + if value == self._committed: + return + if value: + for k,v in listitems(self._items): + v.set_committed(True) + self._committed = True else: - self._dependencies[src_path] = tuple(fd_stat) + self._committed = False + if self.parent is not None: + self.parent.set_committed(False) - def write(self, data): - if self._queued_file is None: - raise errors.AssertionError( - "resumable writer can't accept unsourced data") - return super(ResumableCollectionWriter, self).write(data) + @synchronized + def __iter__(self) -> Iterator[str]: + """Iterate names of streams and files in this collection - -ADD = "add" -DEL = "del" -MOD = "mod" -TOK = "tok" -FILE = "file" -COLLECTION = "collection" - -class RichCollectionBase(CollectionBase): - """Base class for Collections and Subcollections. - - Implements the majority of functionality relating to accessing items in the - Collection. 
- - """ - - def __init__(self, parent=None): - self.parent = parent - self._committed = False - self._has_remote_blocks = False - self._callback = None - self._items = {} - - def _my_api(self): - raise NotImplementedError() - - def _my_keep(self): - raise NotImplementedError() - - def _my_block_manager(self): - raise NotImplementedError() - - def writable(self): - raise NotImplementedError() - - def root_collection(self): - raise NotImplementedError() - - def notify(self, event, collection, name, item): - raise NotImplementedError() - - def stream_name(self): - raise NotImplementedError() - - - @synchronized - def has_remote_blocks(self): - """Recursively check for a +R segment locator signature.""" - - if self._has_remote_blocks: - return True - for item in self: - if self[item].has_remote_blocks(): - return True - return False + This method does not recurse. It only iterates the contents of this + collection's corresponding stream. + """ + return iter(viewkeys(self._items)) @synchronized - def set_has_remote_blocks(self, val): - self._has_remote_blocks = val - if self.parent: - self.parent.set_has_remote_blocks(val) - - @must_be_writable - @synchronized - def find_or_create(self, path, create_type): - """Recursively search the specified file path. - - May return either a `Collection` or `ArvadosFile`. If not found, will - create a new item at the specified path based on `create_type`. Will - create intermediate subcollections needed to contain the final item in - the path. - - :create_type: - One of `arvados.collection.FILE` or - `arvados.collection.COLLECTION`. If the path is not found, and value - of create_type is FILE then create and return a new ArvadosFile for - the last path component. If COLLECTION, then create and return a new - Collection for the last path component. + def __getitem__(self, k: str) -> CollectionItem: + """Get a `arvados.arvfile.ArvadosFile` or `Subcollection` in this collection + This method does not recurse. 
If you want to search a path, use + `RichCollectionBase.find` instead. """ - - pathcomponents = path.split("/", 1) - if pathcomponents[0]: - item = self._items.get(pathcomponents[0]) - if len(pathcomponents) == 1: - if item is None: - # create new file - if create_type == COLLECTION: - item = Subcollection(self, pathcomponents[0]) - else: - item = ArvadosFile(self, pathcomponents[0]) - self._items[pathcomponents[0]] = item - self.set_committed(False) - self.notify(ADD, self, pathcomponents[0], item) - return item - else: - if item is None: - # create new collection - item = Subcollection(self, pathcomponents[0]) - self._items[pathcomponents[0]] = item - self.set_committed(False) - self.notify(ADD, self, pathcomponents[0], item) - if isinstance(item, RichCollectionBase): - return item.find_or_create(pathcomponents[1], create_type) - else: - raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) - else: - return self + return self._items[k] @synchronized - def find(self, path): - """Recursively search the specified file path. - - May return either a Collection or ArvadosFile. Return None if not - found. - If path is invalid (ex: starts with '/'), an IOError exception will be - raised. + def __contains__(self, k: str) -> bool: + """Indicate whether this collection has an item with this name + This method does not recurse. It you want to check a path, use + `RichCollectionBase.exists` instead. 
""" - if not path: - raise errors.ArgumentError("Parameter 'path' is empty.") - - pathcomponents = path.split("/", 1) - if pathcomponents[0] == '': - raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) - - item = self._items.get(pathcomponents[0]) - if item is None: - return None - elif len(pathcomponents) == 1: - return item - else: - if isinstance(item, RichCollectionBase): - if pathcomponents[1]: - return item.find(pathcomponents[1]) - else: - return item - else: - raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0]) + return k in self._items @synchronized - def mkdirs(self, path): - """Recursive subcollection create. - - Like `os.makedirs()`. Will create intermediate subcollections needed - to contain the leaf subcollection path. + def __len__(self): + """Get the number of items directly contained in this collection + This method does not recurse. It only counts the streams and files + in this collection's corresponding stream. """ + return len(self._items) - if self.find(path) != None: - raise IOError(errno.EEXIST, "Directory or file exists", path) - - return self.find_or_create(path, COLLECTION) - - def open(self, path, mode="r", encoding=None): - """Open a file-like object for access. - - :path: - path to a file in the collection - :mode: - a string consisting of "r", "w", or "a", optionally followed - by "b" or "t", optionally followed by "+". - :"b": - binary mode: write() accepts bytes, read() returns bytes. - :"t": - text mode (default): write() accepts strings, read() returns strings. - :"r": - opens for reading - :"r+": - opens for reading and writing. Reads/writes share a file pointer. - :"w", "w+": - truncates to 0 and opens for reading and writing. Reads/writes share a file pointer. - :"a", "a+": - opens for reading and writing. All writes are appended to - the end of the file. Writing does not affect the file pointer for - reading. 
+ @must_be_writable + @synchronized + def __delitem__(self, p: str) -> None: + """Delete an item from this collection's stream + This method does not recurse. If you want to remove an item by a + path, use `RichCollectionBase.remove` instead. """ - - if not re.search(r'^[rwa][bt]?\+?$', mode): - raise errors.ArgumentError("Invalid mode {!r}".format(mode)) - - if mode[0] == 'r' and '+' not in mode: - fclass = ArvadosFileReader - arvfile = self.find(path) - elif not self.writable(): - raise IOError(errno.EROFS, "Collection is read only") - else: - fclass = ArvadosFileWriter - arvfile = self.find_or_create(path, FILE) - - if arvfile is None: - raise IOError(errno.ENOENT, "File not found", path) - if not isinstance(arvfile, ArvadosFile): - raise IOError(errno.EISDIR, "Is a directory", path) - - if mode[0] == 'w': - arvfile.truncate(0) - - binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:]) - f = fclass(arvfile, mode=binmode, num_retries=self.num_retries) - if 'b' not in mode: - bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader - f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding) - return f - - def modified(self): - """Determine if the collection has been modified since last commited.""" - return not self.committed() - - @synchronized - def committed(self): - """Determine if the collection has been committed to the API server.""" - return self._committed + del self._items[p] + self.set_committed(False) + self.notify(DEL, self, p, None) @synchronized - def set_committed(self, value=True): - """Recursively set committed flag. + def keys(self) -> Iterator[str]: + """Iterate names of streams and files in this collection - If value is True, set committed to be True for this and all children. - - If value is False, set committed to be False for this and all parents. + This method does not recurse. It only iterates the contents of this + collection's corresponding stream. 
""" - if value == self._committed: - return - if value: - for k,v in listitems(self._items): - v.set_committed(True) - self._committed = True - else: - self._committed = False - if self.parent is not None: - self.parent.set_committed(False) - - @synchronized - def __iter__(self): - """Iterate over names of files and collections contained in this collection.""" - return iter(viewkeys(self._items)) + return self._items.keys() @synchronized - def __getitem__(self, k): - """Get a file or collection that is directly contained by this collection. - - If you want to search a path, use `find()` instead. + def values(self) -> List[CollectionItem]: + """Get a list of objects in this collection's stream + The return value includes a `Subcollection` for every stream, and an + `arvados.arvfile.ArvadosFile` for every file, directly within this + collection's stream. This method does not recurse. """ - return self._items[k] - - @synchronized - def __contains__(self, k): - """Test if there is a file or collection a directly contained by this collection.""" - return k in self._items + return listvalues(self._items) @synchronized - def __len__(self): - """Get the number of items directly contained in this collection.""" - return len(self._items) + def items(self) -> List[Tuple[str, CollectionItem]]: + """Get a list of `(name, object)` tuples from this collection's stream - @must_be_writable - @synchronized - def __delitem__(self, p): - """Delete an item by name which is directly contained by this collection.""" - del self._items[p] - self.set_committed(False) - self.notify(DEL, self, p, None) + The return value includes a `Subcollection` for every stream, and an + `arvados.arvfile.ArvadosFile` for every file, directly within this + collection's stream. This method does not recurse. 
+ """ + return listitems(self._items) - @synchronized - def keys(self): - """Get a list of names of files and collections directly contained in this collection.""" - return self._items.keys() + def exists(self, path: str) -> bool: + """Indicate whether this collection includes an item at `path` - @synchronized - def values(self): - """Get a list of files and collection objects directly contained in this collection.""" - return listvalues(self._items) + This method returns `True` if `path` refers to a stream or file within + this collection, else `False`. - @synchronized - def items(self): - """Get a list of (name, object) tuples directly contained in this collection.""" - return listitems(self._items) + Arguments: - def exists(self, path): - """Test if there is a file or collection at `path`.""" + * path: str --- The path to check for existence within this collection + """ return self.find(path) is not None @must_be_writable @synchronized - def remove(self, path, recursive=False): - """Remove the file or subcollection (directory) at `path`. + def remove(self, path: str, recursive: bool=False) -> None: + """Remove the file or stream at `path` - :recursive: - Specify whether to remove non-empty subcollections (True), or raise an error (False). - """ + Arguments: + * path: str --- The path of the item to remove from the collection + + * recursive: bool --- Controls the method's behavior if `path` refers + to a nonempty stream. If `False` (the default), this method raises + `OSError` with errno `ENOTEMPTY`. If `True`, this method removes all + items under the stream. 
+ """ if not path: raise errors.ArgumentError("Parameter 'path' is empty.") @@ -827,7 +545,7 @@ class RichCollectionBase(CollectionBase): self.set_committed(False) self.notify(DEL, self, pathcomponents[0], deleteditem) else: - item.remove(pathcomponents[1]) + item.remove(pathcomponents[1], recursive=recursive) def _clonefrom(self, source): for k,v in listitems(source): @@ -838,26 +556,33 @@ class RichCollectionBase(CollectionBase): @must_be_writable @synchronized - def add(self, source_obj, target_name, overwrite=False, reparent=False): - """Copy or move a file or subcollection to this collection. + def add( + self, + source_obj: CollectionItem, + target_name: str, + overwrite: bool=False, + reparent: bool=False, + ) -> None: + """Copy or move a file or subcollection object to this collection - :source_obj: - An ArvadosFile, or Subcollection object + Arguments: - :target_name: - Destination item name. If the target name already exists and is a - file, this will raise an error unless you specify `overwrite=True`. + * source_obj: arvados.arvfile.ArvadosFile | Subcollection --- The file or subcollection + to add to this collection - :overwrite: - Whether to overwrite target file if it already exists. + * target_name: str --- The path inside this collection where + `source_obj` should be added. - :reparent: - If True, source_obj will be moved from its parent collection to this collection. - If False, source_obj will be copied and the parent collection will be - unmodified. + * overwrite: bool --- Controls the behavior of this method when the + collection already contains an object at `target_name`. If `False` + (the default), this method will raise `FileExistsError`. If `True`, + the object at `target_name` will be replaced with `source_obj`. + * reparent: bool --- Controls whether this method copies or moves + `source_obj`. If `False` (the default), `source_obj` is copied into + this collection. If `True`, `source_obj` is moved into this + collection. 
""" - if target_name in self and not overwrite: raise IOError(errno.EEXIST, "File already exists", target_name) @@ -924,92 +649,117 @@ class RichCollectionBase(CollectionBase): @must_be_writable @synchronized - def copy(self, source, target_path, source_collection=None, overwrite=False): - """Copy a file or subcollection to a new path in this collection. + def copy( + self, + source: Union[str, CollectionItem], + target_path: str, + source_collection: Optional['RichCollectionBase']=None, + overwrite: bool=False, + ) -> None: + """Copy a file or subcollection object to this collection - :source: - A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object. + Arguments: - :target_path: - Destination file or path. If the target path already exists and is a - subcollection, the item will be placed inside the subcollection. If - the target path already exists and is a file, this will raise an error - unless you specify `overwrite=True`. + * source: str | arvados.arvfile.ArvadosFile | + arvados.collection.Subcollection --- The file or subcollection to + add to this collection. If `source` is a str, the object will be + found by looking up this path from `source_collection` (see + below). - :source_collection: - Collection to copy `source_path` from (default `self`) + * target_path: str --- The path inside this collection where the + source object should be added. - :overwrite: - Whether to overwrite target file if it already exists. - """ + * source_collection: arvados.collection.Collection | None --- The + collection to find the source object from when `source` is a + path. Defaults to the current collection (`self`). + * overwrite: bool --- Controls the behavior of this method when the + collection already contains an object at `target_path`. If `False` + (the default), this method will raise `FileExistsError`. If `True`, + the object at `target_path` will be replaced with `source_obj`. 
+ """ source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True) target_dir.add(source_obj, target_name, overwrite, False) @must_be_writable @synchronized - def rename(self, source, target_path, source_collection=None, overwrite=False): - """Move a file or subcollection from `source_collection` to a new path in this collection. + def rename( + self, + source: Union[str, CollectionItem], + target_path: str, + source_collection: Optional['RichCollectionBase']=None, + overwrite: bool=False, + ) -> None: + """Move a file or subcollection object to this collection + + Arguments: - :source: - A string with a path to source file or subcollection. + * source: str | arvados.arvfile.ArvadosFile | + arvados.collection.Subcollection --- The file or subcollection to + add to this collection. If `source` is a str, the object will be + found by looking up this path from `source_collection` (see + below). - :target_path: - Destination file or path. If the target path already exists and is a - subcollection, the item will be placed inside the subcollection. If - the target path already exists and is a file, this will raise an error - unless you specify `overwrite=True`. + * target_path: str --- The path inside this collection where the + source object should be added. - :source_collection: - Collection to copy `source_path` from (default `self`) + * source_collection: arvados.collection.Collection | None --- The + collection to find the source object from when `source` is a + path. Defaults to the current collection (`self`). - :overwrite: - Whether to overwrite target file if it already exists. + * overwrite: bool --- Controls the behavior of this method when the + collection already contains an object at `target_path`. If `False` + (the default), this method will raise `FileExistsError`. If `True`, + the object at `target_path` will be replaced with `source_obj`. 
""" - source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False) if not source_obj.writable(): raise IOError(errno.EROFS, "Source collection is read only", source) target_dir.add(source_obj, target_name, overwrite, True) - def portable_manifest_text(self, stream_name="."): - """Get the manifest text for this collection, sub collections and files. + def portable_manifest_text(self, stream_name: str=".") -> str: + """Get the portable manifest text for this collection - This method does not flush outstanding blocks to Keep. It will return - a normalized manifest with access tokens stripped. + The portable manifest text is normalized, and does not include access + tokens. This method does not flush outstanding blocks to Keep. - :stream_name: - Name to use for this stream (directory) + Arguments: + * stream_name: str --- The name to use for this collection's stream in + the generated manifest. Default `'.'`. """ return self._get_manifest_text(stream_name, True, True) @synchronized - def manifest_text(self, stream_name=".", strip=False, normalize=False, - only_committed=False): - """Get the manifest text for this collection, sub collections and files. - - This method will flush outstanding blocks to Keep. By default, it will - not normalize an unmodified manifest or strip access tokens. + def manifest_text( + self, + stream_name: str=".", + strip: bool=False, + normalize: bool=False, + only_committed: bool=False, + ) -> str: + """Get the manifest text for this collection - :stream_name: - Name to use for this stream (directory) + Arguments: - :strip: - If True, remove signing tokens from block locators if present. - If False (default), block locators are left unchanged. + * stream_name: str --- The name to use for this collection's stream in + the generated manifest. Default `'.'`. - :normalize: - If True, always export the manifest text in normalized form - even if the Collection is not modified. 
If False (default) and the collection - is not modified, return the original manifest text even if it is not - in normalized form. + * strip: bool --- Controls whether or not the returned manifest text + includes access tokens. If `False` (the default), the manifest text + will include access tokens. If `True`, the manifest text will not + include access tokens. - :only_committed: - If True, don't commit pending blocks. + * normalize: bool --- Controls whether or not the returned manifest + text is normalized. Default `False`. + * only_committed: bool --- Controls whether or not this method uploads + pending data to Keep before building and returning the manifest text. + If `False` (the default), this method will finish uploading all data + to Keep, then return the final manifest. If `True`, this method will + build and return a manifest that only refers to the data that has + finished uploading at the time this method was called. """ - if not only_committed: self._my_block_manager().commit_all() return self._get_manifest_text(stream_name, strip, normalize, @@ -1088,11 +838,27 @@ class RichCollectionBase(CollectionBase): return remote_blocks @synchronized - def diff(self, end_collection, prefix=".", holding_collection=None): - """Generate list of add/modify/delete actions. + def diff( + self, + end_collection: 'RichCollectionBase', + prefix: str=".", + holding_collection: Optional['Collection']=None, + ) -> ChangeList: + """Build a list of differences between this collection and another + + Arguments: + + * end_collection: arvados.collection.RichCollectionBase --- A + collection object with the desired end state. The returned diff + list will describe how to go from the current collection object + `self` to `end_collection`. - When given to `apply`, will change `self` to match `end_collection` + * prefix: str --- The name to use for this collection's stream in + the diff list. Default `'.'`. 
+ * holding_collection: arvados.collection.Collection | None --- A + collection object used to hold objects for the returned diff + list. By default, a new empty collection is created. """ changes = [] if holding_collection is None: @@ -1114,12 +880,20 @@ class RichCollectionBase(CollectionBase): @must_be_writable @synchronized - def apply(self, changes): - """Apply changes from `diff`. + def apply(self, changes: ChangeList) -> None: + """Apply a list of changes to this collection - If a change conflicts with a local change, it will be saved to an - alternate path indicating the conflict. + This method takes a list of changes generated by + `RichCollectionBase.diff` and applies it to this + collection. Afterward, the state of this collection object will + match the state of `end_collection` passed to `diff`. If a change + conflicts with a local change, it will be saved to an alternate path + indicating the conflict. + Arguments: + + * changes: arvados.collection.ChangeList --- The list of differences + generated by `RichCollectionBase.diff`. """ if changes: self.set_committed(False) @@ -1161,8 +935,8 @@ class RichCollectionBase(CollectionBase): # else, the file is modified or already removed, in either # case we don't want to try to remove it. - def portable_data_hash(self): - """Get the portable data hash for this collection's manifest.""" + def portable_data_hash(self) -> str: + """Get the portable data hash for this collection's manifest""" if self._manifest_locator and self.committed(): # If the collection is already saved on the API server, and it's committed # then return API server's PDH response. 
@@ -1172,25 +946,64 @@ class RichCollectionBase(CollectionBase): return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) @synchronized - def subscribe(self, callback): + def subscribe(self, callback: ChangeCallback) -> None: + """Set a notify callback for changes to this collection + + Arguments: + + * callback: arvados.collection.ChangeCallback --- The callable to + call each time the collection is changed. + """ if self._callback is None: self._callback = callback else: raise errors.ArgumentError("A callback is already set on this collection.") @synchronized - def unsubscribe(self): + def unsubscribe(self) -> None: + """Remove any notify callback set for changes to this collection""" if self._callback is not None: self._callback = None @synchronized - def notify(self, event, collection, name, item): + def notify( + self, + event: ChangeType, + collection: 'RichCollectionBase', + name: str, + item: CollectionItem, + ) -> None: + """Notify any subscribed callback about a change to this collection + + .. ATTENTION:: Internal + This method is only meant to be used by other Collection methods. + + If a callback has been registered with `RichCollectionBase.subscribe`, + it will be called with information about a change to this collection. + Then this notification will be propagated to this collection's root. + + Arguments: + + * event: Literal[ADD, DEL, MOD, TOK] --- The type of modification to + the collection. + + * collection: arvados.collection.RichCollectionBase --- The + collection that was modified. + + * name: str --- The name of the file or stream within `collection` that + was modified. + + * item: arvados.arvfile.ArvadosFile | + arvados.collection.Subcollection --- The new contents at `name` + within `collection`. 
+ """ if self._callback: self._callback(event, collection, name, item) self.root_collection().notify(event, collection, name, item) @synchronized - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: + """Indicate whether this collection object is equal to another""" if other is self: return True if not isinstance(other, RichCollectionBase): @@ -1204,102 +1017,97 @@ class RichCollectionBase(CollectionBase): return False return True - def __ne__(self, other): + def __ne__(self, other: Any) -> bool: + """Indicate whether this collection object is not equal to another""" return not self.__eq__(other) @synchronized - def flush(self): - """Flush bufferblocks to Keep.""" + def flush(self) -> None: + """Upload any pending data to Keep""" for e in listvalues(self): e.flush() class Collection(RichCollectionBase): - """Represents the root of an Arvados Collection. - - This class is threadsafe. The root collection object, all subcollections - and files are protected by a single lock (i.e. each access locks the entire - collection). - - Brief summary of - useful methods: - - :To read an existing file: - `c.open("myfile", "r")` - - :To write a new file: - `c.open("myfile", "w")` - - :To determine if a file exists: - `c.find("myfile") is not None` - - :To copy a file: - `c.copy("source", "dest")` - - :To delete a file: - `c.remove("myfile")` - - :To save to an existing collection record: - `c.save()` - - :To save a new collection record: - `c.save_new()` + """Read and manipulate an Arvados collection - :To merge remote changes into this object: - `c.update()` - - Must be associated with an API server Collection record (during - initialization, or using `save_new`) to use `save` or `update` + This class provides a high-level interface to create, read, and update + Arvados collections and their contents. Refer to the Arvados Python SDK + cookbook for [an introduction to using the Collection class][cookbook]. 
+ [cookbook]: https://doc.arvados.org/sdk/python/cookbook.html#working-with-collections """ - def __init__(self, manifest_locator_or_text=None, - api_client=None, - keep_client=None, - num_retries=None, - parent=None, - apiconfig=None, - block_manager=None, - replication_desired=None, - storage_classes_desired=None, - put_threads=None, - get_threads=None): - """Collection constructor. - - :manifest_locator_or_text: - An Arvados collection UUID, portable data hash, raw manifest - text, or (if creating an empty collection) None. - - :parent: - the parent Collection, may be None. - - :apiconfig: - A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN. - Prefer this over supplying your own api_client and keep_client (except in testing). - Will use default config settings if not specified. - - :api_client: - The API client object to use for requests. If not specified, create one using `apiconfig`. - - :keep_client: - the Keep client to use for requests. If not specified, create one using `apiconfig`. - - :num_retries: - the number of retries for API and Keep requests. - - :block_manager: - the block manager to use. If not specified, create one. - - :replication_desired: - How many copies should Arvados maintain. If None, API server default - configuration applies. If not None, this value will also be used - for determining the number of block copies being written. - - :storage_classes_desired: - A list of storage class names where to upload the data. If None, - the keep client is expected to store the data into the cluster's - default storage class(es). 
+ def __init__(self, manifest_locator_or_text: Optional[str]=None, + api_client: Optional['arvados.api_resources.ArvadosAPIClient']=None, + keep_client: Optional['arvados.keep.KeepClient']=None, + num_retries: int=10, + parent: Optional['Collection']=None, + apiconfig: Optional[Mapping[str, str]]=None, + block_manager: Optional['arvados.arvfile._BlockManager']=None, + replication_desired: Optional[int]=None, + storage_classes_desired: Optional[List[str]]=None, + put_threads: Optional[int]=None): + """Initialize a Collection object + + Arguments: + * manifest_locator_or_text: str | None --- This string can contain a + collection manifest text, portable data hash, or UUID. When given a + portable data hash or UUID, this instance will load a collection + record from the API server. Otherwise, this instance will represent a + new collection without an API server record. The default value `None` + instantiates a new collection with an empty manifest. + + * api_client: arvados.api_resources.ArvadosAPIClient | None --- The + Arvados API client object this instance uses to make requests. If + none is given, this instance creates its own client using the + settings from `apiconfig` (see below). If your client instantiates + many Collection objects, you can help limit memory utilization by + calling `arvados.api.api` to construct an + `arvados.safeapi.ThreadSafeApiCache`, and use that as the `api_client` + for every Collection. + + * keep_client: arvados.keep.KeepClient | None --- The Keep client + object this instance uses to make requests. If none is given, this + instance creates its own client using its `api_client`. + + * num_retries: int --- The number of times that client requests are + retried. Default 10. + + * parent: arvados.collection.Collection | None --- The parent Collection + object of this instance, if any. This argument is primarily used by + other Collection methods; user client code shouldn't need to use it. 
+ + * apiconfig: Mapping[str, str] | None --- A mapping with entries for + `ARVADOS_API_HOST`, `ARVADOS_API_TOKEN`, and optionally + `ARVADOS_API_HOST_INSECURE`. When no `api_client` is provided, the + Collection object constructs one from these settings. If no + mapping is provided, calls `arvados.config.settings` to get these + parameters from user configuration. + + * block_manager: arvados.arvfile._BlockManager | None --- The + _BlockManager object used by this instance to coordinate reading + and writing Keep data blocks. If none is given, this instance + constructs its own. This argument is primarily used by other + Collection methods; user client code shouldn't need to use it. + + * replication_desired: int | None --- This controls both the value of + the `replication_desired` field on API collection records saved by + this class, as well as the number of Keep services that the object + writes new data blocks to. If none is given, uses the default value + configured for the cluster. + + * storage_classes_desired: list[str] | None --- This controls both + the value of the `storage_classes_desired` field on API collection + records saved by this class, as well as selecting which specific + Keep services the object writes new data blocks to. If none is + given, defaults to an empty list. + + * put_threads: int | None --- The number of threads to run + simultaneously to upload data blocks to Keep. This value is used when + building a new `block_manager`. It is unused when a `block_manager` + is provided. 
""" if storage_classes_desired and type(storage_classes_desired) is not list: @@ -1308,18 +1116,22 @@ class Collection(RichCollectionBase): super(Collection, self).__init__(parent) self._api_client = api_client self._keep_client = keep_client + + # Use the keep client from ThreadSafeApiCache + if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache): + self._keep_client = self._api_client.keep + self._block_manager = block_manager self.replication_desired = replication_desired self._storage_classes_desired = storage_classes_desired self.put_threads = put_threads - self.get_threads = get_threads if apiconfig: self._config = apiconfig else: self._config = config.settings() - self.num_retries = num_retries if num_retries is not None else 0 + self.num_retries = num_retries self._manifest_locator = None self._manifest_text = None self._portable_data_hash = None @@ -1349,19 +1161,33 @@ class Collection(RichCollectionBase): except errors.SyntaxError as e: raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None - def storage_classes_desired(self): + def storage_classes_desired(self) -> List[str]: + """Get this collection's `storage_classes_desired` value""" return self._storage_classes_desired or [] - def root_collection(self): + def root_collection(self) -> 'Collection': return self - def get_properties(self): + def get_properties(self) -> Properties: + """Get this collection's properties + + This method always returns a dict. If this collection object does not + have an associated API record, or that record does not have any + properties set, this method returns an empty dict. 
+ """ if self._api_response and self._api_response["properties"]: return self._api_response["properties"] else: return {} - def get_trash_at(self): + def get_trash_at(self) -> Optional[datetime.datetime]: + """Get this collection's `trash_at` field + + This method parses the `trash_at` field of the collection's API + record and returns a datetime from it. If that field is not set, or + this collection object does not have an associated API record, + returns None. + """ if self._api_response and self._api_response["trash_at"]: try: return ciso8601.parse_datetime(self._api_response["trash_at"]) @@ -1370,21 +1196,57 @@ class Collection(RichCollectionBase): else: return None - def stream_name(self): + def stream_name(self) -> str: return "." - def writable(self): + def writable(self) -> bool: return True @synchronized - def known_past_version(self, modified_at_and_portable_data_hash): + def known_past_version( + self, + modified_at_and_portable_data_hash: Tuple[Optional[str], Optional[str]] + ) -> bool: + """Indicate whether an API record for this collection has been seen before + + As this collection object loads records from the API server, it records + their `modified_at` and `portable_data_hash` fields. This method accepts + a 2-tuple with values for those fields, and returns `True` if the + combination was previously loaded. + """ return modified_at_and_portable_data_hash in self._past_versions @synchronized @retry_method - def update(self, other=None, num_retries=None): - """Merge the latest collection on the API server with the current collection.""" + def update( + self, + other: Optional['Collection']=None, + num_retries: Optional[int]=None, + ) -> None: + """Merge another collection's contents into this one + + This method compares the manifest of this collection instance with + another, then updates this instance's manifest with changes from the + other, renaming files to flag conflicts where necessary. 
+ + When called without any arguments, this method reloads the collection's + API record, and updates this instance with any changes that have + appeared server-side. If this instance does not have a corresponding + API record, this method raises `arvados.errors.ArgumentError`. + + Arguments: + + * other: arvados.collection.Collection | None --- The collection + whose contents should be merged into this instance. When not + provided, this method reloads this collection's API record and + constructs a Collection object from it. If this instance does not + have a corresponding API record, this method raises + `arvados.errors.ArgumentError`. + * num_retries: int | None --- The number of times to retry reloading + the collection's API record from the API server. If not specified, + uses the `num_retries` provided when this instance was constructed. + """ if other is None: if self._manifest_locator is None: raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid") @@ -1406,7 +1268,7 @@ class Collection(RichCollectionBase): @synchronized def _my_api(self): if self._api_client is None: - self._api_client = ThreadSafeApiCache(self._config) + self._api_client = ThreadSafeApiCache(self._config, version='v1') if self._keep_client is None: self._keep_client = self._api_client.keep return self._api_client @@ -1430,8 +1292,7 @@ class Collection(RichCollectionBase): copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, - storage_classes_func=self.storage_classes_desired, - get_threads=self.get_threads,) + storage_classes_func=self.storage_classes_desired) return self._block_manager def _remember_api_response(self, response): @@ -1478,32 +1339,65 @@ class Collection(RichCollectionBase): return self def __exit__(self, exc_type, exc_value, traceback): - """Support scoped auto-commit in a with: block.""" - if exc_type is None: + """Exit a context with this collection instance + + If no exception was raised inside the 
context block, and this + collection is writable and has a corresponding API record, that + record will be updated to match the state of this instance at the end + of the block. + """ + if exc_type is None: if self.writable() and self._has_collection_uuid(): self.save() self.stop_threads() - def stop_threads(self): + def stop_threads(self) -> None: + """Stop background Keep upload/download threads""" if self._block_manager is not None: self._block_manager.stop_threads() @synchronized - def manifest_locator(self): - """Get the manifest locator, if any. - - The manifest locator will be set when the collection is loaded from an - API server record or the portable data hash of a manifest. - - The manifest locator will be None if the collection is newly created or - was created directly from manifest text. The method `save_new()` will - assign a manifest locator. - + def manifest_locator(self) -> Optional[str]: + """Get this collection's manifest locator, if any + + * If this collection instance is associated with an API record with a + UUID, return that. + * Otherwise, if this collection instance was loaded from an API record + by portable data hash, return that. + * Otherwise, return `None`. """ return self._manifest_locator @synchronized - def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None): + def clone( + self, + new_parent: Optional['Collection']=None, + new_name: Optional[str]=None, + readonly: bool=False, + new_config: Optional[Mapping[str, str]]=None, + ) -> 'Collection': + """Create a Collection object with the same contents as this instance + + This method creates a new Collection object with contents that match + this instance's. The new collection will not be associated with any API + record. + + Arguments: + + * new_parent: arvados.collection.Collection | None --- This value is + passed to the new Collection's constructor as the `parent` + argument. + + * new_name: str | None --- This value is unused. 
+ + * readonly: bool --- If this value is true, this method constructs and + returns a `CollectionReader`. Otherwise, it returns a mutable + `Collection`. Default `False`. + + * new_config: Mapping[str, str] | None --- This value is passed to the + new Collection's constructor as `apiconfig`. If no value is provided, + defaults to the configuration passed to this instance's constructor. + """ if new_config is None: new_config = self._config if readonly: @@ -1515,31 +1409,31 @@ class Collection(RichCollectionBase): return newcollection @synchronized - def api_response(self): - """Returns information about this Collection fetched from the API server. - - If the Collection exists in Keep but not the API server, currently - returns None. Future versions may provide a synthetic response. + def api_response(self) -> Optional[Dict[str, Any]]: + """Get this instance's associated API record + If this Collection instance has an associated API record, return it. + Otherwise, return `None`. """ return self._api_response - def find_or_create(self, path, create_type): - """See `RichCollectionBase.find_or_create`""" + def find_or_create( + self, + path: str, + create_type: CreateType, + ) -> CollectionItem: if path == ".": return self else: return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type) - def find(self, path): - """See `RichCollectionBase.find`""" + def find(self, path: str) -> CollectionItem: if path == ".": return self else: return super(Collection, self).find(path[2:] if path.startswith("./") else path) - def remove(self, path, recursive=False): - """See `RichCollectionBase.remove`""" + def remove(self, path: str, recursive: bool=False) -> None: if path == ".": raise errors.ArgumentError("Cannot remove '.'") else: @@ -1548,49 +1442,52 @@ class Collection(RichCollectionBase): @must_be_writable @synchronized @retry_method - def save(self, - properties=None, - storage_classes=None, - trash_at=None, - merge=True, - 
num_retries=None, - preserve_version=False): - """Save collection to an existing collection record. - - Commit pending buffer blocks to Keep, merge with remote record (if - merge=True, the default), and update the collection record. Returns - the current manifest text. - - Will raise AssertionError if not associated with a collection record on - the API server. If you want to save a manifest to Keep only, see - `save_new()`. - - :properties: - Additional properties of collection. This value will replace any existing - properties of collection. - - :storage_classes: - Specify desirable storage classes to be used when writing data to Keep. - - :trash_at: - A collection is *expiring* when it has a *trash_at* time in the future. - An expiring collection can be accessed as normal, - but is scheduled to be trashed automatically at the *trash_at* time. - - :merge: - Update and merge remote changes before saving. Otherwise, any - remote changes will be ignored and overwritten. - - :num_retries: - Retry count on API calls (if None, use the collection default) - - :preserve_version: - If True, indicate that the collection content being saved right now - should be preserved in a version snapshot if the collection record is - updated in the future. Requires that the API server has - Collections.CollectionVersioning enabled, if not, setting this will - raise an exception. + def save( + self, + properties: Optional[Properties]=None, + storage_classes: Optional[StorageClasses]=None, + trash_at: Optional[datetime.datetime]=None, + merge: bool=True, + num_retries: Optional[int]=None, + preserve_version: bool=False, + ) -> str: + """Save collection to an existing API record + + This method updates the instance's corresponding API record to match + the instance's state. If this instance does not have a corresponding API + record yet, raises `AssertionError`. (To create a new API record, use + `Collection.save_new`.) This method returns the saved collection + manifest. 
+ Arguments: + + * properties: dict[str, Any] | None --- If provided, the API record will + be updated with these properties. Note this will completely replace + any existing properties. + + * storage_classes: list[str] | None --- If provided, the API record will + be updated with this value in the `storage_classes_desired` field. + This value will also be saved on the instance and used for any + changes that follow. + + * trash_at: datetime.datetime | None --- If provided, the API record + will be updated with this value in the `trash_at` field. + + * merge: bool --- If `True` (the default), this method will first + reload this collection's API record, and merge any new contents into + this instance before saving changes. See `Collection.update` for + details. + + * num_retries: int | None --- The number of times to retry reloading + the collection's API record from the API server. If not specified, + uses the `num_retries` provided when this instance was constructed. + + * preserve_version: bool --- This value will be passed to directly + to the underlying API call. If `True`, the Arvados API will + preserve the versions of this collection both immediately before + and after the update. If `True` when the API server is not + configured with collection versioning, this method raises + `arvados.errors.ArgumentError`. """ if properties and type(properties) is not dict: raise errors.ArgumentError("properties must be dictionary type.") @@ -1654,60 +1551,66 @@ class Collection(RichCollectionBase): @must_be_writable @synchronized @retry_method - def save_new(self, name=None, - create_collection_record=True, - owner_uuid=None, - properties=None, - storage_classes=None, - trash_at=None, - ensure_unique_name=False, - num_retries=None, - preserve_version=False): - """Save collection to a new collection record. - - Commit pending buffer blocks to Keep and, when create_collection_record - is True (default), create a new collection record. 
After creating a - new collection record, this Collection object will be associated with - the new record used by `save()`. Returns the current manifest text. - - :name: - The collection name. - - :create_collection_record: - If True, create a collection record on the API server. - If False, only commit blocks to Keep and return the manifest text. - - :owner_uuid: - the user, or project uuid that will own this collection. - If None, defaults to the current user. - - :properties: - Additional properties of collection. This value will replace any existing - properties of collection. - - :storage_classes: - Specify desirable storage classes to be used when writing data to Keep. - - :trash_at: - A collection is *expiring* when it has a *trash_at* time in the future. - An expiring collection can be accessed as normal, - but is scheduled to be trashed automatically at the *trash_at* time. - - :ensure_unique_name: - If True, ask the API server to rename the collection - if it conflicts with a collection with the same name and owner. If - False, a name conflict will result in an error. - - :num_retries: - Retry count on API calls (if None, use the collection default) - - :preserve_version: - If True, indicate that the collection content being saved right now - should be preserved in a version snapshot if the collection record is - updated in the future. Requires that the API server has - Collections.CollectionVersioning enabled, if not, setting this will - raise an exception. 
+ def save_new( + self, + name: Optional[str]=None, + create_collection_record: bool=True, + owner_uuid: Optional[str]=None, + properties: Optional[Properties]=None, + storage_classes: Optional[StorageClasses]=None, + trash_at: Optional[datetime.datetime]=None, + ensure_unique_name: bool=False, + num_retries: Optional[int]=None, + preserve_version: bool=False, + ): + """Save collection to a new API record + + This method finishes uploading new data blocks and (optionally) + creates a new API collection record with the provided data. If a new + record is created, this instance becomes associated with that record + for future updates like `save()`. This method returns the saved + collection manifest. + + Arguments: + + * name: str | None --- The `name` field to use on the new collection + record. If not specified, a generic default name is generated. + + * create_collection_record: bool --- If `True` (the default), creates a + collection record on the API server. If `False`, the method finishes + all data uploads and only returns the resulting collection manifest + without sending it to the API server. + + * owner_uuid: str | None --- The `owner_uuid` field to use on the + new collection record. + * properties: dict[str, Any] | None --- The `properties` field to use on + the new collection record. + + * storage_classes: list[str] | None --- The + `storage_classes_desired` field to use on the new collection record. + + * trash_at: datetime.datetime | None --- The `trash_at` field to use + on the new collection record. + + * ensure_unique_name: bool --- This value is passed directly to the + Arvados API when creating the collection record. If `True`, the API + server may modify the submitted `name` to ensure the collection's + `name`+`owner_uuid` combination is unique. If `False` (the default), + if a collection already exists with this same `name`+`owner_uuid` + combination, creating a collection record will raise a validation + error. 
+ + * num_retries: int | None --- The number of times to retry reloading + the collection's API record from the API server. If not specified, + uses the `num_retries` provided when this instance was constructed. + + * preserve_version: bool --- This value will be passed to directly + to the underlying API call. If `True`, the Arvados API will + preserve the versions of this collection both immediately before + and after the update. If `True` when the API server is not + configured with collection versioning, this method raises + `arvados.errors.ArgumentError`. """ if properties and type(properties) is not dict: raise errors.ArgumentError("properties must be dictionary type.") @@ -1768,7 +1671,7 @@ class Collection(RichCollectionBase): _segment_re = re.compile(r'(\d+):(\d+):(\S+)') def _unescape_manifest_path(self, path): - return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) + return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path) @synchronized def _import_manifest(self, manifest_text): @@ -1845,17 +1748,24 @@ class Collection(RichCollectionBase): self.set_committed(True) @synchronized - def notify(self, event, collection, name, item): + def notify( + self, + event: ChangeType, + collection: 'RichCollectionBase', + name: str, + item: CollectionItem, + ) -> None: if self._callback: self._callback(event, collection, name, item) class Subcollection(RichCollectionBase): - """This is a subdirectory within a collection that doesn't have its own API - server record. - - Subcollection locking falls under the umbrella lock of its root collection. + """Read and manipulate a stream/directory within an Arvados collection + This class represents a single stream (like a directory) within an Arvados + `Collection`. It is returned by `Collection.find` and provides the same API. + Operations that work on the API collection record propagate to the parent + `Collection` object. 
""" def __init__(self, parent, name): @@ -1865,10 +1775,10 @@ class Subcollection(RichCollectionBase): self.name = name self.num_retries = parent.num_retries - def root_collection(self): + def root_collection(self) -> 'Collection': return self.parent.root_collection() - def writable(self): + def writable(self) -> bool: return self.root_collection().writable() def _my_api(self): @@ -1880,11 +1790,15 @@ class Subcollection(RichCollectionBase): def _my_block_manager(self): return self.root_collection()._my_block_manager() - def stream_name(self): + def stream_name(self) -> str: return os.path.join(self.parent.stream_name(), self.name) @synchronized - def clone(self, new_parent, new_name): + def clone( + self, + new_parent: Optional['Collection']=None, + new_name: Optional[str]=None, + ) -> 'Subcollection': c = Subcollection(new_parent, new_name) c._clonefrom(self) return c @@ -1911,11 +1825,11 @@ class Subcollection(RichCollectionBase): class CollectionReader(Collection): - """A read-only collection object. - - Initialize from a collection UUID or portable data hash, or raw - manifest text. See `Collection` constructor for detailed options. + """Read-only `Collection` subclass + This class will never create or update any API collection records. You can + use this class for additional code safety when you only need to read + existing collections. """ def __init__(self, manifest_locator_or_text, *args, **kwargs): self._in_init = True @@ -1929,7 +1843,7 @@ class CollectionReader(Collection): # all_streams() and all_files() self._streams = None - def writable(self): + def writable(self) -> bool: return self._in_init def _populate_streams(orig_func): @@ -1946,16 +1860,10 @@ class CollectionReader(Collection): return orig_func(self, *args, **kwargs) return populate_streams_wrapper + @arvados.util._deprecated('3.0', 'Collection iteration') @_populate_streams def normalize(self): - """Normalize the streams returned by `all_streams`. 
- - This method is kept for backwards compatability and only affects the - behavior of `all_streams()` and `all_files()` - - """ - - # Rearrange streams + """Normalize the streams returned by `all_streams`""" streams = {} for s in self.all_streams(): for f in s.all_files(): @@ -1969,13 +1877,436 @@ class CollectionReader(Collection): self._streams = [normalize_stream(s, streams[s]) for s in sorted(streams)] + + @arvados.util._deprecated('3.0', 'Collection iteration') @_populate_streams def all_streams(self): return [StreamReader(s, self._my_keep(), num_retries=self.num_retries) for s in self._streams] + @arvados.util._deprecated('3.0', 'Collection iteration') @_populate_streams def all_files(self): for s in self.all_streams(): for f in s.all_files(): yield f + + +class CollectionWriter(CollectionBase): + """Create a new collection from scratch + + .. WARNING:: Deprecated + This class is deprecated. Prefer `arvados.collection.Collection` + instead. + """ + + @arvados.util._deprecated('3.0', 'arvados.collection.Collection') + def __init__(self, api_client=None, num_retries=0, replication=None): + """Instantiate a CollectionWriter. + + CollectionWriter lets you build a new Arvados Collection from scratch. + Write files to it. The CollectionWriter will upload data to Keep as + appropriate, and provide you with the Collection manifest text when + you're finished. + + Arguments: + * api_client: The API client to use to look up Collections. If not + provided, CollectionReader will build one from available Arvados + configuration. + * num_retries: The default number of times to retry failed + service requests. Default 0. You may change this value + after instantiation, but note those changes may not + propagate to related objects like the Keep client. + * replication: The number of copies of each block to store. + If this argument is None or not supplied, replication is + the server-provided default if available, otherwise 2. 
+ """ + self._api_client = api_client + self.num_retries = num_retries + self.replication = (2 if replication is None else replication) + self._keep_client = None + self._data_buffer = [] + self._data_buffer_len = 0 + self._current_stream_files = [] + self._current_stream_length = 0 + self._current_stream_locators = [] + self._current_stream_name = '.' + self._current_file_name = None + self._current_file_pos = 0 + self._finished_streams = [] + self._close_file = None + self._queued_file = None + self._queued_dirents = deque() + self._queued_trees = deque() + self._last_open = None + + def __exit__(self, exc_type, exc_value, traceback): + if exc_type is None: + self.finish() + + def do_queued_work(self): + # The work queue consists of three pieces: + # * _queued_file: The file object we're currently writing to the + # Collection. + # * _queued_dirents: Entries under the current directory + # (_queued_trees[0]) that we want to write or recurse through. + # This may contain files from subdirectories if + # max_manifest_depth == 0 for this directory. + # * _queued_trees: Directories that should be written as separate + # streams to the Collection. + # This function handles the smallest piece of work currently queued + # (current file, then current directory, then next directory) until + # no work remains. The _work_THING methods each do a unit of work on + # THING. _queue_THING methods add a THING to the work queue. 
+ while True: + if self._queued_file: + self._work_file() + elif self._queued_dirents: + self._work_dirents() + elif self._queued_trees: + self._work_trees() + else: + break + + def _work_file(self): + while True: + buf = self._queued_file.read(config.KEEP_BLOCK_SIZE) + if not buf: + break + self.write(buf) + self.finish_current_file() + if self._close_file: + self._queued_file.close() + self._close_file = None + self._queued_file = None + + def _work_dirents(self): + path, stream_name, max_manifest_depth = self._queued_trees[0] + if stream_name != self.current_stream_name(): + self.start_new_stream(stream_name) + while self._queued_dirents: + dirent = self._queued_dirents.popleft() + target = os.path.join(path, dirent) + if os.path.isdir(target): + self._queue_tree(target, + os.path.join(stream_name, dirent), + max_manifest_depth - 1) + else: + self._queue_file(target, dirent) + break + if not self._queued_dirents: + self._queued_trees.popleft() + + def _work_trees(self): + path, stream_name, max_manifest_depth = self._queued_trees[0] + d = arvados.util.listdir_recursive( + path, max_depth = (None if max_manifest_depth == 0 else 0)) + if d: + self._queue_dirents(stream_name, d) + else: + self._queued_trees.popleft() + + def _queue_file(self, source, filename=None): + assert (self._queued_file is None), "tried to queue more than one file" + if not hasattr(source, 'read'): + source = open(source, 'rb') + self._close_file = True + else: + self._close_file = False + if filename is None: + filename = os.path.basename(source.name) + self.start_new_file(filename) + self._queued_file = source + + def _queue_dirents(self, stream_name, dirents): + assert (not self._queued_dirents), "tried to queue more than one tree" + self._queued_dirents = deque(sorted(dirents)) + + def _queue_tree(self, path, stream_name, max_manifest_depth): + self._queued_trees.append((path, stream_name, max_manifest_depth)) + + def write_file(self, source, filename=None): + self._queue_file(source, 
filename) + self.do_queued_work() + + def write_directory_tree(self, + path, stream_name='.', max_manifest_depth=-1): + self._queue_tree(path, stream_name, max_manifest_depth) + self.do_queued_work() + + def write(self, newdata): + if isinstance(newdata, bytes): + pass + elif isinstance(newdata, str): + newdata = newdata.encode() + elif hasattr(newdata, '__iter__'): + for s in newdata: + self.write(s) + return + self._data_buffer.append(newdata) + self._data_buffer_len += len(newdata) + self._current_stream_length += len(newdata) + while self._data_buffer_len >= config.KEEP_BLOCK_SIZE: + self.flush_data() + + def open(self, streampath, filename=None): + """open(streampath[, filename]) -> file-like object + + Pass in the path of a file to write to the Collection, either as a + single string or as two separate stream name and file name arguments. + This method returns a file-like object you can write to add it to the + Collection. + + You may only have one file object from the Collection open at a time, + so be sure to close the object when you're done. 
Using the object in + a with statement makes that easy:: + + with cwriter.open('./doc/page1.txt') as outfile: + outfile.write(page1_data) + with cwriter.open('./doc/page2.txt') as outfile: + outfile.write(page2_data) + """ + if filename is None: + streampath, filename = split(streampath) + if self._last_open and not self._last_open.closed: + raise errors.AssertionError( + u"can't open '{}' when '{}' is still open".format( + filename, self._last_open.name)) + if streampath != self.current_stream_name(): + self.start_new_stream(streampath) + self.set_current_file_name(filename) + self._last_open = _WriterFile(self, filename) + return self._last_open + + def flush_data(self): + data_buffer = b''.join(self._data_buffer) + if data_buffer: + self._current_stream_locators.append( + self._my_keep().put( + data_buffer[0:config.KEEP_BLOCK_SIZE], + copies=self.replication)) + self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]] + self._data_buffer_len = len(self._data_buffer[0]) + + def start_new_file(self, newfilename=None): + self.finish_current_file() + self.set_current_file_name(newfilename) + + def set_current_file_name(self, newfilename): + if re.search(r'[\t\n]', newfilename): + raise errors.AssertionError( + "Manifest filenames cannot contain whitespace: %s" % + newfilename) + elif re.search(r'\x00', newfilename): + raise errors.AssertionError( + "Manifest filenames cannot contain NUL characters: %s" % + newfilename) + self._current_file_name = newfilename + + def current_file_name(self): + return self._current_file_name + + def finish_current_file(self): + if self._current_file_name is None: + if self._current_file_pos == self._current_stream_length: + return + raise errors.AssertionError( + "Cannot finish an unnamed file " + + "(%d bytes at offset %d in '%s' stream)" % + (self._current_stream_length - self._current_file_pos, + self._current_file_pos, + self._current_stream_name)) + self._current_stream_files.append([ + self._current_file_pos, + 
self._current_stream_length - self._current_file_pos, + self._current_file_name]) + self._current_file_pos = self._current_stream_length + self._current_file_name = None + + def start_new_stream(self, newstreamname='.'): + self.finish_current_stream() + self.set_current_stream_name(newstreamname) + + def set_current_stream_name(self, newstreamname): + if re.search(r'[\t\n]', newstreamname): + raise errors.AssertionError( + "Manifest stream names cannot contain whitespace: '%s'" % + (newstreamname)) + self._current_stream_name = '.' if newstreamname=='' else newstreamname + + def current_stream_name(self): + return self._current_stream_name + + def finish_current_stream(self): + self.finish_current_file() + self.flush_data() + if not self._current_stream_files: + pass + elif self._current_stream_name is None: + raise errors.AssertionError( + "Cannot finish an unnamed stream (%d bytes in %d files)" % + (self._current_stream_length, len(self._current_stream_files))) + else: + if not self._current_stream_locators: + self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR) + self._finished_streams.append([self._current_stream_name, + self._current_stream_locators, + self._current_stream_files]) + self._current_stream_files = [] + self._current_stream_length = 0 + self._current_stream_locators = [] + self._current_stream_name = None + self._current_file_pos = 0 + self._current_file_name = None + + def finish(self): + """Store the manifest in Keep and return its locator. + + This is useful for storing manifest fragments (task outputs) + temporarily in Keep during a Crunch job. + + In other cases you should make a collection instead, by + sending manifest_text() to the API server's "create + collection" endpoint. 
+ """ + return self._my_keep().put(self.manifest_text().encode(), + copies=self.replication) + + def portable_data_hash(self): + stripped = self.stripped_manifest().encode() + return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped)) + + def manifest_text(self): + self.finish_current_stream() + manifest = '' + + for stream in self._finished_streams: + if not re.search(r'^\.(/.*)?$', stream[0]): + manifest += './' + manifest += stream[0].replace(' ', '\\040') + manifest += ' ' + ' '.join(stream[1]) + manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2]) + manifest += "\n" + + return manifest + + def data_locators(self): + ret = [] + for name, locators, files in self._finished_streams: + ret += locators + return ret + + def save_new(self, name=None): + return self._api_client.collections().create( + ensure_unique_name=True, + body={ + 'name': name, + 'manifest_text': self.manifest_text(), + }).execute(num_retries=self.num_retries) + + +class ResumableCollectionWriter(CollectionWriter): + """CollectionWriter that can serialize internal state to disk + + .. WARNING:: Deprecated + This class is deprecated. Prefer `arvados.collection.Collection` + instead. + """ + + STATE_PROPS = ['_current_stream_files', '_current_stream_length', + '_current_stream_locators', '_current_stream_name', + '_current_file_name', '_current_file_pos', '_close_file', + '_data_buffer', '_dependencies', '_finished_streams', + '_queued_dirents', '_queued_trees'] + + @arvados.util._deprecated('3.0', 'arvados.collection.Collection') + def __init__(self, api_client=None, **kwargs): + self._dependencies = {} + super(ResumableCollectionWriter, self).__init__(api_client, **kwargs) + + @classmethod + def from_state(cls, state, *init_args, **init_kwargs): + # Try to build a new writer from scratch with the given state. 
+ # If the state is not suitable to resume (because files have changed, + # been deleted, aren't predictable, etc.), raise a + # StaleWriterStateError. Otherwise, return the initialized writer. + # The caller is responsible for calling writer.do_queued_work() + # appropriately after it's returned. + writer = cls(*init_args, **init_kwargs) + for attr_name in cls.STATE_PROPS: + attr_value = state[attr_name] + attr_class = getattr(writer, attr_name).__class__ + # Coerce the value into the same type as the initial value, if + # needed. + if attr_class not in (type(None), attr_value.__class__): + attr_value = attr_class(attr_value) + setattr(writer, attr_name, attr_value) + # Check dependencies before we try to resume anything. + if any(KeepLocator(ls).permission_expired() + for ls in writer._current_stream_locators): + raise errors.StaleWriterStateError( + "locators include expired permission hint") + writer.check_dependencies() + if state['_current_file'] is not None: + path, pos = state['_current_file'] + try: + writer._queued_file = open(path, 'rb') + writer._queued_file.seek(pos) + except IOError as error: + raise errors.StaleWriterStateError( + u"failed to reopen active file {}: {}".format(path, error)) + return writer + + def check_dependencies(self): + for path, orig_stat in listitems(self._dependencies): + if not S_ISREG(orig_stat[ST_MODE]): + raise errors.StaleWriterStateError(u"{} not file".format(path)) + try: + now_stat = tuple(os.stat(path)) + except OSError as error: + raise errors.StaleWriterStateError( + u"failed to stat {}: {}".format(path, error)) + if ((not S_ISREG(now_stat[ST_MODE])) or + (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or + (orig_stat[ST_SIZE] != now_stat[ST_SIZE])): + raise errors.StaleWriterStateError(u"{} changed".format(path)) + + def dump_state(self, copy_func=lambda x: x): + state = {attr: copy_func(getattr(self, attr)) + for attr in self.STATE_PROPS} + if self._queued_file is None: + state['_current_file'] = None + else: + 
state['_current_file'] = (os.path.realpath(self._queued_file.name), + self._queued_file.tell()) + return state + + def _queue_file(self, source, filename=None): + try: + src_path = os.path.realpath(source) + except Exception: + raise errors.AssertionError(u"{} not a file path".format(source)) + try: + path_stat = os.stat(src_path) + except OSError as stat_error: + path_stat = None + super(ResumableCollectionWriter, self)._queue_file(source, filename) + fd_stat = os.fstat(self._queued_file.fileno()) + if not S_ISREG(fd_stat.st_mode): + # We won't be able to resume from this cache anyway, so don't + # worry about further checks. + self._dependencies[source] = tuple(fd_stat) + elif path_stat is None: + raise errors.AssertionError( + u"could not stat {}: {}".format(source, stat_error)) + elif path_stat.st_ino != fd_stat.st_ino: + raise errors.AssertionError( + u"{} changed between open and stat calls".format(source)) + else: + self._dependencies[src_path] = tuple(fd_stat) + + def write(self, data): + if self._queued_file is None: + raise errors.AssertionError( + "resumable writer can't accept unsourced data") + return super(ResumableCollectionWriter, self).write(data)